电商数仓azkaban

Posted 今夜月色很美

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了电商数仓azkaban相关的知识,希望对你有一定的参考价值。

1、azkaban概论

1.1 为什么需要工作流调度系统

1)一个完整的数据分析系统通常都是由大量任务单元组成:

Shell脚本程序,Java程序,MapReduce程序、Hive脚本等

2)各任务单元之间存在时间先后及前后依赖关系

3)为了很好地组织起这样的复杂执行计划,需要一个工作流调度系统来调度执行;

1.2 常见工作流调度系统

1)简单的任务调度:直接使用Linux的Crontab来定义;

2)复杂的任务调度:开发调度平台或使用现成的开源调度系统,比如Ooize、Azkaban、 Airflow、DolphinScheduler等。

1.3 Azkaban与Oozie对比

总体来说,Ooize相比Azkaban是一个重量级的任务调度系统,功能全面,但配置使用也更复杂。如果可以不在意某些功能的缺失,轻量级调度器Azkaban是很不错的候选对象。

各大数据调度系统比较

Xxl-jobDolphinSchedulerAzkabanAirflowOozie
定位一个轻量级分布式的任务调度框架解决数据处理流程中错综复杂的依赖关系为了解决Hadoop的任务依赖关系问题通用的批量数据处理管理Hdoop作业(job)的工作流程调度管理系统
任务类型支持Java支持传统的shell任务,同时支持大数据平台任务调度:MR、Spark、SQL(mysql、postgresql、hive/sparksql)、python、procedure、sub_processCommand、HadoopShell、Java、HadoopJava、Pig、Hive等,支持插件式扩展Python、Bash、HTTP、Mysql等,支持Operator的自定义扩展。统一调度hadoop系统中常见的mr任务启动、Java MR、Streaming MR、Pig、Hive、Sqoop、Spark、Shell等
可视化流程定义无,可配置任务级联触发是所有流定时操作都是可视化的,通过拖拽来绘制DAG,配置数据源及资源,同时对于第三方系统,提供api方式的操作。否通过自定义DSL绘制DAG并打包上传否通过python代码来绘制DAG,使用不便否 配置相关的调度任务复杂,依赖关系、时间触发、事件触发使用xml语言进行表达
任务监控支持任务状态、任务类型、重试次数、任务运行机器、可视化变量等关键信息一目了然只能看到任务状态不能直观区分任务类型任务状态、任务类型、任务运行机器、创建时间、启动时间、完成时间等。
自定义任务类型支持是需要java先开发具体执行器
暂停/恢复/补数支持暂停、恢复操作支持暂停、恢复 补数操作否只能先将工作流杀死在重新运行否只能先将工作流杀死在重新运行支持启动/停止/暂停/恢复/重新运行:Oozie支持Web,RestApi,Java API操作
高可用支持支持HA 调度中心HA和执行器HA支持HA 去中心化的多Master和多Worker通过DB支持HA -但Web Server存在单点故障风险通过DB支持HA -但Scheduler存在单点故障风险通过DB支持HA
多租户支持支持 dolphinscheduler上的用户可以通过租户和hadoop用户实现多对一或一对一的映射关系,这对大数据作业的调度是非常重要。
过载处理能力任务队列机制,轮询任务队列机制,单个机器上可调度的任务数量可以灵活配置,当任务过多时会缓存在任务队列中,不会操作机器卡死任务太多时会卡死服务器任务太多时会卡死服务器调度任务时可能出现死锁
集群扩展支持是 新注册执行器即可是 调度器使用分布式调度,整体的调度能力会随集群的规模线性正常,Master和Worker支持动态上下线是 -只Executor水平扩展是 -只Executor水平扩展

2、azkaban入门

2.1 集群模式安装

2.1.1 上传tar包

1)将azkaban-db-3.84.4.tar.gz,azkaban-exec-server-3.84.4.tar.gz,azkaban-web-server-3.84.4.tar.gz上传到h102的/opt/software路径

2)新建/opt/module/azkaban目录,并将所有tar包解压到这个目录下

mkdir /opt/module/azkaban

3)解压azkaban-db-3.84.4.tar.gz、 azkaban-exec-server-3.84.4.tar.gz和azkaban-web-server-3.84.4.tar.gz到/opt/module/azkaban目录下

tar -zxvf azkaban-db-3.84.4.tar.gz -C /opt/module/azkaban/
tar -zxvf azkaban-exec-server-3.84.4.tar.gz -C /opt/module/azkaban/
tar -zxvf azkaban-web-server-3.84.4.tar.gz -C /opt/module/azkaban/

4)进入到/opt/module/azkaban目录,依次修改名称

mv azkaban-exec-server-3.84.4/ azkaban-exec
mv azkaban-web-server-3.84.4/ azkaban-web

2.1.2 配置MySQL

进入mysql容器,连接mysql

mysql -u root -p 

创建Azkaban数据库

create database azkaban;

创建azkaban用户并赋予权限,设置密码有效长度4位及以上

set global validate_password_length=4;

设置密码策略最低级别

set global validate_password_policy=0;

创建Azkaban用户,任何主机都可以访问Azkaban,密码是000000

