1, 准备脚本
[root@do-airflow ~]# vi test.b.script.sh
#!/bin/bash
S_FILE=""
S_DAY=$3
if [ -z $S_DAY ]; then
    S_DAY=`date '+%Y%m%d'`
fi
S_FILE="/root/$S_DAY.$1.$2.log"
rm -f $S_FILE
I=0
while true; do
    S_MSG=`date "+%Y-%m-%d %H:%M:%S"`
    echo $S_MSG
    echo $S_MSG >> $S_FILE
    ((I=I+1))
    if [[ $I == 10 ]]; then
        break
    fi
    sleep 1
done
[root@do-airflow ~]#
2, 准备 dag
[root@do-airflow ~]# vi /opt/airflow/dags/b_hello.py
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta
default_args = {
    'owner': 'dosrain',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2)
}
dag = DAG(
    dag_id='b_hello',
    default_args=default_args,
    description='my first DAG',
    schedule_interval=None)
# 一期汇聚
a1_operator = BashOperator(
    task_id='a1_task',
    bash_command='/root/test.b.script.sh a 1 {{ dag_run.conf["sday"] }}',
    dag=dag)
[root@do-airflow ~]# python3 /opt/airflow/dags/b_hello.py
[root@do-airflow ~]# airflow list_tasks b_hello
[2020-07-24 17:05:49,937] {__init__.py:50} INFO - Using executor LocalExecutor
[2020-07-24 17:05:49,939] {dagbag.py:396} INFO - Filling up the DagBag from /opt/airflow/dags
a1_task
[root@do-airflow ~]#
3, 触发 dag
3.1, Web UI 方式
3.2, 命令行方式
[root@do-airflow ~]# rm -f *.log
# 启用 b_hello
[root@do-airflow ~]# airflow unpause b_hello
[2020-07-24 17:11:51,126] {__init__.py:50} INFO - Using executor LocalExecutor
[2020-07-24 17:11:51,127] {dagbag.py:396} INFO - Filling up the DagBag from /opt/airflow/dags/b_hello.py
Dag: b_hello, paused: False
# 触发 b_hello,注意,是带参数的
[root@do-airflow ~]# airflow trigger_dag -c '{"sday":"20200401"}' b_hello
[2020-07-24 17:14:47,195] {__init__.py:50} INFO - Using executor LocalExecutor
[2020-07-24 17:14:47,197] {dagbag.py:396} INFO - Filling up the DagBag from /opt/airflow/dags/b_hello.py
Created <DagRun b_hello @ 2020-07-24 17:14:47+08:00: manual__2020-07-24T17:14:47+08:00, externally triggered: True>
# 查看结果文件
[root@do-airflow ~]# ll *.log
-rw-r--r--. 1 root root 200 Jul 24 17:14 20200401.a.1.log
[root@do-airflow ~]#
3.3, REST API 方式
参考链接:
https://airflow.apache.org/docs/stable/security.html
https://airflow.apache.org/docs/stable/rest-api-ref.html
[root@do-airflow ~]# rm -f *.log
# 修改后端认证方式
[root@do-airflow ~]# vi /opt/airflow/airflow.cfg
auth_backend = airflow.api.auth.backend.default
# 重启 airflow 的 web 服务
[root@do-airflow ~]# systemctl restart airflow-webserver
# 发起测试 REST API 是否正常的 GET 请求
[root@do-airflow ~]# curl http://192.168.109.131:8080/api/experimental/test
{"status":"OK"}
# 获取 b_hello 的执行历史
[root@do-airflow ~]# curl http://192.168.109.131:8080/api/experimental/dags/b_hello/dag_runs
[{"dag_id":"b_hello","dag_run_url":"/admin/airflow/graph?dag_id=b_hello&execution_date=2020-07-24+17%3A14%3A47%2B08%3A00","execution_date":"2020-07-24T17:14:47+08:00","id":3,"run_id":"manual__2020-07-24T17:14:47+08:00","start_date":"2020-07-24T17:14:47.212520+08:00","state":"success"}]
# 触发 b_hello
[root@do-airflow ~]# curl -X POST \
http://192.168.109.131:8080/api/experimental/dags/b_hello/dag_runs \
-H 'Cache-Control: no-cache' \
-H 'Content-Type: application/json' \
-d '{"conf":"{\"sday\":\"20400101\"}"}'
{"execution_date":"2020-07-24T17:26:33+08:00","message":"Created <DagRun b_hello @ 2020-07-24 17:26:33+08:00: manual__2020-07-24T17:26:33+08:00, externally triggered: True>","run_id":"manual__2020-07-24T17:26:33+08:00"}
# 查看结果文件
[root@do-airflow ~]# ll *.log
-rw-r--r--. 1 root root 200 Jul 24 17:26 20400101.a.1.log