Вопрос или проблема
Я использую Airflow 2.10.2. Я определяю DAG, который в основном выполняет 2 задачи.
- Некоторую основную логику, которая создает кучу файлов
- Запускает параллельно (порядок не важен) набор ‘рабочих’ задач
Поскольку первый шаг требует высокой вычислительной мощности, я определяю его как задачу, которая возвращает количество созданных файлов. Это число важно, потому что мне нужно запустить столько рабочих задач, сколько создано файлов. Мой код выглядит примерно так:
@dag
def my_dag:
@task
def create_files():
...
return count
count = create_files()
for i in range(count):
worker.override()()
my_dag()
Я не понимаю, как мне получить значение count в определении моего DAG. Я, вероятно, просто не понял, как работают Xcoms (так как я в основном полагался на API TaskFlow), но любая помощь будет оценена.
Я пытался получить значение с помощью xcoms_pull
:
count.xcoms_pull(task_ids="create_files")
count['ti'].xcoms_pull(task_ids="create_files")
И без xcoms_pull:
count['ti']
Но ни один из них не работает, поскольку XComs определены для межзадачной коммуникации.
Сообщение об ошибке:
AttributeError: 'XComArg' объект count не имеет атрибута 'xcoms_pull'
.
Теперь я увидел, что XComs – это просто специальный случай переменных, но в основном нет документации об этом (по крайней мере, я не нашел).
Ответ или решение
Для решения вашей задачи в Apache Airflow 2.10.2, касающейся возврата результата задачи и его использования в DAG (Directed Acyclic Graph), вы правильно обратили внимание на важность управления XCom. XCom (от "Cross Communication") используется для передачи данных между задачами. Давайте разберем, как можно корректно передать количество созданных файлов (значение count
), и запустить параллельные рабочие задачи (worker tasks) на основе этого значения.
Шаги для получения результата задачи и использования его в DAG
-
Определение задачи создания файлов:
Ваша задачаcreate_files
должна возвращать количество созданных файлов. Это можно сделать с использованием декоратора@task
, который автоматически создает XCom. Пример вашего кода будет выглядеть следующим образом:from airflow.decorators import dag, task from datetime import datetime @dag(schedule_interval='@daily', start_date=datetime(2023, 1, 1), catchup=False) def my_dag(): @task def create_files(): # Логика создания файлов count = ... # предположим, что здесь вы делаете что-то и устанавливаете count return count count = create_files() # Теперь мы можем использовать count для создания рабочих задач @task def worker(i): # Логика для рабочего задания print(f"Рабочая задача {i} выполняется.") # Используем значение count для создания рабочих задач for i in range(count): worker(i) my_dag_instance = my_dag()
-
Обратите внимание на использование XCom:
В вашем первоначальном примере вы пытались использовать методxcoms_pull
, который не является необходимым при использовании TaskFlow API, так как возвращаемое значение из задачи автоматически помещается в XCom. -
Создание рабочих задач параллельно:
При использовании подхода с циклом для создания рабочих задач — каждоеworker(i)
будет запускаться параллельно, независимо друг от друга, поскольку в Airflow каждая работа выполняется в контексте своей индивидуальной задачи. -
Не используйте
xcoms_pull
в текущем контексте:
Как вы правильно заметили,XComArg
объект (например,count
) не имеет методаxcoms_pull
. Это не является обязательным в вашем случае, так как вы уже работаете с возвращаемыми значениями между задачами прямым образом. -
Использование
@task
для worker:
Убедитесь, что каждую из ваших рабочих задач вы также обернули в декоратор@task
. Это важно для того, чтобы Airflow смог обрабатывать их корректно и автоматически управлять их зависимостями, выполняя их параллельно, как вы и задумывали.
Заключение
Учитывая вышесказанное, создание DAG в Airflow, где одна задача производит результат, используемый другой задачей, требует правильной настройки XCom через TaskFlow API. Таким образом, вы сможете динамически запускать нужное количество параллельных задач в зависимости от контекста данных, которые были переданы через XCom. Используя представленный формат кода, вы сможете эффективно решить поставленную задачу.