Airflow中利用xcom实现task间信息传递

Using XCOM to realize data transfer between airflow tasks

Posted by jessychen on April 6, 2020

XComs let tasks exchange messages, allowing more nuanced forms of control and shared state. The name is an abbreviation of “cross-communication”.

Airflow作为一款优秀的任务调度系统,必然需要解决一个重要问题:如何在多个调度任务中实现信息共享?即怎么把task A执行得到的结果传递给task B,让task B可以基于task A的结果进行后续操作。XCom就是给出的答案。

基本原理

XCom是”cross-communication”的缩写。它被设计于用来在Airflow 各个task间进行数据共享。XCom属于Airflow的model,见airflow.models.xcom.XCom,包含下面几个字段:

1
2
3
4
5
6
7
8
__tablename__ = "xcom"
key = Column(String(512, **COLLATION_ARGS), primary_key=True)
value = Column(LargeBinary)
timestamp = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
execution_date = Column(UtcDateTime, primary_key=True)
# source information
task_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
dag_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)

从上述code定义可以了解到,XCom里的内容其实是存在Airflow DB中的xcom这张表里的。如果配置了Airflow UI,可以很方面的查到当前XCom里存的所以内容(入口位于Admin->XComs)

image-20200406113247869

表里的字段与model中的定义一一对应,很好理解。其中execution_date是task开始执行的时间,timestamp默认是这条记录生成的时间。图上的XCom存的是对应的task的返回值(key=”return_value”, value=”Success“)。如果你的task定义了其他类型的返回值,也可以通过return_value这个key来获取。

如此,显而易见,XCom的本质就是把task需要传递的信息以KV的形式存到DB中,而其他task则可以从DB中获取。由于XCom是存在DB而不是内存中,这也说明了对于已经执行完的DAG,如果重跑其中某个task的话依然可以获取到同次DAG运行时其他task传递的内容。

看到这里,发现什么问题没?没错,由于XCom不会自己清理,所以DB里的数据会越来越多🙀。这里推荐在每次DAG执行success的最后加一个clear的操作,可以通过在DAG中加一个on_success_callback来实现,参考code如下,请按实际情况调整filter条件:

1
2
3
4
5
6
7
8
9
10
11
12
from airflow.models import DAG
from airflow.utils.db import provide_session
from airflow.models import XCom

@provide_session
def cleanup_xcom(context, session=None):
    dag_id = context['ti']['dag_id']
    session.query(XCom).filter(XCom.dag_id == dag_id).delete()

dag = DAG( ...
    on_success_callback=cleanup_xcom,
)

Push和Pull

下面来讲讲如何往XCom中添加和获取记录。上面已经说了,XCom存储的是KV形式的数据对,Airflow包装了xcom_pushxcom_pull两个方法,可以很方便的进行存取,定义在taskinstance.py中:source

1
2
3
4
5
6
7
8
9
10
11
12
def xcom_push(
            self,
            key: str,
            value: Any,
            execution_date: Optional[datetime] = None) -> None:
  
def xcom_pull(
            self,
            task_ids: Optional[Union[str, Iterable[str]]] = None,
            dag_id: Optional[str] = None,
            key: str = XCOM_RETURN_KEY,
            include_prior_dates: bool = False) -> Any:

如果没有特殊的需求,我们只需关注里面的keyvalue这两个参数即可。其他参数Airflow会根据task的上下文自动添加。看个PythonOperator的例子更能说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def push_data(**context):
  context['ti'].xcom_push(key='test_key', value='test_val')
  
push_data_op = PythonOperator(
    task_id = 'push_data',
    python_callable = push_data,
    provide_context=True,
    dag = dag
)

def pull_data(**context):
  test_data = context['ti'].xcom_pull(key='test_key')
  
pull_data_op = PythonOperator(
    task_id = 'pull_data',
    python_callable = pull_data,
    provide_context=True,
    dag = dag
)

push_data_op >> pull_data_op

上面的code就在push_datapull_data两个任务中传递了key='test_key', value='test_val'这一条数据。更方便的是,这里的value不仅限于str类型,这就提供了更大的自由度。注意,在opreator中必须要有provide_context=True,才能在operator内部通过context['ti'](有的文档给的例子是context['task_instance'])获得当前task的TaskInstance,进行XCom push/pull的相关操作。

此外,如果一个task返回一个值(从operator的execute()方法,或者从PythonOperator的python_callable指定的方法),那么包含该值的XCom将被自动push,默认的key就是上面见过的return_value。官方文档中给了下面这个例子,这里可以不指定key:

1
2
3
4
5
6
7
# inside a PythonOperator called 'pushing_task'
def push_function():
    return value

# inside another PythonOperator where provide_context=True
def pull_function(**context):
    value = context['ti'].xcom_pull(task_ids='pushing_task')

很好理解,这个例子是通过传递task_ids参数来获取对应task的返回值。这里的task_ids如果传递多个,则返回相应tasks的XCom值列表。当然,你还可以同时指定key参数,来获取指定tasks里xcom_push的key对应的value。

Operator的do_xcom_push参数

