Airflow中自定义报警机制及对Slack和Opsgenie集成

Custom alert mechanism in Airflow with Slack and Opsgenie

Posted by jessychen on April 17, 2020

我们已经知道,用Ariflow做任务调度是十分方便的。然而,任何程序都免不了面临执行失败的问题。虽然有retry大法,但当retry也失败的话,还是需要人工干预。这时候,及时而全面的报警就显得相当重要。今天,就来介绍下Airflow中的报警机制。包括:

  1. 自带Email通知
  2. 自定义Email
  3. 消息发送至Slack
  4. 与Opsgenie集成获取oncaller

自带Email通知

相信大多数用过airflow的人对其自带的email alert功能不会陌生。只要一些简单的conf和code添加,就可以实现DAG失败时发送邮件至指定邮箱。需要两步:

  1. 添加smtp配置,有两种方法,conf文件和环境变量(env var),见: airflow config。如果两个地方都配置了的话,环境变量的的优先级会高于conf中的配置。

    • airflow.cfg中添加smtp配置:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    
    [email]
    email_backend = airflow.utils.email.send_email_smtp
       
    [smtp]
    # If you want airflow to send emails on retries, failure, and you want to use
    # the airflow.utils.email.send_email_smtp function, you have to configure an
    # smtp server here
    smtp_host = localhost
    smtp_starttls = True
    smtp_ssl = False
    # Uncomment and set the user/pass settings if you want to use SMTP AUTH
    smtp_user = airflow
    smtp_password = airflow
    smtp_port = 25
    smtp_mail_from = airflow@example.com
    
    • 在环境变量(env var)中添加,格式为$AIRFLOW__{SECTION}__{KEY}(注意中间是两个下划线)。类似:
    1
    
    export AIRFLOW__SMTP__SMTP_HOST=smtp.example.com
    

    完成smtp配置后,可以在UI界面的Admin->Configuration中查到:

    image-20200416162741711

  2. 在DAG的default_args中配置DAG run failed时发送报警:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    
    args = {
        ...
        'email': ["airflow@example.com"],
        'email_on_failure': True,
        'email_on_retry': False,
        ...
    }
       
    dag = DAG(
        dag_id = "dag_id",
        default_args = args,
    )
    

完成后,我们就可以在DAG执行失败是收到报警邮件了,邮件内容类似下面这样:

title: Airflow alert: <TaskInstance: dag_id.task_id 2020-04-02T03:59:29.527624+00:00 [failed]>
----
content:
Try 4 out of 1
Exception:
The conn_id `***` isn't defined
Log: Link
Host: airflow-worker-0.airflow-worker.airflow.svc.cluster.local
Log file: /usr/local/airflow/logs/airflow-worker-0/dag_id/task_id/2020-04-02T03:59:29.527624+00:00.log
Mark success: Link

自定义Email

然而,自带的邮件报警有一些局限性,当业务增多变得复杂时会不能满足我们定制化的需求。例如:

  • 不能自定义title;
  • 邮件内容相对简单,只有出错的exception message 信息;
  • 收件人只能配置一次,所有DAG的报警会发给同样的邮箱。

其实,在Airflow中发送自定义的Email也十分方便。Airflow自带了一个email模块,只要调用里面的send_email方法,传入自定义的收件人,标题,内容,附件,就可以实现邮件的定制化。

1
2
3
4
5
6
7
8
from airflow.utils.email import send_email

mail_to = "airflow@example.com"
subject = "This is alert email title"
content = "This is alert email content"
attachments = ["attach1.txt", "attach2.txt"]
send_email(mail_to, subject, content, attachments)

那么,我们在哪里调用这个send_email呢?我们可以修改DAG的default_args参数,定义DAG失败是的callback函数:

1
2
3
4
5
6
7
8
9
args = {
    ...
    'on_failure_callback': task_fail_alert,
    ...
}

def task_fail_alert(context):
  ...
  send_email(mail_to, subject, content, attachments)

除了send_email,Airflow还提供了一个send_email_smtp方法,同样可以发邮件。两者的输入参数是一样的。那有什么区别呢,我们来看下源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def send_email(to, subject, html_content,
               files=None, dryrun=False, cc=None, bcc=None,
               mime_subtype='mixed', mime_charset='utf-8', **kwargs):
    """
    Send email using backend specified in EMAIL_BACKEND.
    """
    path, attr = conf.get('email', 'EMAIL_BACKEND').rsplit('.', 1)
    module = importlib.import_module(path)
    backend = getattr(module, attr)
    to = get_email_address_list(to)
    to = ", ".join(to)

    return backend(to, subject, html_content, files=files,
                   dryrun=dryrun, cc=cc, bcc=bcc,
                   mime_subtype=mime_subtype, mime_charset=mime_charset, **kwargs)
  
