Вопрос или проблема
Я обрабатываю некоторые CSV файлы, которые не помещаются в оперативную память.
Оба CSV файла имеют следующую структуру:
first.csv
id | name | timestamp |
---|---|---|
serial | str | yyyy-mm-dd hh:mm:ss |
second.csv
id | name | date |
---|---|---|
serial | str | yyyy-mm-dd |
Цель заключается в том, чтобы выбрать строки из first.csv
, которые соответствуют некоторым критериям по сравнению с second.csv
:
name
равенtimestamp
находится в диапазоне [date
-1,date
+1].
После перебора всех этих строк вывод может быть объединен в один выходной файл.
Я не знаю, что возможно для этого в оболочке, но считаю, что это будет сложно написать и очень трудно читать (а возможно, и модифицировать) позже.
Я протестировал Go по сравнению с awk для базовых задач CSV (выбор/удаление столбцов, фильтрация строк), и Go работает быстрее (иногда “значительно” быстрее).
Для вашего поста я создал тестовый файл, который содержит 8 640 001 строк и ~271 МБ, а затем создал два примера обработчиков, один на Python и один на Go, которые используют модель “запись по мере чтения”, чтобы избежать промежуточного хранения (и оба используют буферизованный ввод-вывод, что даёт прирост эффективности при работе с большими файлами).
- Скрипт на Python: выполнялся за ~70 секунд и использовал ~6.5 МБ памяти
- Бинарный файл Go: выполнялся за ~3.5 секунды и использовал ~10 МБ памяти
Но прежде всего, выполняет ли это свою задачу?
Основная настройка
Я создал эти два небольших образца для разработки:
first.csv
id,name,timestamp
1,foo,2000-01-01 00:00:00
2,foo,2000-01-02 00:00:00
3,foo,2000-01-03 00:00:00
4,foo,2000-01-04 00:00:00
5,foo,2000-01-05 00:00:00
6,bar,2000-02-01 00:00:00
7,bar,2000-02-02 00:00:00
8,bar,2000-02-03 00:00:00
9,bar,2000-02-04 00:00:00
second.csv
id,name,date
10,foo,2000-01-03
11,bar,2000-02-02
Не было ясно, что значит “date-1” и “date+1”, поэтому я предположил, что вы хотели “плюс-минус один день”.
Когда я запускаю код на Go или Python с этими файлами, я получаю:
2,foo,2000-01-02 00:00:00
3,foo,2000-01-03 00:00:00
4,foo,2000-01-04 00:00:00
6,bar,2000-02-01 00:00:00
7,bar,2000-02-02 00:00:00
8,bar,2000-02-03 00:00:00
что соответствует тому, чего я ожидаю исходя из моего толкования ваших требований и входных данных:
foo 2000-01-03
и bar 2000-02-02
Тестовый файл
Я создал этот генератор тестов, который создает записи только для foo, с интервалом в 1 секунду, на протяжении 100 дней:
import csv
from datetime import datetime, timedelta
dt_start = datetime(2000, 1, 1)
with open('test.csv', 'w', newline="") as f:
writer = csv.writer(f)
writer.writerow(['id', 'name', 'timestamp'])
# 1 строка в секунду на протяжении 100 дней
for i in range(86400 * 100):
plus_secs = timedelta(seconds=i + 1)
writer.writerow([i + 1, 'foo', dt_start + plus_secs])
Вот как выглядит test.csv:
% ll test.csv
-rw-r--r-- 1 alice bob 271M Nov 19 22:19 test.csv
% wc -l test.csv
8640001 test.csv
Свяжите тестовый файл с first.csv, ln -fs test.csv first.csv
, и я готов запустить следующее…
Python
import csv
import sys
from datetime import datetime, timedelta
DATE_FMT = f'%Y-%m-%d'
DATETIME_FMT = f'%Y-%m-%d %H:%M:%S'
# Создать таблицу для поиска из second
# {name: [date-1day, date+1day]}
lookup = {}
with open('second.csv', newline="") as f:
reader = csv.reader(f)
header = next(reader)
nm_col = header.index('name')
dt_col = header.index('date')
for row in reader:
name = row[nm_col]
dt_str = row[dt_col]
dt = datetime.strptime(dt_str, DATE_FMT)
min_dt = dt - timedelta(days=1)
max_dt = dt + timedelta(days=1) # - timedelta(seconds=1)
lookup[name] = [min_dt, max_dt]
# Создать записыватель по требованию и перебрать first, записывая, когда это необходимо
writer = csv.writer(sys.stdout)
with open('first.csv', newline="") as f:
reader = csv.reader(f)
header = next(reader)
nm_col = header.index('name')
dt_col = header.index('timestamp')
writer.writerow(header)
for row in reader:
name = row[nm_col]
if name not in lookup:
continue
dt_str = row[dt_col]
dt = datetime.strptime(dt_str, DATETIME_FMT)
min_dt = lookup[name][0]
max_dt = lookup[name][1]
if dt < min_dt or dt > max_dt:
continue
writer.writerow(row)
И запустите скрипт:
% time python3 main.py > result.csv
python3 main.py > result.csv 69.93s user 0.40s system 98% cpu 1:11.07 total
% head -n5 result.csv
id,name,timestamp
86400,foo,2000-01-02 00:00:00
86401,foo,2000-01-02 00:00:01
86402,foo,2000-01-02 00:00:02
86403,foo,2000-01-02 00:00:03
% tail -n5 result.csv
259196,foo,2000-01-03 23:59:56
259197,foo,2000-01-03 23:59:57
259198,foo,2000-01-03 23:59:58
259199,foo,2000-01-03 23:59:59
259200,foo,2000-01-04 00:00:00 # это правильно?
Что мне кажется правильным: только записи за 48-часовой промежуток, сосредоточенные вокруг даты поиска. Я не уверен насчет последней найденной записи, которая из первого мгновения четвертого — это то, о чем идет речь в закомментированной части - timedelta(seconds=1)
.
Go
package main
import (
"encoding/csv"
"io"
"os"
"time"
)
type LookupEntry struct {
oneDayBefore time.Time
oneDayAfter time.Time
}
const DATE_FMT = "2006-01-02"
const DATETIME_FMT = "2006-01-02 15:04:05"
var lookup = make(map[string]LookupEntry)
func main() {
makeLookupTable()
findMatchingEntries()
}
func makeLookupTable() {
f, _ := os.Open("second.csv")
defer f.Close()
r := csv.NewReader(f)
r.Read() // Удаляем заголовок
for {
record, err := r.Read()
if err == io.EOF {
break
}
dt, _ := time.Parse(DATE_FMT, record[2])
oneDayBefore := dt.AddDate(0, 0, -1)
oneDayAfter := dt.AddDate(0, 0, 1) // .Add(-time.Millisecond * 1000)
lookup[record[1]] = LookupEntry{oneDayBefore, oneDayAfter}
}
}
func findMatchingEntries() {
f1, _ := os.Open("first.csv")
defer f1.Close()
w := csv.NewWriter(os.Stdout)
r := csv.NewReader(f1)
header, _ := r.Read()
w.Write(header)
for {
record, err := r.Read()
if err == io.EOF {
break
}
lookupEntry, ok := lookup[record[1]]
if !ok {
continue
}
dt, _ := time.Parse(DATETIME_FMT, record[2])
if dt.Before(lookupEntry.oneDayBefore) || dt.After(lookupEntry.oneDayAfter) {
continue
}
w.Write(record)
}
w.Flush()
}
Соберите и запустите тест:
% go build main.go
% time ./main > result.csv
./main > result.csv 3.53s user 0.14s system 104% cpu 3.504 total
% head -n5 result.csv
86400,foo,2000-01-02 00:00:00
86401,foo,2000-01-02 00:00:01
86402,foo,2000-01-02 00:00:02
86403,foo,2000-01-02 00:00:03
86404,foo,2000-01-02 00:00:04
% tail -n5 result.csv
259196,foo,2000-01-03 23:59:56
259197,foo,2000-01-03 23:59:57
259198,foo,2000-01-03 23:59:58
259199,foo,2000-01-03 23:59:59
259200,foo,2000-01-04 00:00:00
Ответ или решение
Фильтрация очень большого CSV на основе значений из другого CSV
В этой статье мы разберем, как эффективно фильтровать данные из одного огромного CSV-файла, используя значения из другого CSV-файла, при этом все операции будут производиться в памяти с использованием минимальных ресурсов. Чаще всего подобные задачи возникают в обработке больших объемов данных, когда файлы не помещаются в оперативную память. Мы рассмотрим как на Python, так и на Go, чтобы пользователи могли выбрать подходящий им инструмент.
Структура файлов CSV
first.csv
Файл first.csv
содержит следующие колонки:
id
(серийный номер)name
(строка)timestamp
(датавремя в форматеyyyy-mm-dd hh:mm:ss
)
id,name,timestamp
1,foo,2000-01-01 00:00:00
...
second.csv
Файл second.csv
содержит следующие колонки:
id
(серийный номер)name
(строка)date
(дата в форматеyyyy-mm-dd
)
id,name,date
10,foo,2000-01-03
...
Условия фильтрации
Необходимо выбрать строки из first.csv
, которые соответствуют следующим критериям:
name
равен значению изsecond.csv
timestamp
находится в диапазоне [date-1 день, date+1 день]
Реализация на Python
Используя Python, мы создадим словарь для быстрого поиска диапазонов дат, затем будем построчно читать first.csv
и отбирать подходящие записи.
import csv
import sys
from datetime import datetime, timedelta
DATE_FMT = '%Y-%m-%d'
DATETIME_FMT = '%Y-%m-%d %H:%M:%S'
# Создание словаря для второго файла
lookup = {}
with open('second.csv', newline='') as f:
reader = csv.reader(f)
header = next(reader)
nm_col = header.index('name')
dt_col = header.index('date')
for row in reader:
name = row[nm_col]
dt_str = row[dt_col]
dt = datetime.strptime(dt_str, DATE_FMT)
min_dt = dt - timedelta(days=1)
max_dt = dt + timedelta(days=1)
lookup[name] = [min_dt, max_dt]
# Запись результатов из первого файла на основе условий
writer = csv.writer(sys.stdout)
with open('first.csv', newline='') as f:
reader = csv.reader(f)
header = next(reader)
nm_col = header.index('name')
dt_col = header.index('timestamp')
writer.writerow(header)
for row in reader:
name = row[nm_col]
if name not in lookup:
continue
dt_str = row[dt_col]
dt = datetime.strptime(dt_str, DATETIME_FMT)
min_dt, max_dt = lookup[name]
if dt < min_dt or dt > max_dt:
continue
writer.writerow(row)
Реализация на Go
Go обеспечивает более высокую производительность, поэтому рассмотрим реализацию на этом языке.
package main
import (
"encoding/csv"
"io"
"os"
"time"
)
type LookupEntry struct {
oneDayBefore time.Time
oneDayAfter time.Time
}
const DATE_FMT = "2006-01-02"
const DATETIME_FMT = "2006-01-02 15:04:05"
var lookup = make(map[string]LookupEntry)
func main() {
makeLookupTable()
findMatchingEntries()
}
func makeLookupTable() {
f, _ := os.Open("second.csv")
defer f.Close()
r := csv.NewReader(f)
r.Read() // Пропускаем заголовок
for {
record, err := r.Read()
if err == io.EOF {
break
}
dt, _ := time.Parse(DATE_FMT, record[2])
oneDayBefore := dt.AddDate(0, 0, -1)
oneDayAfter := dt.AddDate(0, 0, 1)
lookup[record[1]] = LookupEntry{oneDayBefore, oneDayAfter}
}
}
func findMatchingEntries() {
f1, _ := os.Open("first.csv")
defer f1.Close()
w := csv.NewWriter(os.Stdout)
r := csv.NewReader(f1)
header, _ := r.Read()
w.Write(header)
for {
record, err := r.Read()
if err == io.EOF {
break
}
lookupEntry, ok := lookup[record[1]]
if !ok {
continue
}
dt, _ := time.Parse(DATETIME_FMT, record[2])
if dt.Before(lookupEntry.oneDayBefore) || dt.After(lookupEntry.oneDayAfter) {
continue
}
w.Write(record)
}
w.Flush()
}
Заключение
Оба предложенных метода позволяют эффективно фильтровать данные в больших CSV-файлах. Python обеспечивает простоту и читаемость кода, в то время как Go предлагает быстрые и оптимизированные решения для обработки данных. Выбор языка зависит от специфики вашей задачи — быстродействия или удобочитаемости. Ключевым моментом является использование памяти: оба подхода поощряют работу по принципу «чтение по мере записи», что критично при работе с объемными файлами.