CREATE USER 'azkaban'@'%' IDENTIFIED BY '000000';

赋予Azkaban用户增删改查权限

GRANT SELECT,INSERT,UPDATE,DELETE ON azkaban.* to 'azkaban'@'%' WITH GRANT OPTION;

创建Azkaban表,完成后退出MySQL

use azkaban;
# 这里如果命令行执行Sql脚本失败,也可以使用如navicat连接上数据库后通过navicat运行sql脚本文件。
source /opt/module/azkaban/azkaban-db-3.84.4/create-all-sql-3.84.4.sql
quit;

更改MySQL包大小;防止Azkaban连接MySQL阻塞

vim /etc/my.cnf

在[mysqld]下面加一行max_allowed_packet=1024M

[mysqld]
max_allowed_packet=1024M

重启MySQL

sudo systemctl restart mysqld
# 如果是docker部署的mysql则重启容器
docker restart 7637831cdc75 

2.1.3 配置Executor Server

Azkaban Executor Server处理工作流和作业的实际执行。

编辑azkaban.properties

vim /opt/module/azkaban/azkaban-exec/conf/azkaban.properties

修改如下属性

#...
default.timezone.id=Asia/Shanghai
#...
azkaban.webserver.url=http://h102:8081

executor.port=12321
#...
mysql.host=h103
mysql.password=000000

同步azkaban-exec到所有节点

xsync /opt/module/azkaban/azkaban-exec

必须进入到/opt/module/azkaban/azkaban-exec路径,分别在三台机器上,启动executor server

bin/start-exec.sh

注意:如果在/opt/module/azkaban/azkaban-exec目录下出现executor.port文件,说明启动成功

下面激活executor,需要

curl -G "h102:12321/executor?action=activate" && echo
curl -G "h103:12321/executor?action=activate" && echo
curl -G "h104:12321/executor?action=activate" && echo

curl -G "localhost:12321/executor?action=activate" && echo

如果三台机器都出现如下提示,则表示激活成功

"status":"success"

我这里报错

curl: (7) Failed connect to h103:12321; Connection refused

查看executor启动日志,发现有错误信息

2022/04/28 07:27:17.090 +0800 ERROR [MySQLDataSource] [Azkaban] Failed to find write-enabled DB connection. Wait 15 seconds and retry. No.Attempt = 1
java.sql.SQLException: Cannot create PoolableConnectionFactory (Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.)
	at org.apache.commons.dbcp2.BasicDataSource.createPoolableConnectionFactory(BasicDataSource.java:2294)
	at org.apache.commons.dbcp2.BasicDataSource.createDataSource(BasicDataSource.java:2039)
	at azkaban.db.MySQLDataSource.getConnection(MySQLDataSource.java:78)
	at org.apache.commons.dbutils.AbstractQueryRunner.prepareConnection(AbstractQueryRunner.java:175)
	at org.apache.commons.dbutils.QueryRunner.query(QueryRunner.java:286)
	at azkaban.db.DatabaseOperator.query(DatabaseOperator.java:69)
	at azkaban.executor.ExecutorDao.fetchExecutor(ExecutorDao.java:63)
	at azkaban.executor.JdbcExecutorLoader.fetchExecutor(JdbcExecutorLoader.java:291)
	at azkaban.execapp.AzkabanExecutorServer.initActive(AzkabanExecutorServer.java:278)
	at azkaban.execapp.AzkabanExecutorServer.start(AzkabanExecutorServer.java:247)
	at azkaban.execapp.AzkabanExecutorServer.launch(AzkabanExecutorServer.java:163)
	at azkaban.execapp.AzkabanExecutorServer.main(AzkabanExecutorServer.java:159)
Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.

更换了用户,并把mysql连接驱动mysql-connector-java版本从5.1.28更换成8.0.16还是不行,然后尴尬的发现我用docker容器启动的mysql,镜像端口虽然是3306,但是我是映射的主机端口是3310,所以修改azkaban.properties中mysql的端口为3310问题解决。

2.1.4 配置Web Server

Azkaban Web Server处理项目管理,身份验证,计划和执行触发。

1)编辑azkaban.properties

vim /opt/module/azkaban/azkaban-web/conf/azkaban.properties

修改如下属性

...
default.timezone.id=Asia/Shanghai
...
mysql.port=3306
mysql.host=h103
mysql.user=root
mysql.password=123456
...
azkaban.executorselector.filters=StaticRemainingFlowSize,CpuStatus

说明:

#StaticRemainingFlowSize:正在排队的任务数;

#CpuStatus:CPU占用情况

#MinimumFreeMemory:内存占用情况。测试环境,必须将MinimumFreeMemory删除掉,否则它会认为集群资源不够,不执行。

2)修改azkaban-users.xml文件,添加atguigu用户

vim /opt/module/azkaban/azkaban-web/conf/azkaban-users.xml
<azkaban-users>
  <user groups="azkaban" password="azkaban" roles="admin" username="azkaban"/>
  <user password="metrics" roles="metrics" username="metrics"/>
  <user password="atguigu" roles="admin" username="atguigu"/>
  <role name="admin" permissions="ADMIN"/>
  <role name="metrics" permissions="METRICS"/>