def send_email_smtp(to, subject, html_content, files=None,
                    dryrun=False, cc=None, bcc=None,
                    mime_subtype='mixed', mime_charset='utf-8',
                    **kwargs):
    """
    Send an email with html content
    >>> send_email('test@example.com', 'foo', '<b>Foo</b> bar', ['/dev/null'], dryrun=True)
    """
    smtp_mail_from = conf.get('smtp', 'SMTP_MAIL_FROM')

    to = get_email_address_list(to)

    msg = MIMEMultipart(mime_subtype)
    msg['Subject'] = subject
    msg['From'] = smtp_mail_from
    msg['To'] = ", ".join(to)
    recipients = to
    #more code be omitted.

从源码可以看出,send_email调用了Airflow conf里email_backend中指定的发邮件方法,一般不改的话默认值是airflow.utils.email.send_email_smtp。所以默认配置中两者是等价的。如果觉得自带的send_email_smpt方法不能满足需求的话,还可以自定义邮件发送方法,在email_backend中指定就可以了。

消息发送至Slack

不过,对于邮箱中的邮件,我们其实经常不能及时看到。所以对于重要报警,更好的方式是能发到即时通信软件(例如Slack)中,方便我们立刻收到处理。Slack是类似于钉钉、RTX的一款优秀的企业通信工具。这里以Slack为例,介绍如何通过Slack及时接收DAG失败的消息。当然,根据需要,也可以换成其他的即时通信工具。

我们知道,Airflow贡献的Operator是十分丰富的。其中,也有用于Slack的operator可以选择:SlackWebhookOperator, SlackAPIOperator, SlackAPIPostOperator。是不是有点多?先解释下它们的区别吧。

在目前为止最新的release版本(v1-10)code中:

  • SlackWebhookOperator 位于airflow/contrib/operators/slack_webhook_operator.py,继承于SimpleHttpOperator。从位置可看出这是由第三方贡献的。
  • SlackAPIOperatorSlackAPIPostOperator位于airflow/operators/slack_operator.py,前者继承于BaseOperator,后者继承与前者。从位置可看出这个是airflow团队写的。

上面两类operator的功能是重复的,我们任选一种来用就可以了。

而在最新的master branch上,Airflow对文件的组织路径做了较大的调整,给所有第三方的providers建立了目录,把不同位置的相关code都统一放到了一块。例如,与Slack相关的code都统一挪到了airflow/providers/slack下,上面的两个slack operator的文件都位于airflow/providers/slack/operators下,文件的组织更为清晰。

下面,以SlackWebhookOperator为例,介绍怎么向Slack发报警通知。attachments为一个json数组,定义见slack api。同样,也需要为slack建立一个connection。这里,我们把webhook_token存在password里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def send_alert_to_slack(context):
    slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
    attachments = [
      ...
    ]
    failed_alert = SlackWebhookOperator(
        task_id='slack_notify',
        http_conn_id=SLACK_CONN_ID,
        webhook_token=slack_webhook_token,
        attachments=attachments,
        message="slack alert message",
        username='Airflow-alert',
        link_names=True,
        channel=SLACK_CHANNEL
    )
    return failed_alert.execute(context=context)

完成后,就可以在对应的slack channel里收到报警了,效果如下(图片来自于slack官网):

image-20200416202732049

与Opsgenie集成获取oncaller

经过上面几个步骤,已经可以成功的把报警同时发到邮箱和Slack中了。但是,发送到Slack channel里的消息,默认的Notifications设置是just @mensions。也就是说,除非改了设置,对于channel里的没有@给你的消息Slack是不会向你推送的,需要你打开对应的channel才能看到。所以为了不错过报警,还需要发消息时@到具体的人。

那么@who 合适呢?相信大多数team都会有轮流oncall的机制,或是指定功能模块的负责人,我们只要能通过某种方法获取到这个oncaller或负责人,把消息@him/her就可以了。 当然,你也可以用@channel@here的方式通知所有人,但这样会产生很多垃圾信息,而且效果也不如指定处理人来的好。

这里以Opsgenie为例,介绍从Opsgenie获取oncaller,让其在Slack中及时收到DAG的报警。

1
2
3
4
5
6
7
8
9
10
11
12
13
#get oncaller by http request
curl_cmd = "curl -X GET  'https://api.opsgenie.com/v2/schedules/%s/on-calls?scheduleIdentifierType=name&flat=true' --header 'Authorization: GenieKey %s'" % (schedule_name, token)
oncall_json = subprocess.check_output(curl_cmd, shell=True).decode().strip()
oncall = (json.loads(oncall_json))["data"]["onCallRecipients"]
notify = "@"+" @".join(oncall)

failed_alert = SlackWebhookOperator(
    task_id='slack_notify',
    ...
    message="slack alert message %s" % notify,
    link_names=True,  #this filed shoud be true to link the name with @mention
    ...
)

到此为止,就打造了一个相对强大的报警通知机制。既不会漏报,也不会过多的打扰到不相干的人。


今天的内容就到这了,如果觉得对你有帮助的话,请关注我的微信号,让我们共同成长进步~


本文作者:Jessychen
版权声明:本博客所有文章除特别声明外,均采用CC-BY-NC-SA 4.0 Int'l许可协议
如需转载,烦请注明出处: