Вопрос или проблема
Обзор
Мне нужно обновить данные в базе данных, принадлежащей мне, с данными, которые извлекаются с сервера Snowflake. Я извлекаю данные за конкретную дату из Snowflake и нуждаюсь в их вставке в локальную базу данных на эту дату. Эта операция в настоящее время занимает 273 секунды, и я должен оптимизировать ее как можно больше. Все это делается на Python с использованием библиотеки pyodbc
для вставки в локальную базу данных и sqlalchemy
для извлечения данных из Snowflake.
Подробный процесс
Я использую cProfile
в сочетании с Snakeviz
для профилирования и визуализации производительности моей операции, которая выполняется для конкретной даты и называется update_snf
. Весь процесс состоит из следующих шагов:
- Создание соединения и выполнение SQL-запроса для извлечения данных с сервера Snowflake. Назовем эту операцию
get_data_from_snf
. Выполнение этой операции занимает 111 секунд, из которых большинство (108 секунд) занимает вызов функцииPandas.read_sql(query, connection)
(определенной вpandas/io/sql.py
), которая выполняет запрос и возвращает результат в датафреймеdf
. - Форматирование
df
, что занимает относительно меньше времени (17.8 секунд) - Вставка отформатированного датафрейма
df
в мою локальную базу данных (назовем эту функциюinsert_data_into_db
). Эта операция занимает большинство времени (143 секунды, из которых 93.1 секунды занимает методexecutemany
объектаpyodbc.Cursor
и 43.3 секунды методомexecute
). Назовем таблицу, в которую происходит эта вставкаtable
.table
— это довольно большая таблица с примерно 150 миллионами кортежей и 130 столбцами.
Я пытаюсь сократить время выполнения всей этой операции любым возможным способом, оптимизируя любой или даже все шаги. Ожидается, что df
будет содержать около 200,000 строк для каждой даты.
Текущая попытка
Используются довольно базовые API Python и в данный момент не используется многопоточность или многопроцессорность. Вставка данных в очень большую table
занимала неприемлемое количество времени, поэтому сейчас используется альтернатива, которая реализует insert_data_into_db
следующим образом:
- Создается новая таблица под названием
table_temp
(эта таблица уже существует в базе данных; она не создается заново каждый раз, когда функция запускается) - Для выполнения следующего SQL-запроса используется
cursor.execute
:
TRUNCATE TABLE table_temp
DELETE FROM table where Date = {date}
Операция удаления необходима для обеспечения того, чтобы только обновленные записи из Snowflake присутствовали на локальном сервере и не было дублирования. Она фактически удаляет устаревшие данные перед тем, как новые данные могут быть вновь вставлены.
3. df
разделяется на части по 18000 строк и cursor.fast_executemany
устанавливается в True
. Теперь мы используем cursor.executemany(insert_query, df_chunk)
для вставки каждой из частей (список кортежей) по одной в table_temp
.
4. Мы теперь вставляем всю table_temp
(которая содержит все записи для конкретной даты из Snowflake) в table
используя:
INSERT INTO table
SELECT * FROM table_temp
Таким образом, вместо того чтобы вставлять данные непосредственно в table
, мы сначала вставляем их частями во временную таблицу, а затем вставляем всю временную таблицу в table
.
Вопросы в этом текущем подходе:
- Есть ли способ избежать подхода с разбивкой и напрямую вставить в
table
, который не занимает слишком много времени? Похоже, что время, необходимое для этого прямого вставления, увеличивается по мере того, какtable
становится больше, и при текущем размере в 150 миллионов строк это занимает слишком много времени. - Можем ли мы использовать параллелизм в форме многопроцессорности или многопоточности в любом месте этого подхода?
- Можем ли мы использовать стриминг где-то здесь? Если да, то как?
- Можем ли мы ускорить извлечение данных из Snowflake в датафрейм в памяти перед вставкой датафрейма в базу данных? Есть ли способ избежать этого вообще?
Я использую Python 3.11 на сервере Windows 8 с использованием SQL Server. Все оптимизации приветствуются! Большое спасибо!
Ответ или решение
Вот несколько рекомендаций о том, как снизить время, необходимое для вставки большого количества данных из DataFrame в большую таблицу базы данных SQL Server, с учетом вашего процесса и используемых технологий.
1. Оптимизация запроса к Snowflake
Скорость получения данных:
- Используйте подстановочные параметры: Вместо использования
pandas.read_sql(query, connection)
, попробуйте использовать подготовленные заявления и подстановочные параметры. Это может ускорить выполнение запроса. - Настройка размера выборки: Проверьте, есть ли возможность оптимизации размера выборки в запросе. Возможно, вы сможете уменьшить количество возвращаемых столбцов и строк, если какие-то данные не нужны.
2. Оптимизация процесса вставки в SQL Server
Временная таблица
Уже используемый вами подход с временной таблицей полезен, но его можно улучшить:
-
Bulk Insert: Если ожидается, что данные могут быть достаточно большими, используйте
BULK INSERT
для вставки данных из файла CSV или других форматов. Для этого сначала экспортируйте DataFrame в CSV, а затем используйте соответствующий SQL Server запрос для загрузки. -
Batch Insert: Вместо
executemany
можно использовать пакетные вставки. Однако вам нужно будет проверить ограничения на максимальный размер пакета. - Параметр «TABLOCK»: Используйте режим блокировки таблицы (
TABLOCK
) при взаимодействии с большими таблицами, чтобы улучшить производительность вставки.
INSERT INTO table WITH (TABLOCK)
SELECT * FROM table_temp;
Параллелизм
-
Многопоточность/Мультипроцесс: Для вставки данных можно использовать Python библиотеку
concurrent.futures
. Вы можете разбить DataFrame на части и использовать несколько потоков или процессов для вставки данных параллельно. - Курсор-содержимое в мультипроцессинге: Каждое окно обработки может работать с отдельной частью данных, что значительно ускорит общий процесс.
3. Использование стриминга данных
Стриминг данных означает обработку данных по частям, как только они будут доступны:
-
Итерация по DataFrame: Используйте метод
DataFrame.iterrows()
илиDataFrame.itertuples()
для построчного извлечения данных, и сразу же вставляйте каждую строку в базу данных. Это не всегда бывает быстрее, но может снизить потребление памяти. - SQLAlchemy для вставки: Если вы используете SQLAlchemy, функция
to_sql()
может быть полезной, потому что она поддерживает вставку данных по частям.
4. Прочие рекомендации
-
Индексы и триггеры: Убедитесь, что индексы на целевой таблице останавливаются во время вставки больших объемов данных (отключите их) и восстановите их после завершения вставки. Также отключите триггеры во время этих операций.
-
Параметры настройки SQL Server: Рассмотрите возможность изменения настроек SQL Server, таких как размер страницы или конфигурация памяти, чтобы лучше обрабатывать большие объемы данных.
- Мониторинг и оптимизация: Используйте SQL Server Profiler или другие средства для анализа времени выполнения запросов и оптимизации производительности.
Эти изменения могут помочь вам значительно уменьшить время, необходимое для вставки данных. Каждая ситуация уникальна, поэтому важно протестировать различные подходы и определить, какие из них лучше всего работают в вашей конкретной среде.