</azkaban-users>

3)必须进入到h102的/opt/module/azkaban/azkaban-web路径,启动web server

bin/start-web.sh

4)访问http://h102:8081,并用atguigu用户登陆

如果是阿里云搭建的azkaban,记得配置安全组策略放开8081端口。

2.2 Work Flow案例实操

2.2.1 HelloWorld案例

1)在windows环境,新建azkaban.project文件,编辑内容如下

azkaban-flow-version: 2.0

注意:该文件作用,是采用新的Flow-API方式解析flow文件。

2)新建basic.flow文件,内容如下

nodes:
  - name: jobA
    type: command
    config:
      command: echo "Hello World"

(1)Name:job名称

(2)Type:job类型。command表示你要执行作业的方式为命令

(3)Config:job配置

3)将azkaban.project、basic.flow文件压缩到一个zip文件,文件名称必须是英文。

4)在WebServer新建项目:http://h102:8081/index

5)给项目名称命名和添加项目描述

6)first.zip文件上传

7)执行任务流

8)在job list中找到对应job的日志,在日志中,查看运行结果

2.2.2 作业依赖案例

需求:JobA和JobB执行完了,才能执行JobC

具体步骤:

1)修改basic.flow为如下内容

nodes:
  - name: jobC
    type: command
    # jobC 依赖 JobA和JobB
    dependsOn:
      - jobA
      - jobB
    config:
      command: echo "I’m JobC"

  - name: jobA
    type: command
    config:
      command: echo "I’m JobA"

  - name: jobB
    type: command
    config:
      command: echo "I’m JobB"

​ dependsOn:作业依赖,后面案例中演示

2)将修改后的basic.flow和azkaban.project压缩成second.zip文件

3)重复2.3.1节HelloWorld后续步骤。

2.2.3 自动失败重试案例

需求:如果执行任务失败,需要重试3次,重试的时间间隔10000ms

具体步骤:

1)编译配置流

nodes:
  - name: JobA
    type: command
    config:
      command: sh /not_exists.sh
      retries: 3
      retry.backoff: 10000

​ 参数说明:

​ retries:重试次数

​ retry.backoff:重试的时间间隔

2)将修改后的basic.flow和azkaban.project压缩成four.zip文件

3)重复2.3.1节HelloWorld后续步骤。

4)也可以在Flow全局配置中添加任务失败重试配置,此时重试配置会应用到所有Job。

config:
  retries: 3
  retry.backoff: 10000
nodes:
  - name: JobA
    type: command
    config:
      command: sh /not_exists.sh

2.2.4 手动失败重试案例

需求:JobA=》JobB(依赖于A)=》JobC=》JobD=》JobE=》JobF。生产环境,任何Job都有可能挂掉,可以根据需求执行想要执行的Job。

具体步骤:

1)编译配置流

nodes:
  - name: JobA
    type: command
    config:
      command: echo "This is JobA."

  - name: JobB
    type: command
    dependsOn:
      - JobA
    config:
      command: echo "This is JobB."

  - name: JobC
    type: command
    dependsOn:
      - JobB
    config:
      command: echo "This is JobC."

  - name: JobD
    type: command
    dependsOn:
      - JobC
    config:
      command: echo "This is JobD."

  - name: JobE
    type: command
    dependsOn:
      - JobD
    config:
      command: echo "This is JobE."

  - name: JobF
    type: command
    dependsOn:
      - JobE
    config:
      command: echo "This is JobF."

2)将修改后的basic.flow和azkaban.project压缩成five.zip文件

3)重复2.3.1节HelloWorld后续步骤。

Enable和Disable下面都分别有如下参数:

​ Parents:该作业的上一个任务

​ Ancestors:该作业前的所有任务

​ Children:该作业后的一个任务

​ Descendents:该作业后的所有任务

​ Enable All:所有的任务

4)可以根据需求选择性执行对应的任务。

3、Azkaban进阶

3.1 JavaProcess作业类型案例

JavaProcess类型可以运行一个自定义主类方法,type类型为javaprocess,可用的配置为:

Xms:最小堆

Xmx:最大堆

classpath:类路径

java.class:要运行的Java对象,其中必须包含Main方法

main.args:main方法的参数

案例:

1)新建一个azkaban的maven工程

2)创建包名:com.atguigu

3)创建AzTest类

package com.atguigu;

public class AzTest 
    public static void main(String[] args) 
        System.out.println("This is for testing!");
    

4)打包成jar包azkaban-1.0-SNAPSHOT.jar

5)新建testJava.flow,内容如下

nodes:
  - name: test_java
    type: javaprocess
    config:
      Xms: 96M
      Xmx: 200M
      java.class: com.atguigu.AzTest

6)将Jar包、flow文件和project文件打包成javatest.zip

7)创建项目=》上传javatest.zip =》执行作业=》观察结果

3.2 条件工作流案例

条件工作流功能允许用户自定义执行条件来决定是否运行某些Job。条件可以由当前Job的父Job输出的运行时参数构成,也可以使用预定义宏。在这些条件下,用户可以在确定Job执行逻辑时获得更大的灵活性,例如,只要父Job之一成功,就可以运行当前Job。

