工作流调度 Azkaban 工作流-操作HDFS和执行MR任务

操作HDFS

  • node01节点用root用户启动hadoop集群
[hadoop@node01 bin]$ su root
密码:
[root@node01 bin]#
[root@node01 bin]# cd
[root@node01 ~]# start-all.sh
  • 编写flow文件operateHdfs.flow,内容如下
nodes:
  - name: jobA
    type: command
    config:
      command: echo "start execute"
      command.1: /export/servers/hadoop-2.7.5/bin/hdfs dfs -mkdir /azkaban
      command.2: /export/servers/hadoop-2.7.5/bin/hdfs dfs -put /export/servers/hadoop-2.7.5/NOTICE.txt  /azkaban
  • 生成zip项目文件、web ui上传zip、执行flow
  • 查看HDFS结果

image-20210322230044128

MR任务

  • 记得启动hadoop的historyserver,否则执行mr项目时,job的日志会报如下类似错误日志
22-03-2021 23:17:23 CST jobMR INFO - 21/03/22 23:17:23 INFO impl.YarnClientImpl: Submitted application application_1616423563192_0001
22-03-2021 23:17:39 CST jobMR INFO - 21/03/22 23:17:39 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
22-03-2021 23:17:41 CST jobMR INFO - 21/03/22 23:17:41 INFO ipc.Client: Retrying connect to server: node01/192.168.77.30:10020. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
22-03-2021 23:17:42 CST jobMR INFO - 21/03/22 23:17:42 INFO ipc.Client: Retrying connect to server: node01/192.168.77.30:10020. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
22-03-2021 23:17:44 CST jobMR INFO - 21/03/22 23:17:44 INFO ipc.Client: Retrying connect to server: node01/192.168.77.30:10020. Already tried 2 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)

192.168.77.30:10020 应该是hadoop集群的historyserver服务

  • 编写flow文件mr.flow,内容如下
nodes:
  - name: jobMR
    type: command
    config:
      command: /export/servers/hadoop-2.7.5/bin/hadoop jar /export/servers/hadoop-2.7.5/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.5.jar pi 3 3
  • 为了避免执行mr过程中,对hdfs操作的一些权限问题
[hadoop@node01 azkaban-exec-server-4.0.0]$ su root
[root@node01 azkaban-exec-server-4.0.0]# hdfs dfs -chmod -R 777 /tmp/
  • 生成zip项目文件、web ui上传zip、执行flow
  • 查看结果

image-20210322232740822

  • 可以去yarn界面看看此job的执行情况

image-20210322233043321

Views: 4

工作流调度 AZKABAN 工作流-设置邮件通知

4. 邮件报警案例

azkaban邮件报警机制,需要发件人邮箱、收件人邮箱

1、注册邮箱

处理发件人邮箱

注册一个126邮箱,此处邮箱地址是kkb1117@126.com

登录邮箱

image-20210323171128806

开启SMTP服务(simple mail transfer protocol)

image-20210323171249293

image-20210326101955357
image-20210326102021252

image-20210323171857301

已经开启

image-20210323171933035

2、邮件报警案例

node03及web服务器,配置发送告警的发件人邮箱

[hadoop@node03 conf]$ cd /export/servers/azkaban-web-server-4.0.0/conf
[hadoop@node03 conf]$ vim azkaban.properties

文件中查找mail

image-20210323175039670

修改这两个属性,同时追加两个mail属性,如下

mail.sender=kkb1117@126.com
mail.host=smtp.126.com
mail.user=kkb1117@126.com
mail.password=PDCAHXAKUDQKOQFY

mail.sender指定邮件发件人的邮箱

mail.host指定所用邮件的smtp服务器

mail.user指定用户

mail.password为注册邮箱时,给的授权密码;根据自己的实际的“授权密码”进行修改

保存,退出文件

启web server

[hadoop@node03 ~]$ cd /export/servers/azkaban-web-server-4.0.0/
[hadoop@node03 azkaban-web-server-4.0.0]$ bin/shutdown-web.sh
Killing web-server. [pid: 27659], attempt: 1
shutdown succeeded
[hadoop@node03 azkaban-web-server-4.0.0]$ bin/start-web.sh
[hadoop@node03 azkaban-web-server-4.0.0]$ jps
48256 AzkabanWebServer
48279 Jps
27624 AzkabanExecutorServer

编辑flow文件email.flow,内容如下

nodes:
  - name: jobA
    type: command
    config:
      command: echo "inform user when job sucessful or failed by sending email."

