Как получить результат задачи и использовать его в DAG Airflow

Вопрос или проблема

Я использую Airflow 2.10.2. Я определяю DAG, который в основном выполняет 2 задачи.

  1. Некоторую основную логику, которая создает кучу файлов
  2. Запускает параллельно (порядок не важен) набор ‘рабочих’ задач

Поскольку первый шаг требует высокой вычислительной мощности, я определяю его как задачу, которая возвращает количество созданных файлов. Это число важно, потому что мне нужно запустить столько рабочих задач, сколько создано файлов. Мой код выглядит примерно так:

@dag
def my_dag:
    
    @task  
    def create_files():
        ...
        return count

    count = create_files()
    
    for i in range(count):
        worker.override()()

my_dag()

Я не понимаю, как мне получить значение count в определении моего DAG. Я, вероятно, просто не понял, как работают Xcoms (так как я в основном полагался на API TaskFlow), но любая помощь будет оценена.

Я пытался получить значение с помощью xcoms_pull:

count.xcoms_pull(task_ids="create_files")
count['ti'].xcoms_pull(task_ids="create_files")

И без xcoms_pull:

count['ti']

Но ни один из них не работает, поскольку XComs определены для межзадачной коммуникации.
Сообщение об ошибке:

AttributeError: 'XComArg' объект count не имеет атрибута 'xcoms_pull'.

Теперь я увидел, что XComs – это просто специальный случай переменных, но в основном нет документации об этом (по крайней мере, я не нашел).

Ответ или решение

Для решения вашей задачи в Apache Airflow 2.10.2, касающейся возврата результата задачи и его использования в DAG (Directed Acyclic Graph), вы правильно обратили внимание на важность управления XCom. XCom (от "Cross Communication") используется для передачи данных между задачами. Давайте разберем, как можно корректно передать количество созданных файлов (значение count), и запустить параллельные рабочие задачи (worker tasks) на основе этого значения.

Шаги для получения результата задачи и использования его в DAG

  1. Определение задачи создания файлов:
    Ваша задача create_files должна возвращать количество созданных файлов. Это можно сделать с использованием декоратора @task, который автоматически создает XCom. Пример вашего кода будет выглядеть следующим образом:

    from airflow.decorators import dag, task
    from datetime import datetime
    
    @dag(schedule_interval='@daily', start_date=datetime(2023, 1, 1), catchup=False)
    def my_dag():
    
        @task  
        def create_files():
            # Логика создания файлов
            count = ... # предположим, что здесь вы делаете что-то и устанавливаете count
            return count
    
        count = create_files()
    
        # Теперь мы можем использовать count для создания рабочих задач
        @task
        def worker(i):
            # Логика для рабочего задания
            print(f"Рабочая задача {i} выполняется.")
    
        # Используем значение count для создания рабочих задач
        for i in range(count):
            worker(i)
    
    my_dag_instance = my_dag()
  2. Обратите внимание на использование XCom:
    В вашем первоначальном примере вы пытались использовать метод xcoms_pull, который не является необходимым при использовании TaskFlow API, так как возвращаемое значение из задачи автоматически помещается в XCom.

  3. Создание рабочих задач параллельно:
    При использовании подхода с циклом для создания рабочих задач — каждое worker(i) будет запускаться параллельно, независимо друг от друга, поскольку в Airflow каждая работа выполняется в контексте своей индивидуальной задачи.

  4. Не используйте xcoms_pull в текущем контексте:
    Как вы правильно заметили, XComArg объект (например, count) не имеет метода xcoms_pull. Это не является обязательным в вашем случае, так как вы уже работаете с возвращаемыми значениями между задачами прямым образом.

  5. Использование @task для worker:
    Убедитесь, что каждую из ваших рабочих задач вы также обернули в декоратор @task. Это важно для того, чтобы Airflow смог обрабатывать их корректно и автоматически управлять их зависимостями, выполняя их параллельно, как вы и задумывали.

Заключение

Учитывая вышесказанное, создание DAG в Airflow, где одна задача производит результат, используемый другой задачей, требует правильной настройки XCom через TaskFlow API. Таким образом, вы сможете динамически запускать нужное количество параллельных задач в зависимости от контекста данных, которые были переданы через XCom. Используя представленный формат кода, вы сможете эффективно решить поставленную задачу.

Оцените материал
Добавить комментарий

Капча загружается...