3.2.1 运行时参数案例

1)基本原理

(1)父Job将参数写入JOB_OUTPUT_PROP_FILE环境变量所指向的文件

(2)子Job使用 $jobName:param来获取父Job输出的参数并定义执行条件

2)支持的条件运算符:

(1)== 等于

(2)!= 不等于

(3)> 大于

(4)>= 大于等于

(5)< 小于

(6)<= 小于等于

(7)&& 与

(8)|| 或

(9)! 非

3)案例:

需求:

JobA执行一个shell脚本。

JobB执行一个shell脚本,但JobB不需要每天都执行,而只需要每个周一执行。

(1)新建JobA.sh

#!/bin/bash
echo "do JobA"
wk=`date +%w`
echo "\\"wk\\":$wk" > $JOB_OUTPUT_PROP_FILE

(2)新建JobB.sh

#!/bin/bash
echo "do JobB"

(3)新建condition.flow

nodes:
 - name: JobA
   type: command
   config:
     command: sh JobA.sh

 - name: JobB
   type: command
   dependsOn:
     - JobA
   config:
     command: sh JobB.sh
   condition: $JobA:wk == 1

(4)将JobA.sh、JobB.sh、condition.flow和azkaban.project打包成condition.zip

(5)创建condition项目=》上传condition.zip文件=》执行作业=》观察结果

(6)按照我们设定的条件,JobB会根据当日日期决定是否执行。

3.2.2 预定义宏案例

Azkaban中预置了几个特殊的判断条件,称为预定义宏。

预定义宏会根据所有父Job的完成情况进行判断,再决定是否执行。可用的预定义宏如下:

(1)all_success: 表示父Job全部成功才执行(默认)

(2)all_done:表示父Job全部完成才执行

(3)all_failed:表示父Job全部失败才执行

(4)one_success:表示父Job至少一个成功才执行

(5)one_failed:表示父Job至少一个失败才执行

1)案例

需求:

JobA执行一个shell脚本

JobB执行一个shell脚本

JobC执行一个shell脚本,要求JobA、JobB中有一个成功即可执行

(1)新建JobA.sh

#!/bin/bash
echo "do JobA"

(2)新建JobC.sh

#!/bin/bash
echo "do JobC"

(3)新建macro.flow

nodes:
 - name: JobA
   type: command
   config:
     command: sh JobA.sh

 - name: JobB
   type: command
   config:
     command: sh JobB.sh

 - name: JobC
   type: command
   dependsOn:
     - JobA
     - JobB
   config:
     command: sh JobC.sh
   condition: one_success

3.3 定时执行案例

需求:JobA每间隔1分钟执行一次;

具体步骤:

1)Azkaban可以定时执行工作流。在执行工作流时候,选择左下角Schedule

2)右上角注意时区是上海,然后在左面填写具体执行事件,填写的方法和crontab配置定时任务规则一致。

3)观察结果

4)删除定时调度

​ 点击remove Schedule即可删除当前任务的调度规则。

3.4 邮件报警案例

3.4.1 开启SMTP服务

在邮箱设置重开启SMTP服务并记住授权码

3.4.2 默认邮件报警案例

Azkaban默认支持通过邮件对失败的任务进行报警,配置方法如下:

1)在azkaban-web节点h102上,编辑/opt/module/azkaban/azkaban-web/conf/azkaban.properties,修改如下内容:

vim /opt/module/azkaban/azkaban-web/conf/azkaban.properties

添加如下内容:

#这里设置邮件发送服务器,需要 申请邮箱,切开通stmp服务,以下只是例子
mail.sender=atguigu@126.com
mail.host=smtp.126.com
mail.user=atguigu@126.com
mail.password=用邮箱的授权码

2)保存并重启web-server。

bin/shutdown-web.sh
bin/start-web.sh

3)配置邮件通知

工作流执行成功,邮件发送失败,查看azkaban-web的日志文件,发现报错

 ERROR [EmailMessage] [Azkaban] Connecting to SMTP server failed, attempt: 4
javax.mail.MessagingException: Could not connect to SMTP host: smtp.qq.com, port: 25;

网上有人说可以配置mail.port=465指定端口,实测无效。

很多人说需要重新编译azkaban源码,等晚上试一下。

3.5 电话报警案例

3.5.1 第三方告警平台集成

有时任务执行失败后邮件报警接收不及时,因此可能需要其他报警方式,比如电话报警。如有类似需求,可与第三方告警平台进行集成,例如睿象云。

1)进入睿象云官网注册账号并登录

官网地址:https://www.aiops.com/

2)集成告警平台,使用Email集成

3)获取邮箱地址,后边需将报警信息发送至该邮箱

4)配置通知策略

5) 执行上一个邮件通知的案例,将通知对象改为刚刚集成第三方平台时获取的邮箱。

3.6 Azkaban多Executor模式注意事项

Azkaban多Executor模式是指,在集群中多个节点部署Executor。在这种模式下, Azkaban web Server会根据策略,选取其中一个Executor去执行任务。

为确保所选的Executor能够准确的执行任务,我们须在以下两种方案任选其一,推荐使用方案二。

