Динамически отображаемые задачи Airflow не выполняются последовательно.

Вопрос или проблема

Здравствуйте, у меня есть случай использования, когда мне нужно выполнить последовательность преобразований над коллекцией данных.
Я сгруппировал последовательность преобразований в группе задач, чтобы динамически сопоставить группу задач со списком данных, полученных из предыдущей задачи.

Вот фрагмент, похожий на то, что я пытаюсь достичь с помощью DAG, чтобы вы могли лучше понять.

with DAG(
    dag_id="sandbox",
    start_date=datetime(2024, 1, 1, 9),
    schedule_interval=None,
    catchup=False,
    max_active_runs=1,
    default_args={
    }
) as dag:
    @task
    def return_a_list():
        return [i for i in range(20)]
    
    @task_group
    def tg(n):
        @task
        def task1(n):
            print("task1 получил " + str(n) + " в " + str(datetime.now()))
            return n
        
        @task
        def task2(n):
            print("task2 получил " + str(n) + " в " + str(datetime.now()))

        task2(task1(n))

    nbrs = return_a_list()
    tg.expand(n=nbrs)

Проблема в том, что первая задача выполняется 20 раз (т.е. для всех nbrs), прежде чем вторая задача выполнится для всех nbrs, вот так (task1)x20 >> (task2)x20.

в то время как на самом деле я хочу, чтобы задачи выполнялись одна за другой для каждого nbr, поступающего из расширения, вот так (task1>>task2)x20.

Мой вопрос касается использования расширения, является ли это правильным способом достичь того, что я описал, или мне следует сгруппировать задачи в одной функции и вернуться к использованию цикла for вместо того, чтобы пытаться сделать это “по-воздушному”? Если так, что я делаю не так здесь?

Ответ или решение

Вопрос, который вы подняли, касается динамической конфигурации задач в Apache Airflow и их последовательности выполнения. Давайте рассмотрим вашу ситуацию более подробно.

Описание проблемы

У вас есть задача, которая возвращает список чисел от 0 до 19, после чего вы хотите выполнить серию трансформаций, сгруппированных в Task Group. Однако вы столкнулись с проблемой: все экземпляры первой задачи (task1) выполняются сначала, а затем все экземпляры второй задачи (task2). То есть, фактически имеется последовательность выполнения: (task1) x 20 >> (task2) x 20, а вам требуется изменить ее на (task1 >> task2) x 20.

Анализ кода

Ваш код создает задачи, используя динамическое отображение (mapping) в Airflow:

tg.expand(n=nbrs)

Это приводит к тому, что для каждого значения из nbrs создаются отдельные инстансы task1 и task2. Однако, несмотря на то что задачи связаны, Airflow интерпретирует их выполнение так, что все экземпляры task1 сначала запускаются параллельно, и только после их завершения запускаются все экземпляры task2.

Решение проблемы

Для достижения желаемой последовательности выполнения вам необходимо изменить структуру вашего кода для того, чтобы обеспечить зависимость между задачами. Вы можете сделать это следующим образом:

  1. Создайте отдельные задачи для каждого значения n внутри цикла for.
  2. Используйте вызов task_group в цикле для создания правильной цепочки зависимостей.

Пример:

from airflow import DAG
from airflow.decorators import task, task_group
from datetime import datetime

with DAG(
    dag_id="sandbox",
    start_date=datetime(2024, 1, 1, 9),
    schedule_interval=None,
    catchup=False,
    max_active_runs=1,
    default_args={}
) as dag:

    @task
    def return_a_list():
        return [i for i in range(20)]

    @task_group
    def tg(n):
        @task
        def task1(n):
            print("task1 received " + str(n) + " at " + str(datetime.now()))
            return n

        @task
        def task2(n):
            print("task2 received " + str(n) + " at " + str(datetime.now()))

        first_task = task1(n)
        task2(first_task)

    nbrs = return_a_list()

    for number in nbrs:
        tg(number)

Объяснение

В этом примере мы создали цикл, который перебирает все числа из списка nbrs и для каждого числа создает новые задачи в Task Group tg. Это гарантирует, что на каждую трансформацию будет выполняться соответствующая последующая трансформация:

  • Задача task1 для n выполняется>>task2для того жеn`.

Заключение

Теперь ваши задачи будут организованы так, что для каждого числового значения будет выполнено последовательно: сначала task1, затем task2. Этот подход обеспечивает правильную последовательность выполнения задач в Apache Airflow, следуя вашим требованиям. Старайтесь избегать использования динамического отображения объема (mapping) без выполнения логики последовательной зависимости, так как это может привести к необратимым последствиям в планировании задач.

Оцените материал
Добавить комментарий

Капча загружается...