Как загрузить DataFrame Pandas в таблицу Postgres в Airflow?

Вопрос или проблема

Мне нужно создать задачу в Airflow, которая загружает данные из одной базы данных Postgres и загружает их в другую.

Мой код выглядит следующим образом:

def task_data_upload():

    postgres_sql_download = PostgresHook(postgres_conn_id="a", schema="b")
    data = postgres_sql_download.get_pandas_df(''' SELECT * FROM table_name1 ''')

    postgres_sql_upload = PostgresHook(postgres_conn_id="c", schema="d")
    postgres_sql_upload.bulk_load('table_name2', data)

Таблица table_name2 создаётся на предыдущей задаче, postgres_conn_id/схемы в порядке, и get_pandas_df работает. Проблема, похоже, заключается в последней строке – она не загружает dataframe.

Есть идеи, как это сделать?

Короче говоря, это можно сделать так:

postgres_sql_upload = PostgresHook(postgres_conn_id='c', schema="d") 
postgres_sql_upload.insert_rows('table_name2', data)

Я выбрал немного другой подход:

  • Pandas DataFrames предоставляют метод to_sql, который принимает sql alchemy engine как аргумент.

В коде это выглядело так:

postgres_hook = PostgresHook(postgres_conn_id="conn_id")
df.to_sql('data_table', postgres_hook.get_sqlalchemy_engine(), if_exists="replace", chunksize=1000)

Как видите, я предоставил два дополнительных параметра:

  • if exists обрабатывает, что делать, если таблица/данные уже существуют
  • chunk size коммитит строки партиями, что отлично, если у вас большие объемы данных

Вы можете использовать Postgres Operator в Airflow для загрузки Pandas DataFrame в таблицу Postgres.
шаги для выполнения

  1. Установите библиотеку psycopg2, которая необходима для подключения к Postgres из Python.
  2. Импортируйте необходимые библиотеки:
  3. import pandas as pd
    from airflow.providers.postgres.operators.postgres import PostgresOperator
  4. Загрузите ваши данные в Pandas DataFrame. Например, предположим, что ваши данные находятся в CSV-файле с названием data.csv:
    df = pd.read_csv(‘data.csv’)
  5. Определите SQL-оператор для создания таблицы в Postgres. Имена столбцов и типы данных в SQL-операторе должны совпадать со столбцами в вашем Pandas DataFrame,

Ответ или решение

Как загрузить DataFrame Pandas в таблицу Postgres в Airflow?

Загрузка DataFrame Pandas в таблицу Postgres через Apache Airflow может быть выполнена несколькими способами. Давайте разберем процесс пошагово, учитывая все детали и контекст вашего запроса.

1. Установка необходимых библиотек

Для начала необходимо установить библиотеку psycopg2, которая требуется для подключения к PostgreSQL из Python. Вы можете установить её с помощью pip:

pip install psycopg2-binary

2. Импорт необходимых библиотек

В вашем Airflow DAG (Directed Acyclic Graph) вам необходимо импортировать библиотеки, которые вы будете использовать:

import pandas as pd
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

3. Загрузка данных в DataFrame Pandas

Предположим, что данные находятся в таблице PostgreSQL. Используя PostgresHook, вы можете загружать данные в DataFrame:

def task_data_download():
    postgres_hook = PostgresHook(postgres_conn_id='a', schema='b')
    data = postgres_hook.get_pandas_df(''' SELECT * FROM table_name1 ''')
    return data

4. Загрузка DataFrame в таблицу PostgreSQL

Используя метод to_sql, вы можете загрузить данные из DataFrame в таблицу PostgreSQL. Вам понадобится SQLAlchemy, чтобы получить соединение:

def task_data_upload(df):
    postgres_hook = PostgresHook(postgres_conn_id='c', schema='d')
    engine = postgres_hook.get_sqlalchemy_engine()

    # Параметр if_exists определяет поведение при существовании таблицы:
    # 'replace' - заменяет таблицу, 'append' - добавляет данные в существующую таблицу
    df.to_sql('table_name2', engine, if_exists='replace', chunksize=1000, index=False)

5. Определение DAG и задач

Теперь вы можете определить ваш DAG и связать задачи:

with DAG('data_transfer_dag',
         start_date=datetime(2023, 10, 1),
         schedule_interval='@daily') as dag:

    download_task = PythonOperator(
        task_id='download_data',
        python_callable=task_data_download
    )

    upload_task = PythonOperator(
        task_id='upload_data',
        python_callable=task_data_upload,
        op_kwargs={'df': download_task.output}  # Передача DataFrame как аргумент
    )

    download_task >> upload_task  # Устанавливаем порядок выполнения

Заключение

Теперь у вас есть полный процесс загрузки данных из одной таблицы PostgreSQL в другую через Apache Airflow, используя Pandas DataFrame. Ваша задача загрузки будет состоять из двух шагов: сначала вы получите данные, а затем загрузите их в нужную таблицу.

При использовании метода to_sql у вас есть дополнительные параметры, такие как if_exists, который позволяет вам управлять тем, что делать, если таблица уже существует, и chunksize, который позволяет загружать данные частями для повышения производительности. Это удобно для работы с большими объемами данных.

Этот подход обеспечивает гибкость и удобство при работе с данными в рамках рабочих процессов Airflow, а также позволяет легко интегрировать данные из различных источников.

Оцените материал
Добавить комментарий

Капча загружается...