方案一:指定特定的Executor(h102)去执行任务。

1)在MySQL中azkaban数据库executors表中,查询h102上的Executor的id

2)在执行工作流程时加入useExecutor属性,如下

方案二:在Executor所在所有节点部署任务所需脚本和应用。

4、流程分析

5、创建MySQL数据库和表

1)创建gmall_report数据库

2)创建表

(1)访客统计

DROP TABLE IF EXISTS ads_visit_stats;
CREATE TABLE `ads_visit_stats` (
  `dt` DATE NOT NULL COMMENT '统计日期',
  `is_new` VARCHAR(255) NOT NULL COMMENT '新老标识,1:新,0:老',
  `recent_days` INT NOT NULL COMMENT '最近天数,1:最近1天,7:最近7天,30:最近30天',
  `channel` VARCHAR(255) NOT NULL COMMENT '渠道',
  `uv_count` BIGINT(20) DEFAULT NULL COMMENT '日活(访问人数)',
  `duration_sec` BIGINT(20) DEFAULT NULL COMMENT '页面停留总时长',
  `avg_duration_sec` BIGINT(20)  DEFAULT NULL COMMENT '一次会话,页面停留平均时长',
  `page_count` BIGINT(20) DEFAULT NULL COMMENT '页面总浏览数',
  `avg_page_count` BIGINT(20) DEFAULT NULL COMMENT '一次会话,页面平均浏览数',
  `sv_count` BIGINT(20) DEFAULT NULL COMMENT '会话次数',
  `bounce_count` BIGINT(20) DEFAULT NULL COMMENT '跳出数',
  `bounce_rate` DECIMAL(16,2) DEFAULT NULL COMMENT '跳出率',
  PRIMARY KEY (`dt`,`recent_days`,`is_new`,`channel`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

(2)页面路径分析

DROP TABLE IF EXISTS ads_page_path;
CREATE TABLE `ads_page_path` (      
  `dt` DATE NOT NULL COMMENT '统计日期',
  `recent_days` BIGINT(20) NOT NULL COMMENT '最近天数,1:最近1天,7:最近7天,30:最近30天',
  `source` VARCHAR(255) DEFAULT NULL COMMENT '跳转起始页面',
  `target` VARCHAR(255) DEFAULT NULL COMMENT '跳转终到页面',
  `path_count` BIGINT(255) DEFAULT NULL COMMENT '跳转次数',
  UNIQUE KEY (`dt`,`recent_days`,`source`,`target`) USING BTREE     
) ENGINE=INNODB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;

(3)用户统计

DROP TABLE IF EXISTS ads_user_total;
CREATE TABLE `ads_user_total` (          
  `dt` DATE NOT NULL COMMENT '统计日期',
  `recent_days` BIGINT(20) NOT NULL COMMENT '最近天数,0:累积值,1:最近1天,7:最近7天,30:最近30天',
  `new_user_count` BIGINT(20) DEFAULT NULL COMMENT '新注册用户数',
  `new_order_user_count` BIGINT(20) DEFAULT NULL COMMENT '新增下单用户数',
  `order_final_amount` DECIMAL(16,2) DEFAULT NULL COMMENT '下单总金额',
  `order_user_count` BIGINT(20) DEFAULT NULL COMMENT '下单用户数',
  `no_order_user_count` BIGINT(20) DEFAULT NULL COMMENT '未下单用户数(具体指活跃用户中未下单用户)',
  PRIMARY KEY (`dt`,`recent_days`)           
) ENGINE=INNODB DEFAULT CHARSET=utf8;

(4)用户变动统计

DROP TABLE IF EXISTS ads_user_change;
CREATE TABLE `ads_user_change` (
  `dt` DATE NOT NULL COMMENT '统计日期',
  `user_churn_count` BIGINT(20) DEFAULT NULL  COMMENT '流失用户数',
  `user_back_count` BIGINT(20) DEFAULT NULL  COMMENT '回流用户数',
  PRIMARY KEY (`dt`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

(5)用户行为漏斗分析

DROP TABLE IF EXISTS ads_user_action;
CREATE TABLE `ads_user_action` (
  `dt` DATE NOT NULL COMMENT '统计日期',
  `recent_days` BIGINT(20) NOT NULL COMMENT '最近天数,1:最近1天,7:最近7天,30:最近30天',
  `home_count` BIGINT(20) DEFAULT NULL COMMENT '浏览首页人数',
  `good_detail_count` BIGINT(20) DEFAULT NULL COMMENT '浏览商品详情页人数',
  `cart_count` BIGINT(20) DEFAULT NULL COMMENT '加入购物车人数',
  `order_count` BIGINT(20) DEFAULT NULL COMMENT '下单人数',
  `payment_count` BIGINT(20) DEFAULT NULL COMMENT '支付人数',
  PRIMARY KEY (`dt`,`recent_days`) USING BTREE
) ENGINE=INNODB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;

(6)用户留存率分析

DROP TABLE IF EXISTS ads_user_retention;
CREATE TABLE `ads_user_retention` (      
  `dt` DATE DEFAULT NULL COMMENT '统计日期',
  `create_date` VARCHAR(255) NOT NULL COMMENT '用户新增日期',
  `retention_day` BIGINT(20) NOT NULL COMMENT '截至当前日期留存天数',
  `retention_count` BIGINT(20) DEFAULT NULL COMMENT '留存用户数量',
  `new_user_count` BIGINT(20) DEFAULT NULL COMMENT '新增用户数量',
  `retention_rate` DECIMAL(16,2) DEFAULT NULL COMMENT '留存率',
  PRIMARY KEY (`create_date`,`retention_day`) USING BTREE        
) ENGINE=INNODB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;

(7)订单统计

DROP TABLE IF EXISTS ads_order_total;
 CREATE TABLE `ads_order_total` (   
  `dt` DATE NOT NULL COMMENT '统计日期', 
  `recent_days` BIGINT(20) NOT NULL COMMENT '最近天数,1:最近1天,7:最近7天,30:最近30天',
  `order_count` BIGINT(255) DEFAULT NULL COMMENT '订单数', 
  `order_amount` DECIMAL(16,2) DEFAULT NULL COMMENT '订单金额', 
  `order_user_count` BIGINT(255) DEFAULT NULL COMMENT '下单人数',
  PRIMARY KEY (`dt`,`recent_days`)  
) ENGINE=INNODB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;

(8)各省份订单统计

DROP TABLE IF EXISTS ads_order_by_province;
CREATE TABLE `ads_order_by_province` (
  `dt` DATE NOT NULL,
  `recent_days` BIGINT(20) NOT NULL COMMENT '最近天数,1:最近1天,7:最近7天,30:最近30天',
  `province_id` VARCHAR(255) NOT NULL COMMENT '统计日期',
  `province_name` VARCHAR(255) DEFAULT NULL COMMENT '省份名称',
  `area_code` VARCHAR(255) DEFAULT NULL COMMENT '地区编码',
  `iso_code` VARCHAR(255) DEFAULT NULL COMMENT '国际标准地区编码',
  `iso_code_3166_2` VARCHAR(255) DEFAULT NULL COMMENT '国际标准地区编码',
  `order_count` BIGINT(20) DEFAULT NULL COMMENT '订单数',
  `order_amount` DECIMAL(16,2) DEFAULT NULL COMMENT '订单金额',
  PRIMARY KEY (`dt`, `recent_days` ,`province_id`) USING BTREE       
) ENGINE=INNODB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;

(9)品牌复购率

DROP TABLE IF EXISTS ads_repeat_purchase;
CREATE TABLE `ads_repeat_purchase` (         
  `dt` DATE NOT NULL COMMENT '统计日期',
  `recent_days` BIGINT(20) NOT NULL COMMENT '最近天数,1:最近1天,7:最近7天,30:最近30天',
  `tm_id` VARCHAR(255) NOT NULL COMMENT '品牌ID',
  `tm_name` VARCHAR(255) DEFAULT NULL COMMENT '品牌名称',
  `order_repeat_rate` DECIMAL(16,2) DEFAULT NULL COMMENT '复购率',
  PRIMARY KEY (`dt` ,`recent_days`,`tm_id`)          
) ENGINE=INNODB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;

(10)商品统计

DROP TABLE IF EXISTS ads_order_spu_stats;
CREATE TABLE `ads_order_spu_stats` (
  `dt` DATE NOT NULL COMMENT '统计日期',
  `recent_days` BIGINT(20) NOT NULL COMMENT '最近天数,1:最近1天,7:最近7天,30:最近30天',
  `spu_id` VARCHAR(255) NOT NULL COMMENT '商品ID',
  `spu_name` VARCHAR(255) DEFAULT NULL COMMENT '商品名称',
  `tm_id` VARCHAR(255) NOT NULL COMMENT '品牌ID',
  `tm_name` VARCHAR(255) DEFAULT NULL COMMENT '品牌名称',
  `category3_id` VARCHAR(255) NOT NULL COMMENT '三级品类ID',
  `category3_name` VARCHAR(255) DEFAULT NULL COMMENT '三级品类名称',
  `category2_id` VARCHAR(255) NOT NULL COMMENT '二级品类ID',
  `category2_name` VARCHAR(255) DEFAULT NULL COMMENT '二级品类名称',
  `category1_id` VARCHAR(255) NOT NULL COMMENT '一级品类ID',
  `category1_name` VARCHAR(255) NOT NULL COMMENT '一级品类名称',
  `order_count` BIGINT(20) DEFAULT NULL COMMENT '订单数',
  `order_amount` DECIMAL(16,2) DEFAULT NULL COMMENT '订单金额', 
  PRIMARY KEY (`dt`,`recent_days`,`spu_id`)  
) ENGINE=INNODB DEFAULT CHARSET=utf8;

(11)活动统计

DROP TABLE IF EXISTS ads_activity_stats;
CREATE TABLE `ads_activity_stats` (
  `dt` DATE NOT NULL COMMENT '统计日期',
  `activity_id` VARCHAR(255) NOT NULL COMMENT '活动ID',
  `activity_name` VARCHAR(255) DEFAULT NULL COMMENT '活动名称',
  `start_date` DATE DEFAULT NULL COMMENT '开始日期',
  `order_count` BIGINT(11) DEFAULT NULL COMMENT '参与活动订单数',
  `order_original_amount` DECIMAL(16,2) DEFAULT NULL COMMENT '参与活动订单原始金额',
  `order_final_amount` DECIMAL(16,2) DEFAULT NULL COMMENT '参与活动订单最终金额',
  `reduce_amount` DECIMAL(16,2) DEFAULT NULL COMMENT '优惠金额',
  `reduce_rate` DECIMAL(16,2) DEFAULT NULL COMMENT '补贴率',
  PRIMARY KEY (`dt`,`activity_id` )
) ENGINE=INNODB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;

(12)优惠券统计

DROP TABLE IF EXISTS ads_coupon_stats;
CREATE TABLE `ads_coupon_stats` (
  `dt` DATE NOT NULL COMMENT '统计日期',
  `coupon_id` VARCHAR(255) NOT NULL COMMENT '优惠券ID',
  `coupon_name` VARCHAR(255) DEFAULT NULL COMMENT '优惠券名称',
  `start_date` DATE DEFAULT NULL COMMENT '开始日期',  
  `rule_name`  VARCHAR(200) DEFAULT NULL COMMENT '优惠规则',
  `get_count`  BIGINT(20) DEFAULT NULL COMMENT '领取次数',
  `order_count` BIGINT(20) DEFAULT NULL COMMENT '使用(下单)次数',
  `expire_count`  BIGINT(20) DEFAULT NULL COMMENT '过期次数',
  `order_original_amount` DECIMAL(16,2) DEFAULT NULL COMMENT '使用优惠券订单原始金额',
  `order_final_amount` DECIMAL(16,2) DEFAULT NULL COMMENT '使用优惠券订单最终金额',
  `reduce_amount` DECIMAL(16,2) DEFAULT NULL COMMENT '优惠金额',
  `reduce_rate` DECIMAL(16,2) DEFAULT NULL COMMENT '补贴率',
  PRIMARY KEY (`dt`,`coupon_id` )
) ENGINE=INNODB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;

6、Sqoop导出脚本

1)编写Sqoop导出脚本

在/root/bin目录下创建脚本hdfs_to_mysql.sh

vim hdfs_to_mysql.sh

在脚本中填写如下内容(连接信息、数据库名修改成自己的)

#!/bin/bash

hive_db_name=gmall
mysql_db_name=gmall-report

export_data() 
/opt/module/sqoop/bin/sqoop export \\
--connect "jdbc:mysql://h103:3310/$mysql_db_name?useUnicode=true&characterEncoding=utf-8"  \\
--username root \\
--password 123456 \\
--table $1 \\
--num-mappers 1 \\
--export-dir /warehouse/$hive_db_name/ads/$1 \\
--input-fields-terminated-by "\\t" \\
--update-mode allowinsert \\
--update-key $2 \\
--input-null-string '\\\\N'    \\
--input-null-non-string '\\\\N'


case $1 in
  "ads_activity_stats" )
    export_data "ads_activity_stats" "dt,activity_id"
  ;;

  "ads_coupon_stats" )
    export_data "ads_coupon_stats" "dt,coupon_id"
  ;;

  "ads_order_by_province" )
    export_data "ads_order_by_province" "dt,recent_days,province_id"
  ;;

  "ads_order_spu_stats" )
    export_data "ads_order_spu_stats" "dt,recent_days,spu_id"
  ;;

  "ads_order_total" )
    export_data "ads_order_total" "dt,recent_days"
  ;;

  "ads_page_path" )
    export_data "ads_page_path" "dt,recent_days,source,target"
  ;;

  "ads_repeat_purchase" )
    export_data "ads_repeat_purchase" "dt,recent_days,tm_id"
  ;;

  "ads_user_action" )
    export_data "ads_user_action" "dt,recent_days"
  ;;

  "ads_user_change" )
    export_data "ads_user_change" "dt"
  ;;

  "ads_user_retention" )
    export_data "ads_user_retention" "create_date,retention_day"
  ;;

  "ads_user_total" )
    export_data "ads_user_total" "dt,recent_days"
  ;;

  "ads_visit_stats" )
    export_data "ads_visit_stats" "dt,recent_days,is_new,channel"
  ;;
  "all" )
    export_data "ads_activity_stats" "dt,activity_id"
    export_data "ads_coupon_stats" "dt,coupon_id"
    export_data "ads_order_by_province" "dt,recent_days,province_id"
    export_data "ads_order_spu_stats" "dt,recent_days,spu_id"
    export_data "ads_order_total" "dt,recent_days"
    export_data "ads_page_path" "dt,recent_days,source,target"
    export_data "ads_repeat_purchase" "dt,recent_days,tm_id"
    export_data "ads_user_action" "dt,recent_days"
    export_data "ads_user_change" "dt"
    export_data "ads_user_retention" "create_date,retention_day"
    export_data "ads_user_total" "dt,recent_days"
    export_data "ads_visit_stats" "dt,recent_days,is_new,channel"
  ;;
