XComs let tasks exchange messages, allowing more nuanced forms of control and shared state. The name is an abbreviation of “cross-communication”.
Airflow作为一款优秀的任务调度系统,必然需要解决一个重要问题:如何在多个调度任务中实现信息共享?即怎么把task A执行得到的结果传递给task B,让task B可以基于task A的结果进行后续操作。XCom就是给出的答案。
基本原理
XCom是”cross-communication”的缩写。它被设计于用来在Airflow 各个task间进行数据共享。XCom属于Airflow的model,见airflow.models.xcom.XCom
,包含下面几个字段:
1
2
3
4
5
6
7
8
__tablename__ = "xcom"
key = Column(String(512, **COLLATION_ARGS), primary_key=True)
value = Column(LargeBinary)
timestamp = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
execution_date = Column(UtcDateTime, primary_key=True)
# source information
task_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
dag_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
从上述code定义可以了解到,XCom里的内容其实是存在Airflow DB中的xcom
这张表里的。如果配置了Airflow UI,可以很方面的查到当前XCom里存的所以内容(入口位于Admin->XComs):
表里的字段与model中的定义一一对应,很好理解。其中execution_date是task开始执行的时间,timestamp默认是这条记录生成的时间。图上的XCom存的是对应的task的返回值(key=”return_value”, value=”Success“)。如果你的task定义了其他类型的返回值,也可以通过return_value
这个key来获取。
如此,显而易见,XCom的本质就是把task需要传递的信息以KV的形式存到DB中,而其他task则可以从DB中获取。由于XCom是存在DB而不是内存中,这也说明了对于已经执行完的DAG,如果重跑其中某个task的话依然可以获取到同次DAG运行时其他task传递的内容。
看到这里,发现什么问题没?没错,由于XCom不会自己清理,所以DB里的数据会越来越多🙀。这里推荐在每次DAG执行success的最后加一个clear的操作,可以通过在DAG中加一个on_success_callback
来实现,参考code如下,请按实际情况调整filter条件:
1
2
3
4
5
6
7
8
9
10
11
12
from airflow.models import DAG
from airflow.utils.db import provide_session
from airflow.models import XCom
@provide_session
def cleanup_xcom(context, session=None):
dag_id = context['ti']['dag_id']
session.query(XCom).filter(XCom.dag_id == dag_id).delete()
dag = DAG( ...
on_success_callback=cleanup_xcom,
)
Push和Pull
下面来讲讲如何往XCom中添加和获取记录。上面已经说了,XCom存储的是KV形式的数据对,Airflow包装了xcom_push
和xcom_pull
两个方法,可以很方便的进行存取,定义在taskinstance.py
中:source
1
2
3
4
5
6
7
8
9
10
11
12
def xcom_push(
self,
key: str,
value: Any,
execution_date: Optional[datetime] = None) -> None:
def xcom_pull(
self,
task_ids: Optional[Union[str, Iterable[str]]] = None,
dag_id: Optional[str] = None,
key: str = XCOM_RETURN_KEY,
include_prior_dates: bool = False) -> Any:
如果没有特殊的需求,我们只需关注里面的key
和value
这两个参数即可。其他参数Airflow会根据task的上下文自动添加。看个PythonOperator
的例子更能说明:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def push_data(**context):
context['ti'].xcom_push(key='test_key', value='test_val')
push_data_op = PythonOperator(
task_id = 'push_data',
python_callable = push_data,
provide_context=True,
dag = dag
)
def pull_data(**context):
test_data = context['ti'].xcom_pull(key='test_key')
pull_data_op = PythonOperator(
task_id = 'pull_data',
python_callable = pull_data,
provide_context=True,
dag = dag
)
push_data_op >> pull_data_op
上面的code就在push_data
和pull_data
两个任务中传递了key='test_key', value='test_val'
这一条数据。更方便的是,这里的value不仅限于str类型,这就提供了更大的自由度。注意,在opreator中必须要有provide_context=True,
才能在operator内部通过context['ti']
(有的文档给的例子是context['task_instance']
)获得当前task的TaskInstance,进行XCom push/pull的相关操作。
此外,如果一个task返回一个值(从operator的execute()方法,或者从PythonOperator的python_callable指定的方法),那么包含该值的XCom将被自动push,默认的key就是上面见过的return_value
。官方文档中给了下面这个例子,这里可以不指定key:
1
2
3
4
5
6
7
# inside a PythonOperator called 'pushing_task'
def push_function():
return value
# inside another PythonOperator where provide_context=True
def pull_function(**context):
value = context['ti'].xcom_pull(task_ids='pushing_task')
很好理解,这个例子是通过传递task_ids
参数来获取对应task的返回值。这里的task_ids
如果传递多个,则返回相应tasks的XCom值列表。当然,你还可以同时指定key
参数,来获取指定tasks里xcom_push的key对应的value。
Operator的do_xcom_push参数
看完上面这些,你应该对XCom的使用有了初步的了解。可能又会有这样的疑问:Airflow提供了不少预定义的Operator,方便我们直接调用来完成一些基本的task,我们一般不会去改里面的内容。那么要怎样获取这类task的输出呢?这就是这节要讲的内容了。
在BaseOperator
中提供了一个do_xcom_push
参数,它的定义如下:
1
2
3
4
5
:param do_xcom_push: if True, an XCom is pushed containing the Operator's
result
:type do_xcom_push: bool
......
do_xcom_push: bool = True,
在Operator实现中可以用这个参数来决定是否往XCom传递一些需要的信息,BaseOperator中默认值是True。找了个DatabricksSubmitRunOperator
作为实现的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#param do_xcom_push: Whether we should push run_id and run_page_url to xcom.
class DatabricksSubmitRunOperator(BaseOperator):
@apply_defaults
def __init__(
......
do_xcom_push=False,
......
):
self.do_xcom_push = do_xcom_push
def execute(self, context):
hook = self._get_hook()
self.run_id = hook.run_now(self.json)
_handle_databricks_operator_execution(self, hook, self.log, context)
XCOM_RUN_ID_KEY = 'run_id'
XCOM_RUN_PAGE_URL_KEY = 'run_page_url'
def _handle_databricks_operator_execution(operator, hook, log, context):
if operator.do_xcom_push:
context['ti'].xcom_push(key=XCOM_RUN_ID_KEY, value=operator.run_id)
log.info('Run submitted with run_id: %s', operator.run_id)
run_page_url = hook.get_run_page_url(operator.run_id)
if operator.do_xcom_push:
context['ti'].xcom_push(key=XCOM_RUN_PAGE_URL_KEY, value=run_page_url)
log.info('View run status, Spark UI, and logs at %s', run_page_url)
do_xcom_push
在这里已经被默认置成了False,是因为Operator不确定其他task是否一定需要相关内容,置成False可以不往XCom传多余的东西。当task用Operator初始化时显示指定do_xcom_push=True
时,在执行时便会把run_id
和run_page_url
push给XCom,后面的task也可以读到对应的值。
当然,也可以通过retrun value
的形式来写入XCom,例如SSHOperator
和BashOperator
的实现。(BashOperator在之前版本传的是自定义的xcom_push
参数来作为标志,新code已经统一改了。但我看现在airflow master上的code,这两个operator都没有对do_xcom_push进行判断,而是直接retrun value了。)
与template结合使用
上面提到的XCom的获取都是在Operator内部调用xcom_pull。那么,如果我要在创建task的时候就获得这个值作为task的初始参数传递进去,有没有可能办到呢?这在给一些预定义的Operator传递参数是尤其需要。答案当然是肯定的。
想在task外部获取xcom里的值,可以用template来实现,给个BashOperator的例子:
1
2
3
4
5
bash_op = BashOperator(
task_id='xcom_pull_by_template',
bash_command='echo "{{ ti.xcom_pull(task_ids="push_task", key="push_key") }}"',
dag=dag
)
使用template {{ ti.xcom_pull(...) }}
可以达到目的。但需要注意的是,{{ *** }}
必须包含在引号内才生效。直接使用是会报错的。再看下Airflow使用的jinja template
的介绍:
A jinja template is simply a text file that contains the following:
- variables and/or expressions - these get replaced with values when a template is rendered.
- tags - these control the logic of the template.
Jinja templating allows you to defer the rendering of strings in your tasks until the actual running of those tasks.
就是说,template做的是简单的字符串替换,而不关心value的实际数据类型。所以,如果是你想传的是比较复杂的数据类型的话,template是不会处理的。这里,建议可以往XCom里分别多次push,在外部再构造成需要的类型。例如,要往DatabricksSubmitRunOperator
传一个json类型的参数,可以这样:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
databricks_op = DatabricksSubmitRunOperator(
task_id = "databricks_task",
json = {
"new_cluster":{
"node_type_id": '{{ ti.xcom_pull(key="node_type") }}',
"autoscale":{
"min_workers": 1,
"max_workers": '{{ ti.xcom_pull(key="instance_count") }}'
}
},
"libraries" : [
{
"jar": '{{ ti.xcom_pull(key="jar") }}'
}
]
}
)
总结
最后,对XCom的使用做下总结:
- XCom用来在Airflow 各个task间进行数据共享
- XCom通过把task需要传递的信息以KV的形式存到DB中来实现数据共享
- 可以通过TaskInstance的xcom_push和xcom_push来存取数据
- 有些预定义的Operator通过do_xcom_push变量判断是否往XCom存结果
- 在task外部可以通过template的方式获取XCom里的值
今天的内容就到这了,如果觉得对你有帮助的话,请关注我的微信号,让我们共同成长进步~
本文作者:Jessychen
版权声明:本博客所有文章除特别声明外,均采用CC-BY-NC-SA 4.0 Int'l许可协议
如需转载,烦请注明出处: