Вопрос или проблема
У меня есть dag Airflow с функцией on_failure_callback. Но функция on_failure_callback не запускается при сбое, и я не вижу логов.
Вот код моего dag.
Все @tasks в этом коде работают, нет необходимости показывать специальную функцию сбоев. Я работаю с экземпляром задачи, добавляя **kwargs в функции (другие варианты не работают в моем случае).
Почему send_message_on_dag_fail не работает, когда dag потерпел неудачу?
Я добавил тот же код в задачу get_start_end_dates, просто чтобы убедиться, что функция send_message_on_dag_fail работает.
Airflow 2.8.2, Python 3.11.8.
import pandas as pd
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import pytz
import httpx
from airflow.decorators import dag, task
from airflow.exceptions import AirflowSkipException
from airflow.models import Variable
def get_current_period(date: datetime.date = None):
tz = pytz.timezone('Europe/Moscow')
if date:
now = date.date()
else:
now = datetime.now(tz).date()
if now.day <= 15:
start_date = (now - relativedelta(months=1)).replace(day=16)
end_date = now.replace(day=1) - relativedelta(days=1)
else:
start_date = now.replace(day=1)
end_date = now.replace(day=15)
return str(start_date), str(end_date)
def send_msg(bot_token: str, chat_id: str, message: str, type:str="message" or 'code'):
if type == 'message':
url = f'https://api.telegram.org/bot{bot_token}/sendMessage?chat_id={chat_id}&text={message}'
client = httpx.Client(base_url="https://")
return client.post(url)
elif type == 'code':
url = f'https://api.telegram.org/bot{bot_token}/sendMessage'
params = {
'chat_id': chat_id,
'text': message,
'parse_mode': 'Markdown'
}
client = httpx.Client(base_url="https://")
return client.post(url, params=params)
def get_xcom_from_context(context, task_id: str, dict_key:str = False):
if dict_key:
xcom = context['ti'].xcom_pull(task_ids=task_id)[dict_key]
else:
xcom = context['ti'].xcom_pull(task_ids=task_id)
return xcom
default_args = {
'owner': 'user',
'depends_on_past': False,
'retries': 1,
'retry_delay': timedelta(minutes=1),
'start_date': datetime(2024, 9, 19)
}
host = Variable.get('host')
database_name = Variable.get('database_name')
user_name = Variable.get('user_name')
password_for_db = Variable.get('password_for_db')
server_host_name = Variable.get('server_host_name')
bearer_key = Variable.get('bearer_key')
user_key = Variable.get('user_key')
sales_plans_url = Variable.get('sales_plans_url')
specialization_prices_url = Variable.get('specialization_prices_url')
bot_token = Variable.get('bot_token')
chat_id = Variable.get('chat_id')
def send_message_on_dag_fail(bot_token = bot_token, chat_id = chat_id, **kwargs):
context = kwargs
log = context['ti'].log
log.error('DAG FINISHED WITH ERROR __________________') # этот текст ошибки легче найти
task_id = context['ti'].task_id
dag_id = context['dag'].dag_id
message = f"Task {task_id} из Dag {dag_id} потерпел неудачу."
log.error(message)
send_msg(bot_token, chat_id, message, 'message')
@dag(default_args=default_args, schedule_interval=None, catchup=False, concurrency=4, on_failure_callback=send_message_on_dag_fail)
def dag_get_bonus_and_penaltys_for_staff():
@task
def check_time():
tz = pytz.timezone('Europe/Moscow')
current_time = datetime.now(tz).time()
if current_time >= datetime.strptime("00:00", "%H:%M").time() and current_time <= datetime.strptime("01:00", "%H:%M").time():
return False
else:
return True
@task
def get_start_end_dates(bot_token = bot_token, chat_id = chat_id,**kwargs):
context = kwargs
log = context['ti'].log
check_time = get_xcom_from_context(context, 'check_time')
log.info('Объекты Xcom извлечены из контекста')
if check_time:
start_date, end_date = get_current_period()
result = {
'start_date': start_date
, 'end_date': end_date
}
task_id = context['ti'].task_id
dag_id = context['dag'].dag_id
message = f"TASK {task_id} DAG {dag_id}."
log.error(message)
send_msg(bot_token, chat_id, message, 'message')
return result
else:
raise AirflowSkipException("Время для очистки базы данных, пропустить выполнение DAG.")
...
check_time_task = check_time()
get_start_end_dates_task = get_start_end_dates()
check_time_task >> get_start_end_dates_task >> ...
dag_get_bonus_and_penaltys_for_staff = dag_get_bonus_and_penaltys_for_staff()
Я попытался переписать send_message_on_dag_fail только с контекстом в качестве входных данных, теперь это работает.
Я не знаю, почему, потому что когда я пытался работать с контекстом в задаче с контекстом (не **kwargs), я потерпел неудачу.
Есть идеи, почему это работает таким образом?
def send_message_on_dag_fail(context):
log = context['ti'].log
log.error('DAG FINISHED WITH ERROR __________________') # этот текст ошибки легче найти
task_id = context['ti'].task_id
dag_id = context['dag'].dag_id
message = f"Task {task_id} из Dag {dag_id} потерпел неудачу."
log.error(message)
send_msg(bot_token, chat_id, message, 'message')
Ответ или решение
Вопрос, который вы подняли, связан с тем, почему функция on_failure_callback
для вашего DAG в Apache Airflow не срабатывает при ошибках в задачах. Это может происходить по нескольким причинам, и мы постараемся их разобрать, чтобы понять, в чем именно заключается проблема и как ее решить.
1. Подход к передаче аргументов
Ваше первое упоминание о том, что функция send_message_on_dag_fail
не работала должным образом, связано с тем, как вы подавали аргументы. Ваша изначально реализованная версия функции принимала параметр **kwargs
, но при вызове колбеков Airflow передается контекст выполнения, который не является непосредственно kwargs
, что может вызвать путаницу.
Ваша переработанная версия функции, которая принимает только context
, действительно сработала, так как она получает стандартный объект контекста от Airflow, который содержит всю необходимую информацию о текущем состоянии задачи и DAG. Это позволяет вам без проблем получать такие данные, как task_id
, dag_id
и доступ к логам.
2. Структура DAG
Ваша конструкция DAG и задачи выглядит корректной, и на первый взгляд не должно быть проблем с выполнением колбеков при ошибках. Тем не менее, важно отметить, что колбеки, заданные для DAG, срабатывают при завершении самого DAG, а не отдельных задач. Если у вас есть логика потока задач, в которой ошибка в одной задаче не предотвращает завершение всего DAG (например, благодаря обработчикам исключений), это может повлиять на вызов колбека.
3. Справочные данные и переменные окружения
Ваша функция send_message_on_dag_fail
использует переменные, такие как bot_token
и chat_id
, которые вы извлекаете из Airflow Variable. Убедитесь, что эти переменные успешно загружены и доступны в контексте вашего DAG. На случай, если они недоступны при вызове функции, это также может помешать отправке сообщений.
4. Логи и отладка
Как вы отметили, вы не видели логов с сообщением об ошибке. Если ваша функция колбека не срабатывает, возможно, стоит условий для ведения логов и отслеживания правильности выполнения. Убедитесь, что в конфигурации Airflow правильно настроены уровни логирования, это поможет устранить сомнения о том, выполняется ли функция или нет.
5. Версии и документация
Вы используете Airflow версии 2.8.2, поэтому убедитесь, что вы ознакомлены с возможными изменениями в API, которые могли произойти между версиями. Возможно, в более поздних версиях были внесены обновления в архитектуру колбеков или другие критические компоненты.
Заключение
Согласно вашему описанию, функция send_message_on_dag_fail
сработала, когда вы изменили ее на более простой вариант с одним параметром — контекстом. Это идеально демонстрирует, что правильный способ передачи аргументов критически важен в контексте Airflow. Убедитесь, что вы всегда принимаете контекст, который предоставляет Airflow, и проверяйте, чтобы все используемые переменные были доступны и корректны.
Рекомендуется продолжать мониторинг задач, чтобы гарантировать, что все колбеки работают как задумано, и возможно попрактикуйтесь с округлением кода, чтобы повысить его читаемость и поддержку в будущем. Если возникнут дополнительные вопросы, не стесняйтесь обращаться!