看完上面这些,你应该对XCom的使用有了初步的了解。可能又会有这样的疑问:Airflow提供了不少预定义的Operator,方便我们直接调用来完成一些基本的task,我们一般不会去改里面的内容。那么要怎样获取这类task的输出呢?这就是这节要讲的内容了。

BaseOperator中提供了一个do_xcom_push参数,它的定义如下:

1
2
3
4
5
:param do_xcom_push: if True, an XCom is pushed containing the Operator's
    result
:type do_xcom_push: bool
......
do_xcom_push: bool = True,

在Operator实现中可以用这个参数来决定是否往XCom传递一些需要的信息,BaseOperator中默认值是True。找了个DatabricksSubmitRunOperator作为实现的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#param do_xcom_push: Whether we should push run_id and run_page_url to xcom.
class DatabricksSubmitRunOperator(BaseOperator):
	@apply_defaults
  	def __init__(
        ......
			do_xcom_push=False,
        ......
  	):
      self.do_xcom_push = do_xcom_push
       
    def execute(self, context):
    	hook = self._get_hook()
      self.run_id = hook.run_now(self.json)
      _handle_databricks_operator_execution(self, hook, self.log, context)

XCOM_RUN_ID_KEY = 'run_id'
XCOM_RUN_PAGE_URL_KEY = 'run_page_url'

def _handle_databricks_operator_execution(operator, hook, log, context):
	if operator.do_xcom_push:
		context['ti'].xcom_push(key=XCOM_RUN_ID_KEY, value=operator.run_id)
  log.info('Run submitted with run_id: %s', operator.run_id)
  run_page_url = hook.get_run_page_url(operator.run_id)
  if operator.do_xcom_push:
  	context['ti'].xcom_push(key=XCOM_RUN_PAGE_URL_KEY, value=run_page_url)
  log.info('View run status, Spark UI, and logs at %s', run_page_url)

do_xcom_push在这里已经被默认置成了False,是因为Operator不确定其他task是否一定需要相关内容,置成False可以不往XCom传多余的东西。当task用Operator初始化时显示指定do_xcom_push=True时,在执行时便会把run_idrun_page_urlpush给XCom,后面的task也可以读到对应的值。

当然,也可以通过retrun value的形式来写入XCom,例如SSHOperatorBashOperator的实现。(BashOperator在之前版本传的是自定义的xcom_push参数来作为标志,新code已经统一改了。但我看现在airflow master上的code,这两个operator都没有对do_xcom_push进行判断,而是直接retrun value了。)

与template结合使用

上面提到的XCom的获取都是在Operator内部调用xcom_pull。那么,如果我要在创建task的时候就获得这个值作为task的初始参数传递进去,有没有可能办到呢?这在给一些预定义的Operator传递参数是尤其需要。答案当然是肯定的。

想在task外部获取xcom里的值,可以用template来实现,给个BashOperator的例子:

1
2
3
4
5
bash_op = BashOperator(
    task_id='xcom_pull_by_template',
    bash_command='echo "{{ ti.xcom_pull(task_ids="push_task", key="push_key") }}"',
    dag=dag
)

使用template {{ ti.xcom_pull(...) }}可以达到目的。但需要注意的是,{{ *** }}必须包含在引号内才生效。直接使用是会报错的。再看下Airflow使用的jinja template的介绍:

A jinja template is simply a text file that contains the following:

  • variables and/or expressions - these get replaced with values when a template is rendered.
  • tags - these control the logic of the template.

Jinja templating allows you to defer the rendering of strings in your tasks until the actual running of those tasks.

就是说,template做的是简单的字符串替换,而不关心value的实际数据类型。所以,如果是你想传的是比较复杂的数据类型的话,template是不会处理的。这里,建议可以往XCom里分别多次push,在外部再构造成需要的类型。例如,要往DatabricksSubmitRunOperator传一个json类型的参数,可以这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
databricks_op = DatabricksSubmitRunOperator(
  task_id = "databricks_task",
  json = {
    "new_cluster":{
      "node_type_id": '{{ ti.xcom_pull(key="node_type") }}',
      "autoscale":{
        "min_workers": 1,
         "max_workers": '{{ ti.xcom_pull(key="instance_count") }}'
       }
    },
    "libraries" : [
      {
        "jar": '{{ ti.xcom_pull(key="jar") }}'
      }
    ]
  }
)

总结

最后,对XCom的使用做下总结:

  1. XCom用来在Airflow 各个task间进行数据共享
  2. XCom通过把task需要传递的信息以KV的形式存到DB中来实现数据共享
  3. 可以通过TaskInstance的xcom_push和xcom_push来存取数据
  4. 有些预定义的Operator通过do_xcom_push变量判断是否往XCom存结果
  5. 在task外部可以通过template的方式获取XCom里的值

今天的内容就到这了,如果觉得对你有帮助的话,请关注我的微信号,让我们共同成长进步~


本文作者:Jessychen
版权声明:本博客所有文章除特别声明外,均采用CC-BY-NC-SA 4.0 Int'l许可协议
如需转载,烦请注明出处: