1, 设计思想
有些任务的使用场景,不需要定时定期运行,只需要在特定的条件下,人工触发,顺带传入一些必要的参数,比如处理日期。
那么,我们就设计这样的使用方式:
- 传入 sday 参数,指定参数值为 20300101 日。
- dag 的每一个 task 都接收这个参数。
- 被 task 调用的 shell 脚本程序,在生成的日志文件名中,包含这个 sday 的值。
2, 编写被 dag 调用的 shell 程序
[root@do-airflow ~]# vi /root/test.script.sh#!/bin/bashS_FILE=\"\"S_DAY=$3if [ -z $S_DAY ]; thenS_DAY=`date \'+%Y%m%d\'`ficase $2 in\"1\")S_FILE=\"/root/$S_DAY.$1.1.log\";;\"2\")S_FILE=\"/root/$S_DAY.$1.2.log\";;\"3\")S_FILE=\"/root/$S_DAY.$1.3.log\";;*)S_FILE=\"\";;esacif [[ $S_FILE == \"\" ]]; thenexitfirm -f $S_FILEI=0while true; doS_MSG=`date \"+%Y-%m-%d %H:%M:%S\"`echo $S_MSGecho $S_MSG >> $S_FILE((I=I+1))if [[ $I == 10 ]]; thenbreakfisleep 1done[root@do-airflow ~]#
3, 配置 dag
[root@do-airflow ~]# vim /opt/airflow/dags/c_hello.pyimport airflowfrom airflow import DAGfrom airflow.operators.bash_operator import BashOperatorfrom datetime import timedeltadefault_args = {\'owner\': \'dosrain\',\'depends_on_past\': False,\'start_date\': airflow.utils.dates.days_ago(2)}dag = DAG(dag_id=\'c_hello\',default_args=default_args,description=\'my first DAG\',schedule_interval=None)# 一期汇聚a1_operator = BashOperator(task_id=\'a1_task\',bash_command=\'/root/test.script.sh a 1 {{ dag_run.conf[\"sday\"] }}\',dag=dag)# 一期入库a2_operator = BashOperator(task_id=\'a2_task\',bash_command=\'/root/test.script.sh a 2 {{ dag_run.conf[\"sday\"] }}\',dag=dag)# 二期汇聚b1_operator = BashOperator(task_id=\'b1_task\',bash_command=\'/root/test.script.sh b 1 {{ dag_run.conf[\"sday\"] }}\',dag=dag)# 二期入库b2_operator = BashOperator(task_id=\'b2_task\',bash_command=\'/root/test.script.sh b 2 {{ dag_run.conf[\"sday\"] }}\',dag=dag)# Oracle汇聚c1_operator = BashOperator(task_id=\'c1_task\',bash_command=\'/root/test.script.sh c 1 {{ dag_run.conf[\"sday\"] }}\',dag=dag)a1_operator>>a2_operatora1_operator>>b1_operatorb1_operator>>b2_operatora2_operator>>c1_operatorb2_operator>>c1_operator[root@do-airflow ~]# python3 /opt/airflow/dags/c_hello.py[root@do-airflow ~]# airflow list_tasks c_hello[2020-07-27 15:04:15,411] {__init__.py:50} INFO - Using executor LocalExecutor[2020-07-27 15:04:15,412] {dagbag.py:396} INFO - Filling up the DagBag from /opt/airflow/dagsa1_taska2_taskb1_taskb2_taskc1_task[root@do-airflow ~]#
4, 运行 dag
第一种运行方法,页面触发。
上述方法,我们之前就用过了,不再详细介绍。
下面我们再介绍一种新的运行方法。
第二种运行方法,命令行执行。
# 命令行触发[root@do-airflow ~]# airflow trigger_dag c_hello -c \'{\"sday\":\"20300101\"}\'Created <DagRun c_hello @ 2020-07-27 15:20:10+08:00: manual__2020-07-27T15:27:10+08:00, externally triggered: True>[root@do-airflow ~]## 等 dag 执行完成后[root@do-airflow ~]# ll *.log-rw-r--r--. 1 root root 200 Jul 27 15:24 20300101.a.1.log-rw-r--r--. 1 root root 200 Jul 27 15:24 20300101.a.2.log-rw-r--r--. 1 root root 200 Jul 27 15:24 20300101.b.1.log-rw-r--r--. 1 root root 200 Jul 27 15:24 20300101.b.2.log-rw-r--r--. 1 root root 200 Jul 27 15:24 20300101.c.1.log