email.flowflow20.project打包生成zip文件
web ui创建项目、上传zip、执行

image-20210323180315162

image-20210323180423474

image-20210323180440612

然后去邮箱中查看邮件

image-20210323180936804

Views: 7

工作流调度 Azkaban 工作流-定时任务

定时任务

以之前的入门例子Hello World为例
利用此例子的flow、project文件,生成zip文件scheduler.zip,再创建一个项目

image-20210323142840984

image-20210323142936042

image-20210323143025517

image-20210323143038838

image-20210323143302091

image-20210323143358370

image-20210323143437913

image-20210323143458419

image-20210323143528041

删除定时调度

image-20210323143945911

*/1 * ? * *  每分钟执行一次定时调度任务
0 1 ? * *  每天晚上凌晨一点钟执行这个任务
0 */2 ? * *  每隔两个小时定时执行这个任务
30 21 ? * * 每天晚上九点半定时执行这个任务

与crontab语法相似;参考crontab语法

Views: 7

工作流调度 Azkaban 带条件的工作流

官网文档

条件工作流功能允许用户自定义条件,决定是否运行某些Job

分两种情况

  • 运行时参数:可以根据一个job之前的 job的输出,决定此job是执行还是不执行
  • 预定义宏:也可以使用基于之前的job的status预定义宏,决定此job是执行还是不执行
    在这些条件下,用户可以在确定 Job执行逻辑时获得更大的灵活性
    例如,只要父 Job 之一成功,就可以运行当前 Job
1、运行时参数

原理

  • 父 Job 将参数写入JOB_OUTPUT_PROP_FILE环境变量所指向的文件
  • 子 Job 使用 ${jobName:param}来获取父 Job 输出的参数,参数值与一个字符串或数字进行比较,来定义执行条件

支持的比较、逻辑运算符有

  • == 等于
  • != 不等于
  • > 大于
  • >= 大于等于
  • < 小于
  • <= 小于等于
  • &&
  • ||
  • !

先定义一个flow文件,包含两个job

  • jobA执行一个sh脚本,脚本将当前是星期几的值,写入文件$JOB_OUTPUT_PROP_FILE
  • jobB执行一个脚本,判断如果jobA输出的是星期二或星期四,则jobB执行

flow文件runtimeParam.flow内容如下

nodes:
  - name: JobA
    type: command
    config:
      command: sh JobA.sh
  - name: JobB
    type: command
    dependsOn:
      - JobA
    config:
      command: sh JobB.sh
    condition: ${JobA:dayOfTheWeek} == 2 || ${JobA:dayOfTheWeek} == 4

JobA.sh内容如下

#!/bin/bash
echo "do JobA"
dayOfTheWeek=`date +%w`
echo "{\"dayOfTheWeek\":$dayOfTheWeek}" > $JOB_OUTPUT_PROP_FILE

运行时参数需要重定向到文件$JOB_OUTPUT_PROP_FILE

  • JobB.sh内容如下
#!/bin/bash
echo "do JobB"

JobA.shJobB.shruntimeParam.flowflow20.project 打包成runtimeParam.zip

web ui创建项目、上传zip、执行flow

image-20210325171031676

运行此flow时,是周二,所以符合执行jobB的条件,所以发现jobA、jobB都执行成功

image-20210325171138861
image-20210323114056229
image-20210323115042974

如果不是周二或周日,那么jobB执行失败,如下

image-20210325171350641
image-20210323114850913
2、预定义宏

Azkaban 中预置了几个预定义宏。
预定义宏会根据当前job的所有父 Job 的完成情况进行判断,再决定是否执行当前job。
可用的预定义宏如下:

  • 1、all_success: 表示父 Job 全部成功才执行(默认)
  • 2、all_done:表示父 Job 全部完成才执行
  • 3、all_failed:表示父 Job 全部失败才执行
  • 4、one_success:表示父 Job 至少一个成功才执行
  • 5、one_failed:表示父 Job 至少一个失败才执行

每个宏对应的job 状态如下

  • all_done: FAILED, KILLED, SUCCEEDED, SKIPPED, FAILED_SUCCEEDED, CANCELLED
  • all_success / one_success: SUCCEEDED, SKIPPED, FAILED_SUCCEEDED
  • all_failed / one_failed: FAILED, KILLED, CANCELLED

案例:flow中有3个job

  • jobA执行一个sh脚本
  • jobB执行一个sh脚本
  • jobC执行一个sh命令,只要jobA或jobB任一个成功,jobC才执行

