AI智能
改变未来

Airflow1.10.11 之 dag 的 SSHOperator


1, 准备脚本

[root@do-airflow ~]# vi test.b.script.sh#!/bin/bashS_FILE=\"\"S_DAY=$3if [ -z $S_DAY ]; thenS_DAY=`date \'+%Y%m%d\'`ficase $2 in\"1\")S_FILE=\"/root/$S_DAY.$1.1.log\";;\"2\")S_FILE=\"/root/$S_DAY.$1.2.log\";;\"3\")S_FILE=\"/root/$S_DAY.$1.3.log\";;*)S_FILE=\"\";;esacif [[ $S_FILE == \"\" ]]; thenexitfirm -f $S_FILEI=0while true; doS_MSG=`date \"+%Y-%m-%d %H:%M:%S\"`echo $S_MSGecho $S_MSG >> $S_FILE((I=I+1))if [[ $I == 10 ]]; thenbreakfisleep 1done

2, 配置 connection

命令行方式

# 添加[root@do-airflow ~]# airflow connections -a \\--conn_id ssh.192.168.109.131 \\--conn_type SSH \\--conn_host 192.168.109.131 \\--conn_login root \\--conn_password <你的密码> \\--conn_port 22Successfully added `conn_id`=ssh.192.168.109.131 : SSH://root:******@192.168.109.131:22[root@do-airflow ~]## 显示[root@do-airflow ~]# airflow connections -l╒═══════════════════════╤═════════════╤═══════════════════╤════════╤════════════════╤══════════════════════╤═════════╕│ Conn Id               │ Conn Type   │ Host              │   Port │ Is Encrypted   │ Is Extra Encrypted   │ Extra   │╞═══════════════════════╪═════════════╪═══════════════════╪════════╪════════════════╪══════════════════════╪═════════╡│ \'ssh.192.168.109.131\' │ \'SSH\'       │ \'192.168.109.131\' │     22 │ True           │ False                │ None    │╘═══════════════════════╧═════════════╧═══════════════════╧════════╧════════════════╧══════════════════════╧═════════╛[root@do-airflow ~]## 删除[root@do-airflow ~]# airflow connections -d --conn_id ssh.192.168.109.131Successfully deleted `conn_id`=ssh.192.168.109.131[root@do-airflow ~]#

Web UI 方式

3, 准备 dag

# 安装依赖包[root@do-airflow ~]# pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple paramiko sshtunnel[root@do-airflow ~]#
[root@do-airflow ~]# vi /opt/airflow/dags/d_hello.pyimport airflowfrom airflow import DAGfrom airflow.operators.bash_operator import BashOperatorfrom airflow.contrib.operators.ssh_operator import SSHOperatorfrom datetime import timedeltadefault_args = {\'owner\': \'dosrain\',\'depends_on_past\': False,\'start_date\': airflow.utils.dates.days_ago(2)}dag = DAG(dag_id=\'d_hello\',default_args=default_args,description=\'my first DAG\',schedule_interval=None)# 一期汇聚a1_operator = SSHOperator(ssh_conn_id = \'ssh.192.168.109.131\',task_id=\'a1_task\',command=\'/root/test.script.sh a 1\',dag=dag)# 一期入库a2_operator = SSHOperator(ssh_conn_id = \'ssh.192.168.109.131\',task_id=\'a2_task\',command=\'/root/test.script.sh a 2\',dag=dag)# 二期汇聚b1_operator = SSHOperator(ssh_conn_id = \'ssh.192.168.109.131\',task_id=\'b1_task\',command=\'/root/test.script.sh b 1\',dag=dag)# 二期入库b2_operator = SSHOperator(ssh_conn_id = \'ssh.192.168.109.131\',task_id=\'b2_task\',command=\'/root/test.script.sh b 2\',dag=dag)# Oracle汇聚c1_operator = SSHOperator(ssh_conn_id = \'ssh.192.168.109.131\',task_id=\'c1_task\',command=\'/root/test.script.sh c 1\',dag=dag)a1_operator>>a2_operatora1_operator>>b1_operatorb1_operator>>b2_operatora2_operator>>c1_operatorb2_operator>>c1_operator[root@do-airflow ~]# python3 /opt/airflow/dags/d_hello.py[root@do-airflow ~]# airflow list_tasks d_hello[2020-07-28 10:03:21,524] {__init__.py:50} INFO - Using executor LocalExecutor[2020-07-28 10:03:21,525] {dagbag.py:396} INFO - Filling up the DagBag from /opt/airflow/dagsa1_taska2_taskb1_taskb2_taskc1_task[root@do-airflow ~]#

4, 触发 dag

[root@do-airflow ~]# rm -f *.log# 启用 d_hello[root@do-airflow ~]# airflow unpause d_hello[2020-07-28 10:12:16,131] {__init__.py:50} INFO - Using executor LocalExecutor[2020-07-28 10:12:16,132] {dagbag.py:396} INFO - Filling up the DagBag from /opt/airflow/dags/d_hello.pyDag: d_hello, paused: False# 触发 b_hello,注意,是带参数的[root@do-airflow ~]# airflow trigger_dag -c \'{\"sday\":\"20200501\"}\' d_hello[2020-07-28 10:13:12,815] {__init__.py:50} INFO - Using executor LocalExecutor[2020-07-28 10:13:12,816] {dagbag.py:396} INFO - Filling up the DagBag from /opt/airflow/dags/d_hello.pyCreated <DagRun d_hello @ 2020-07-28 10:13:12+08:00: manual__2020-07-28T10:13:12+08:00, externally triggered: True># 查看结果文件[root@do-airflow ~]# ll *.log-rw-r--r--. 1 root root 200 Jul 28 10:13 20200728.a.1.log-rw-r--r--. 1 root root 200 Jul 28 10:13 20200728.a.2.log-rw-r--r--. 1 root root 200 Jul 28 10:13 20200728.b.1.log-rw-r--r--. 1 root root 200 Jul 28 10:13 20200728.b.2.log-rw-r--r--. 1 root root 200 Jul 28 10:14 20200728.c.1.log[root@do-airflow ~]#
赞(0) 打赏
未经允许不得转载:爱站程序员基地 » Airflow1.10.11 之 dag 的 SSHOperator