Есть ли способ запустить группу заданий с активами в Dagster после завершения другой группы?

Вопрос или проблема

Есть ли способ запустить группу задач активов в Dagster после завершения другой группы?

Для контекста, у меня есть две группы активов (foo и bar).

foo = AssetSelection.groups("foo")
bar = AssetSelection.groups("bar")

И две задачи, основанные на этих группах:

foo_pipeline = define_asset_job(
    name="foo_pipeline", selection=foo
)

bar_pipeline = define_asset_job(
    name="bar_pipeline", selection=bar
)

А также один график

foo_schedule = ScheduleDefinition(
    job=foo_pipeline,
    cron_schedule="30 9 * * MON-FRI",
    execution_timezone="America/Chicago",
    default_status=DefaultScheduleStatus.RUNNING,
)

Исходя из https://github.com/dagster-io/dagster/discussions/7160, я попытался сделать следующее, но безуспешно.

@sensor(job=bar)
def thing_sensor():
    run_records = context.instance.get_run_records(
        filters=RunsFilter(
            job_name=foo,
            statuses=[DagsterRunStatus.SUCCESS],
            # updated_after=..., # можно также фильтровать по времени, чтобы более эффективно получать данные
        ),
        order_by="update_timestamp",
        ascending=False,
    )
    for run_record in run_records:
        yield RunRequest(
            run_key=run_record.dagster_run.run_id,  # избегаем повторного запуска для одной и той же задачи
        )

thing_sensor()

Тем не менее, это не работает. Никаких ошибок не возникает при выполнении dagster dev -m projects, но когда я смотрю в UI Dagster, ни моя задача foo, ни сенсор foo не были определены.

Когда я захожу в iPython, я вижу:

In [74]: x = thing_sensor()
In [75]: [print(i) for i in x]
Cell In[75], line 1, in <listcomp>(.0)
----> 1 [print(i) for i in x]

Cell In[66], line 3, in thing_sensor()
      1 @sensor(job=internal_reporting_pipeline)
      2 def thing_sensor():
----> 3     run_records = context.instance.get_run_records(
      4         filters=RunsFilter(
      5             job_name=partner_reporting_pipeline,
      6             statuses=[DagsterRunStatus.SUCCESS],
      7             # updated_after=..., # можно также фильтровать по времени, чтобы более эффективно получать данные
      8         ),
      9         order_by="update_timestamp",
     10         ascending=False,
     11     )
     12     for run_record in run_records:
     13         yield RunRequest(
     14             run_key=run_record.dagster_run.run_id,  # избегаем повторного запуска для одной и той же задачи
     15         )

NameError: name 'context' is not defined

Я уверен, что неправильно понимаю ссылку на Github, но не знаю, как именно. В конечном итоге, я просто хочу запустить одну группу задач после завершения другой, но мне трудно разобраться в документации Dagster.

Есть ли способ запустить одну группу задач после завершения другой? Можете ли вы поделиться примером или ссылкой на код, если да?

Ответ или решение

Да, в Dagster есть способ запускать одну группу задач после завершения другой группы. Вы можете использовать сенсоры, чтобы реагировать на завершение задач из первой группы и инициировать задачи из второй группы.

Ваша проблема в том, что вы пытаетесь получить доступ к контексту внутри функции сенсора, но на самом деле вам нужно использовать его правильным образом. Позвольте мне предложить вам пример, чтобы вы могли выполнить вашу задачу.

Вот как вы можете это сделать:

  1. Определите ваши группы активов и соответствующие задачи:
from dagster import AssetSelection, define_asset_job, ScheduleDefinition, sensor, RunRequest
from dagster import DagsterRunStatus

foo = AssetSelection.groups("foo")
bar = AssetSelection.groups("bar")

foo_pipeline = define_asset_job(
    name="foo_pipeline", selection=foo
)

bar_pipeline = define_asset_job(
    name="bar_pipeline", selection=bar
)
  1. Настройте расписание для первой группы задач foo:
foo_schedule = ScheduleDefinition(
    job=foo_pipeline,
    cron_schedule="30 9 * * MON-FRI",
    execution_timezone="America/Chicago",
    default_status=DefaultScheduleStatus.RUNNING,
)
  1. Создайте сенсор, который будет отслеживать завершение группы foo_pipeline и запускать bar_pipeline:
from dagster import sensor, RunRequest, SensorEvaluationContext

@sensor(job=bar_pipeline)
def foo_completion_sensor(context: SensorEvaluationContext):
    # Получаем записи о выполнении для задачи foo_pipeline
    run_records = context.instance.get_run_records(
        filters=RunsFilter(
            job_name="foo_pipeline",
            statuses=[DagsterRunStatus.SUCCESS],
        ),
        order_by="update_timestamp",
        ascending=False,
    )

    # Если самая последняя успешная задача была завершена, запускаем bar_pipeline
    for run_record in run_records:
        yield RunRequest(
            run_key=run_record.dagster_run.run_id,  # предотвращаем повторный запуск для одного и того же run_id
        )
  1. Убедитесь, что вы зарегистрировали сенсор и задачи в Dagster. Если вы используете dagster dev, все должно работать, и вы сможете увидеть сенсоры и задания в интерфейсе Dagster.

Обратите внимание на следующие моменты:

  • Используйте context правильно – он передается в вашу функцию сенсора.
  • Вы можете добавить фильтры по времени или другим критериям для повышения эффективности.

Теперь, каждый раз, когда задача foo_pipeline успешно завершается, ваш сенсор сработает и инициирует выполнение bar_pipeline.

Если у вас есть какие-либо дополнительные вопросы или возникают ошибки, чувствуйтесь свободно обратиться за помощью.

Оцените материал
Добавить комментарий

Капча загружается...