esac

关于导出update还是insert的问题

  • –update-mode:

    updateonly 只更新,无法插入新数据

    allowinsert 允许新增

  • –update-key:允许更新的情况下,指定哪些字段匹配视为同一条数据,进行更新而不增加。多个字段用逗号分隔。

  • –input-null-string和–input-null-non-string:

分别表示,将字符串列和非字符串列的空串和“null”转义。

官网地址:http://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html

Sqoop will by default import NULL values as string null. Hive is however using string \\N to denote NULL values and therefore predicates dealing with NULL(like IS NULL) will not work correctly. You should append parameters --null-string and --null-non-string in case of import job or --input-null-string and --input-null-non-string in case of an export job if you wish to properly preserve NULL values. Because sqoop is using those parameters in generated code, you need to properly escape value \\N to \\\\N:

Hive中的Null在底层是以“\\N”来存储,而MySQL中的Null在底层就是Null,为了保证数据两端的一致性。在导出数据时采用–input-null-string和–input-null-non-string两个参数。导入数据时采用–null-string和–null-non-string。

2)执行Sqoop导出脚本

chmod 777 hdfs_to_mysql.sh
hdfs_to_mysql.sh all

7、全调度流程

7.1 数据准备

1)用户行为数据准备

(1)修改/opt/module/applog下的application.properties

