我们已经知道,用Ariflow做任务调度是十分方便的。然而,任何程序都免不了面临执行失败的问题。虽然有retry大法,但当retry也失败的话,还是需要人工干预。这时候,及时而全面的报警就显得相当重要。今天,就来介绍下Airflow中的报警机制。包括:
自带Email通知
相信大多数用过airflow的人对其自带的email alert功能不会陌生。只要一些简单的conf和code添加,就可以实现DAG失败时发送邮件至指定邮箱。需要两步:
-
添加smtp配置,有两种方法,conf文件和环境变量(env var),见: airflow config。如果两个地方都配置了的话,环境变量的的优先级会高于conf中的配置。
- 在
airflow.cfg
中添加smtp配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
[email] email_backend = airflow.utils.email.send_email_smtp [smtp] # If you want airflow to send emails on retries, failure, and you want to use # the airflow.utils.email.send_email_smtp function, you have to configure an # smtp server here smtp_host = localhost smtp_starttls = True smtp_ssl = False # Uncomment and set the user/pass settings if you want to use SMTP AUTH smtp_user = airflow smtp_password = airflow smtp_port = 25 smtp_mail_from = airflow@example.com
- 在环境变量(
env var
)中添加,格式为$AIRFLOW__{SECTION}__{KEY}
(注意中间是两个下划线)。类似:
1
export AIRFLOW__SMTP__SMTP_HOST=smtp.example.com
完成smtp配置后,可以在UI界面的Admin->Configuration中查到:
- 在
-
在DAG的
default_args
中配置DAG run failed时发送报警:1 2 3 4 5 6 7 8 9 10 11 12
args = { ... 'email': ["airflow@example.com"], 'email_on_failure': True, 'email_on_retry': False, ... } dag = DAG( dag_id = "dag_id", default_args = args, )
完成后,我们就可以在DAG执行失败是收到报警邮件了,邮件内容类似下面这样:
title: Airflow alert: <TaskInstance: dag_id.task_id 2020-04-02T03:59:29.527624+00:00 [failed]>
----
content:
Try 4 out of 1
Exception:
The conn_id `***` isn't defined
Log: Link
Host: airflow-worker-0.airflow-worker.airflow.svc.cluster.local
Log file: /usr/local/airflow/logs/airflow-worker-0/dag_id/task_id/2020-04-02T03:59:29.527624+00:00.log
Mark success: Link
自定义Email
然而,自带的邮件报警有一些局限性,当业务增多变得复杂时会不能满足我们定制化的需求。例如:
- 不能自定义title;
- 邮件内容相对简单,只有出错的exception message 信息;
- 收件人只能配置一次,所有DAG的报警会发给同样的邮箱。
其实,在Airflow中发送自定义的Email也十分方便。Airflow自带了一个email模块,只要调用里面的send_email
方法,传入自定义的收件人,标题,内容,附件,就可以实现邮件的定制化。
1
2
3
4
5
6
7
8
from airflow.utils.email import send_email
mail_to = "airflow@example.com"
subject = "This is alert email title"
content = "This is alert email content"
attachments = ["attach1.txt", "attach2.txt"]
send_email(mail_to, subject, content, attachments)
那么,我们在哪里调用这个send_email
呢?我们可以修改DAG的default_args参数,定义DAG失败是的callback函数:
1
2
3
4
5
6
7
8
9
args = {
...
'on_failure_callback': task_fail_alert,
...
}
def task_fail_alert(context):
...
send_email(mail_to, subject, content, attachments)
除了send_email
,Airflow还提供了一个send_email_smtp
方法,同样可以发邮件。两者的输入参数是一样的。那有什么区别呢,我们来看下源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def send_email(to, subject, html_content,
files=None, dryrun=False, cc=None, bcc=None,
mime_subtype='mixed', mime_charset='utf-8', **kwargs):
"""
Send email using backend specified in EMAIL_BACKEND.
"""
path, attr = conf.get('email', 'EMAIL_BACKEND').rsplit('.', 1)
module = importlib.import_module(path)
backend = getattr(module, attr)
to = get_email_address_list(to)
to = ", ".join(to)
return backend(to, subject, html_content, files=files,
dryrun=dryrun, cc=cc, bcc=bcc,
mime_subtype=mime_subtype, mime_charset=mime_charset, **kwargs)
def send_email_smtp(to, subject, html_content, files=None,
dryrun=False, cc=None, bcc=None,
mime_subtype='mixed', mime_charset='utf-8',
**kwargs):
"""
Send an email with html content
>>> send_email('test@example.com', 'foo', '<b>Foo</b> bar', ['/dev/null'], dryrun=True)
"""
smtp_mail_from = conf.get('smtp', 'SMTP_MAIL_FROM')
to = get_email_address_list(to)
msg = MIMEMultipart(mime_subtype)
msg['Subject'] = subject
msg['From'] = smtp_mail_from
msg['To'] = ", ".join(to)
recipients = to
#more code be omitted.
从源码可以看出,send_email调用了Airflow conf里email_backend
中指定的发邮件方法,一般不改的话默认值是airflow.utils.email.send_email_smtp
。所以默认配置中两者是等价的。如果觉得自带的send_email_smpt
方法不能满足需求的话,还可以自定义邮件发送方法,在email_backend
中指定就可以了。
消息发送至Slack
不过,对于邮箱中的邮件,我们其实经常不能及时看到。所以对于重要报警,更好的方式是能发到即时通信软件(例如Slack)中,方便我们立刻收到处理。Slack是类似于钉钉、RTX的一款优秀的企业通信工具。这里以Slack为例,介绍如何通过Slack及时接收DAG失败的消息。当然,根据需要,也可以换成其他的即时通信工具。
我们知道,Airflow贡献的Operator是十分丰富的。其中,也有用于Slack的operator可以选择:SlackWebhookOperator
, SlackAPIOperator
, SlackAPIPostOperator
。是不是有点多?先解释下它们的区别吧。
在目前为止最新的release版本(v1-10)code中:
SlackWebhookOperator
位于airflow/contrib/operators/slack_webhook_operator.py,继承于SimpleHttpOperator
。从位置可看出这是由第三方贡献的。SlackAPIOperator
和SlackAPIPostOperator
位于airflow/operators/slack_operator.py,前者继承于BaseOperator
,后者继承与前者。从位置可看出这个是airflow团队写的。
上面两类operator的功能是重复的,我们任选一种来用就可以了。
而在最新的master branch上,Airflow对文件的组织路径做了较大的调整,给所有第三方的providers建立了目录,把不同位置的相关code都统一放到了一块。例如,与Slack相关的code都统一挪到了airflow/providers/slack下,上面的两个slack operator的文件都位于airflow/providers/slack/operators下,文件的组织更为清晰。
下面,以SlackWebhookOperator
为例,介绍怎么向Slack发报警通知。attachments为一个json数组,定义见slack api。同样,也需要为slack建立一个connection。这里,我们把webhook_token存在password里。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def send_alert_to_slack(context):
slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
attachments = [
...
]
failed_alert = SlackWebhookOperator(
task_id='slack_notify',
http_conn_id=SLACK_CONN_ID,
webhook_token=slack_webhook_token,
attachments=attachments,
message="slack alert message",
username='Airflow-alert',
link_names=True,
channel=SLACK_CHANNEL
)
return failed_alert.execute(context=context)
完成后,就可以在对应的slack channel里收到报警了,效果如下(图片来自于slack官网):
与Opsgenie集成获取oncaller
经过上面几个步骤,已经可以成功的把报警同时发到邮箱和Slack中了。但是,发送到Slack channel里的消息,默认的Notifications设置是just @mensions
。也就是说,除非改了设置,对于channel里的没有@给你的消息Slack是不会向你推送的,需要你打开对应的channel才能看到。所以为了不错过报警,还需要发消息时@到具体的人。
那么@who
合适呢?相信大多数team都会有轮流oncall的机制,或是指定功能模块的负责人,我们只要能通过某种方法获取到这个oncaller或负责人,把消息@him/her
就可以了。 当然,你也可以用@channel
或@here
的方式通知所有人,但这样会产生很多垃圾信息,而且效果也不如指定处理人来的好。
这里以Opsgenie为例,介绍从Opsgenie获取oncaller,让其在Slack中及时收到DAG的报警。
1
2
3
4
5
6
7
8
9
10
11
12
13
#get oncaller by http request
curl_cmd = "curl -X GET 'https://api.opsgenie.com/v2/schedules/%s/on-calls?scheduleIdentifierType=name&flat=true' --header 'Authorization: GenieKey %s'" % (schedule_name, token)
oncall_json = subprocess.check_output(curl_cmd, shell=True).decode().strip()
oncall = (json.loads(oncall_json))["data"]["onCallRecipients"]
notify = "@"+" @".join(oncall)
failed_alert = SlackWebhookOperator(
task_id='slack_notify',
...
message="slack alert message %s" % notify,
link_names=True, #this filed shoud be true to link the name with @mention
...
)
到此为止,就打造了一个相对强大的报警通知机制。既不会漏报,也不会过多的打扰到不相干的人。
今天的内容就到这了,如果觉得对你有帮助的话,请关注我的微信号,让我们共同成长进步~
本文作者:Jessychen
版权声明:本博客所有文章除特别声明外,均采用CC-BY-NC-SA 4.0 Int'l许可协议
如需转载,烦请注明出处: