Вопрос или проблема
Здравствуйте, у меня есть случай использования, когда мне нужно выполнить последовательность преобразований над коллекцией данных.
Я сгруппировал последовательность преобразований в группе задач, чтобы динамически сопоставить группу задач со списком данных, полученных из предыдущей задачи.
Вот фрагмент, похожий на то, что я пытаюсь достичь с помощью DAG, чтобы вы могли лучше понять.
with DAG(
dag_id="sandbox",
start_date=datetime(2024, 1, 1, 9),
schedule_interval=None,
catchup=False,
max_active_runs=1,
default_args={
}
) as dag:
@task
def return_a_list():
return [i for i in range(20)]
@task_group
def tg(n):
@task
def task1(n):
print("task1 получил " + str(n) + " в " + str(datetime.now()))
return n
@task
def task2(n):
print("task2 получил " + str(n) + " в " + str(datetime.now()))
task2(task1(n))
nbrs = return_a_list()
tg.expand(n=nbrs)
Проблема в том, что первая задача выполняется 20 раз (т.е. для всех nbrs), прежде чем вторая задача выполнится для всех nbrs, вот так (task1)x20 >> (task2)x20.
в то время как на самом деле я хочу, чтобы задачи выполнялись одна за другой для каждого nbr, поступающего из расширения, вот так (task1>>task2)x20.
Мой вопрос касается использования расширения, является ли это правильным способом достичь того, что я описал, или мне следует сгруппировать задачи в одной функции и вернуться к использованию цикла for вместо того, чтобы пытаться сделать это “по-воздушному”? Если так, что я делаю не так здесь?
Ответ или решение
Вопрос, который вы подняли, касается динамической конфигурации задач в Apache Airflow и их последовательности выполнения. Давайте рассмотрим вашу ситуацию более подробно.
Описание проблемы
У вас есть задача, которая возвращает список чисел от 0 до 19, после чего вы хотите выполнить серию трансформаций, сгруппированных в Task Group. Однако вы столкнулись с проблемой: все экземпляры первой задачи (task1
) выполняются сначала, а затем все экземпляры второй задачи (task2
). То есть, фактически имеется последовательность выполнения: (task1) x 20 >> (task2) x 20
, а вам требуется изменить ее на (task1 >> task2) x 20
.
Анализ кода
Ваш код создает задачи, используя динамическое отображение (mapping) в Airflow:
tg.expand(n=nbrs)
Это приводит к тому, что для каждого значения из nbrs
создаются отдельные инстансы task1
и task2
. Однако, несмотря на то что задачи связаны, Airflow интерпретирует их выполнение так, что все экземпляры task1
сначала запускаются параллельно, и только после их завершения запускаются все экземпляры task2
.
Решение проблемы
Для достижения желаемой последовательности выполнения вам необходимо изменить структуру вашего кода для того, чтобы обеспечить зависимость между задачами. Вы можете сделать это следующим образом:
- Создайте отдельные задачи для каждого значения
n
внутри циклаfor
. - Используйте вызов
task_group
в цикле для создания правильной цепочки зависимостей.
Пример:
from airflow import DAG
from airflow.decorators import task, task_group
from datetime import datetime
with DAG(
dag_id="sandbox",
start_date=datetime(2024, 1, 1, 9),
schedule_interval=None,
catchup=False,
max_active_runs=1,
default_args={}
) as dag:
@task
def return_a_list():
return [i for i in range(20)]
@task_group
def tg(n):
@task
def task1(n):
print("task1 received " + str(n) + " at " + str(datetime.now()))
return n
@task
def task2(n):
print("task2 received " + str(n) + " at " + str(datetime.now()))
first_task = task1(n)
task2(first_task)
nbrs = return_a_list()
for number in nbrs:
tg(number)
Объяснение
В этом примере мы создали цикл, который перебирает все числа из списка nbrs
и для каждого числа создает новые задачи в Task Group tg
. Это гарантирует, что на каждую трансформацию будет выполняться соответствующая последующая трансформация:
- Задача
task1
дляn
выполняется>>
task2для того же
n`.
Заключение
Теперь ваши задачи будут организованы так, что для каждого числового значения будет выполнено последовательно: сначала task1
, затем task2
. Этот подход обеспечивает правильную последовательность выполнения задач в Apache Airflow, следуя вашим требованиям. Старайтесь избегать использования динамического отображения объема (mapping) без выполнения логики последовательной зависимости, так как это может привести к необратимым последствиям в планировании задач.