#业务日期
mock.date=2022-04-12

注意:分发至其他需要生成数据的节点

xsync application.properties

(2)生成数据

lg.sh

注意:生成数据之后,记得查看HDFS数据是否存在!

(3)观察HDFS的/origin_data/gmall/log/topic_log/2022-04-12路径是否有数据

如果执行完在hdfs中没有当日数据,检查各应用进程,如:

各服务器的kafka是否启动(Kafka)
h102和h103上的日志采集flume进程是否启动(Application)
h104上的消费者flume进程是否启动(Application)

我使用的阿里云,由于内存不足的原因,kafka和flume进程都挂掉过,如果内存不足,可以先只启动hadoop、zookeeper、kafka、flume来先生成hdfs数据。

如果进程都正常,再检查是否可以从kafka中正常消费数据,如果不能正常消费数据,通常会有一些错误信息输出。

bin/kafka-console-consumer.sh --bootstrap-server h102:9092 --from-beginning --topic topic_log

2)业务数据准备

(1)修改/opt/module/db_log下的application.properties

#业务日期
mock.date=2022-04-12

(2)生成数据

java -jar gmall2020-mock-db-2021-01-22.jar

(3)观察SQLyog中order_infor表中operate_time中有2022-04-12日期的数据

7.2 编写Azkaban工作流程配置文件