flow文件macro.flow内容如下

nodes:
  - name: JobA
    type: command
    config:
      command: sh JobA.sh
  - name: JobB
    type: command
    config:
      command: sh JobB.sh
  - name: JobC
    type: command
    dependsOn:
      - JobA
      - JobB
    config:
      command: echo 'This is jobC'
    condition: one_success

JobB.shmacro.flowflow20.project 打包成macro.zip

  • 注意JobB.sh跟上个例子相同的文件
  • 注意:生成zip文件时,故意没有将JobA.sh打包到zip文件;这样JobA肯定执行失败

web ui创建项目、上传zip、执行flow

image-20210323121331536
image-20210325171515976

发现JobA失败;JobB、JobC成功;查看Job List

image-20210325171636151
image-20210323121543644
image-20210323121614626
3. 运行时参数结合预定义宏

指定job是否执行的条件,除了单独由一个“运行时参数”或一个“预定义宏”指定外
还可以用两类的组合结果指定条件:

  • 即若干个“运行时参数”
  • 再加上一个“预定义宏”
  • 通过逻辑运算,根据最终结果是true还是false,决定job是否执行

举例:

  • JobA的输出参数param1的值等于1成立
  • 并且JobB的输出参数param2大于5成立
  • 两个条件都成立true && true
  • 整个条件结果是true,那么当前job才执行
condition: ${JobA:param1} == 1 && ${JobB:param2} > 5

举例:当前job的父 Job 至少一个成功才执行,当前job才被执行

condition: one_success

举例:

  • 当前job的所有父 Job 全部完成,成立(true)
  • JobC的输出参数param3的值不等于foo,成立(true)
  • 两个条件都成立true && true
  • 整个条件结果是true,那么当前job才执行
condition: all_done && ${JobC:param3} != "foo"

举例:

  • 根据{JobD:param4}{JobE:parm5}all_success${JobF:parm6} == "bar"进行复杂的逻辑运算后
  • 如果最终结果是true
  • 那么,当前job才执行
condition: (!{JobD:param4} || !{JobE:parm5}) && all_success || ${JobF:parm6} == "bar"

flow文件mixtureCondition.flow,内容如下

nodes:
 - name: JobA
   type: command
   config:
     command: sh write_to_props.sh

 - name: JobB
   type: command
   dependsOn:
     - JobA
   config:
     command: echo “This is JobB.”
   condition: ${JobA:jobAParam1} == 1

 - name: JobC
   type: command
   dependsOn:
     - JobA
   config:
     command: echo “This is JobC.”
   condition: ${JobA:jobAParam1} == 2

 - name: JobD
   type: command
   dependsOn:
     - JobB
     - JobC
   config:
     command: pwd
   condition: one_success

write_to_props.sh内容如下

#!/bin/bash
echo '{"jobAParam1":"1"}' > $JOB_OUTPUT_PROP_FILE

mixtureCondition.flowflow20.projectwrite_to_props.sh打包生成zip

web ui创建项目、上传zip、执行flow

image-20210325171846367
image-20210325172014251

Views: 10

工作流调度 Azkaban 工作流-执行Java任务

执行Java任务

type 类型为 javaprocess的job,可以运行一个自定义Java类的main方法,可用的配置如下:

  • Xms:最小堆
  • Xmx:最大堆
  • classpath:类路径
  • java.class:要运行的 Java 对象,其中必须包含 Main 方法
  • main.args: main 方法的参数

案例:

  • 1、新建一个 azkaban 的 maven 工程
  • 2、创建包名: com.kkb.azkaban
  • 3、包中创建 JavaProcessTest 类
package com.kkb.azkaban;

public class JavaProcessTest {
  public static void main(String[] args) {
      System.out.println("This is " + args[0] + " javaprocess job type test!");
  }
}

image-20210323101748116

代码打包,生成jar包

image-20210323102431253

编写flow文件javaProcessTest.flow,内容如下

nodes:
- name: testJavaProcess
  type: javaprocess
  config:
    Xms: 96M
    Xmx: 200M
    java.class: com.kkb.azkaban.JavaProcessTest
    main.args: MyAzkaban

将jar包、flow文件、project文件压缩生成zip文件

web ui创建工程、上传zip、执行flow

image-20210323102717828

image-20210323102736081

image-20210323102811209

image-20210323102833439

image-20210323103156129

Views: 6