1)编写azkaban.project文件,内容如下

azkaban-flow-version: 2.0

2)编写gmall.flow文件,内容如下

nodes:
  - name: mysql_to_hdfs
    type: command
    config:
     command: /root/bin/mysql_to_hdfs.sh all $dt
    
  - name: hdfs_to_ods_log
    type: command
    config:
     command: /root/bin/hdfs_to_ods_log.sh $dt
     
  - name: hdfs_to_ods_db
    type: command
    dependsOn: 
     - mysql_to_hdfs
    config: 
     command: /root/bin/hdfs_to_ods_db.sh all $dt
  
  - name: ods_to_dim_db
    type: command
    dependsOn: 
     - hdfs_to_ods_db
    config: 
     command: /root/bin/ods_to_dim_db.sh all $dt

  - name: ods_to_dwd_log
    type: command
    dependsOn: 
     - hdfs_to_ods_log
    config: 
     command: /root/bin/ods_to_dwd_log.sh all $dt
    
  - name: ods_to_dwd_db
    type: command
    dependsOn: 
     - hdfs_to_ods_db
    config: 
     command: /root/bin/ods_to_dwd_db.sh all $dt
    
  - name: dwd_to_dws
    type: command
    dependsOn:
     - ods_to_dim_db
     - ods_to_dwd_log
     - ods_to_dwd_db
    config:
     command: /root/bin/dwd_to_dws.sh all $dt
    
  - name: dws_to_dwt
    type: command
    dependsOn:
     - dwd_to_dws
    config:
     command: /root/bin/dws_to_dwt.sh all $dt
    
  - name: dwt_to_ads
    type: command
    dependsOn: 
     - dws_to_dwt
    config:
     command: /root/bin/dwt_to_ads.sh all $dt
     
  - name: hdfs_to_mysql
    type: command
    dependsOn:
     - dwt_to_ads
    config:
      command: /root/bin/hdfs_to_mysql.sh all

3)将azkaban.project、gmall.flow文件压缩到一个zip文件,文件名称必须是英文。

4)在WebServer新建项目:http://h102:8081/index

5)给项目名称命名和添加项目描述

6)gmall.zip文件上传

7)选择上传的文件

8)查看任务流

9)详细任务流展示

10)配置输入dt时间参数

11)执行成功

11)在gmall-report数据库查看结果

以上是关于电商数仓azkaban的主要内容,如果未能解决你的问题,请参考以下文章

电商数仓数仓理论

电商数仓数仓环境搭建

电商门户网站商品品类多级联动SpringBoot+Thymeleaf实现

电商数仓2.0----4.7总结

电商数仓kylin

超级淘:电商导购618全品类京券