Oozie4.3.1各种Action及综合实例

Posted fansy1990

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Oozie4.3.1各种Action及综合实例相关的知识,希望对你有一定的参考价值。

Oozie4.3.1各种Action及综合实例

如果没有编译好Oozie,并部署到集群,请参考:Build Oozie4.3.1 on Hadoop3.X及Hadoop生态圈Action调用(一)

  • 本篇主要是针对Hadoop生态圈Action的调用。

1. 准备工作

  • 确保Hadoop集群启动;
  • 确保Oozie启动;
  • 确保HiveServer2启动(测试主要使用Hiveserver2);
  • 并且参考上篇把相关examples资源上传到HDFS的/user/root/examples目录;

2. Hive2 Action

2.1 准备工作

先读取Hive2的example的代码意思:
oozie-4.3.1/examples/apps/hive2/workflow.xml : 这个代码是一个通用的调用Hive的代码,这里就不贴了。
oozie-4.3.1/examples/apps/hive2/script.q:

-- 删除test表
DROP TABLE IF EXISTS test;
-- 根据路径创建外部表 test
CREATE EXTERNAL TABLE test (a INT) STORED AS TEXTFILE LOCATION '$INPUT';
-- 把外部表test的数据导出到HDFS
INSERT OVERWRITE DIRECTORY '$OUTPUT' SELECT * FROM test;

其中$INPUT的值在workflow.xml以及job.properties中有,job.properties设置为:

# 集群相关的配置,需修改为实际地址
nameNode=hdfs://master:8020 
jobTracker=master:8032
queueName=default
jdbcURL=jdbc:hive2://master:10000/default
examplesRoot=examples

oozie.use.system.libpath=true
oozie.wf.application.path=$nameNode/user/$user.name/$examplesRoot/apps/hive2

匹配后,输入为: /user/root/examples/input-data/table/
输出为:/user/root/example/output-data/hive2
由于这里并没有测试数据,同时根据script.q的创建test表的代码来看,可以只需要用一列数值即可作为其输入,比如(命名为data.txt):

1
2
3
4
5

上传到HDFS

hdfs dfs -put data.txt /user/root/examples/input-data/table

2.2 执行

在oozie-4.3.1/examples/apps/hive2执行命令:

oozie job --oozie http://master:11000/oozie -config job.properties -run

2.3 查看结果

2.3.1 Oozie监控

首先,可以看到任务在Oozie上面的结果:

从图中可以看到任务的发起时间及结束时间,及任务运行状态,可以看到是成功执行并完成的。

2.3.2 YARN 监控

Oozie发起Hive任务的时候,会首先发起一个MapReduce的任务,然后在这个任务里面再次发起另外的一个MapReduce任务(这个主要去执行hive的SQL),只有当hive sql的MapReduce任务执行完成之后,Oozie的MapReduce才会完成。

从图中可以看到两个任务的Name,和发起的任务也有很大的关系。其次是两个任务开始的时间和结束的时间。

2.3.3 输出

任务输出有两个,第一个,在hive中应该可以看到一个test表,同时查看数据,应该是1,2,3,4,5 ;

第二个,在HDFS的输出目录应该会有对应的数据产生,同时数据也是一样的。

3. Pig Action

3.1 准备工作

oozie-4.3.1/examples/apps/pig/workflow.xml :

<script>id.pig</script>
            <param>INPUT=/user/$wf:user()/$examplesRoot/input-data/text</param>
            <param>OUTPUT=/user/$wf:user()/$examplesRoot/output-data/pig</param>

从上面可以看到输入以及要执行的pig脚本,输入是在、/user/root/examples/input-data/text;
oozie-4.3.1/examples/apps/pig/id.pig:

A = load '$INPUT' using PigStorage(':');
B = foreach A generate $0 as id;
store B into '$OUTPUT' USING PigStorage();

该脚本就是从INPUT中加载数据到关系A(字段间分隔符用冒号:),然后取得里面的第一个字段的值并存入关系B,把关闭B的数据存储到HDFS的Output目录;
通过观察发现输入数据的是文本,不适合作为id.pig的输入,所以修改下,简单示例如下(假设名字为data.txt):

1:a
2:b
3:c

然后存入HDFS:

hdfs dfs -rm -r /user/root/examples/input-data/text/data.txt
hdfs dfs -put data.txt /user/root/examples/input-data/text/

3.2 执行

在oozie-4.3.1/examples/apps/pig执行命令:

oozie job --oozie http://master:11000/oozie -config job.properties -run

3.3 查看输出

由于这里的监控和Hive的类似,这里不再展开介绍。

3.3.1 Oozie监控

3.3.2 YARN监控

3.3.3 HDFS监控

4. Spark Action

4.1 准备工作

oozie-4.3.1/examples/apps/spark/job.properties:

nameNode=hdfs://master:8020
jobTracker=master:8032
master=yarn-cluster
queueName=default
examplesRoot=examples
oozie.use.system.libpath=true
oozie.wf.application.path=$nameNode/user/$user.name/$examplesRoot/apps/spark

特别注意,把master改为yarn-cluster ,使用这种模式来运行,当然也可以使用其他模式,具体可参考:http://oozie.apache.org/docs/4.3.1/DG_SparkActionExtension.html#Spark_on_YARN
其运行的代码如下所示:

public final class SparkFileCopy 

    public static void main(String[] args) throws Exception 

        if (args.length < 2) 
            System.err.println("Usage: SparkFileCopy <file> <file>");
            System.exit(1);
        

        SparkConf sparkConf = new SparkConf().setAppName("SparkFileCopy");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        JavaRDD<String> lines = ctx.textFile(args[0]);
        lines.saveAsTextFile(args[1]);
        System.out.println("Copied file from " + args[0] + " to " + args[1]);
        ctx.stop();
    

这个代码就是读取输入,然后原封不动的写入到输出目录,其实就是复制。

4.2 运行

在oozie-4.3.1/examples/apps/spark执行命令:

oozie job --oozie http://master:11000/oozie -config job.properties -run

4.3 查看结果

4.3.1 运行异常


这里应该是缺少YARN的jar包,进行下面的处理:

hdfs dfs -mv /user/root/share/lib/lib_20180809104438/spark /user/root/share/lib/lib_20180809104438/spark_bak
root@master:/usr/local/spark-2.3.0-bin-hadoop2.7# hdfs dfs -mkdir /user/root/share/lib/lib_20180809104438/spark
root@master:/usr/local/spark-2.3.0-bin-hadoop2.7# hdfs dfs -put /usr/local/spark-2.3.0-bin-hadoop2.7/jars/* /user/root/share/lib/lib_20180809104438/spark/
root@master:/usr/local/spark-2.3.0-bin-hadoop2.7# hdfs dfs -cp /user/root/share/lib/lib_20180809104438/spark_bak/oozie* /user/root/share/lib/lib_20180809104438/spark/
Deleted /user/root/share/lib/lib_20180809104438/spark/zkclient-0.3.jar
root@master:/usr/local/spark-2.3.0-bin-hadoop2.7# oozie admin -sharelibupdate
[ShareLib update status]
    sharelibDirOld = hdfs://master:8020/user/root/share/lib/lib_20180809104438
    host = http://master:11000/oozie
    sharelibDirNew = hdfs://master:8020/user/root/share/lib/lib_20180809104438
    status = Successful

同时,你可以看到其实我Spark安装的版本是2.3的,但是oozie匹配的是2.1 。这个暂时还没出现问题。这段代码解决参考:https://stackoverflow.com/questions/30904316/sparkaction-for-yarn-cluster
处理完成之后再次运行,即可正确运行程序;

4.3.2 Oozie监控

4.3.3 YARN监控

4.3.4 HDFS监控

5. 综合实例

建立一个综合的例子,把spark、pig、MapReduce、Hive都整合起来。

5.1 总流程

5.2 相关资源准备

5.2.1 需编辑资源:

  1. job.properties:
oozie.use.system.libpath=true

nameNode=hdfs://master:8020
resourceManager=master:8032
queueName=default
examplesRoot=examples

oozie.wf.application.path=$nameNode/user/$user.name/$examplesRoot/apps/all/workflow.xml
mr_inputDir=$nameNode/user/$user.name/$examplesRoot/input-data/all
mr_outputDir=$nameNode/user/$user.name/$examplesRoot/output-data/all/mr-output
mr_reducer_num=2
mr_arg_num=2

pig_inputDir=$mr_outputDir
pig_outputDir=$nameNode/user/$user.name/$examplesRoot/output-data/all/pig-output

hive_inputDir=$pig_outputDir
hive_tableName=wordcount_top
jdbcURL=jdbc:hive2://master:10000/default

spark_inputTable=$hive_tableName
spark_outputTable=wordcount_top_10
spark_column=num
spark_hive_file=$nameNode/user/$user.name/$examplesRoot/apps/all/hive-site.xml
master=yarn-cluster
  1. workflow.xml:
<workflow-app xmlns="uri:oozie:workflow:0.2" name="all-wf">
    <start to="mr-node"/>
   <action name="mr-node">
        <map-reduce>
            <job-tracker>$resourceManager</job-tracker>
            <name-node>$nameNode</name-node>
            <prepare>
                <delete path="$mr_outputDir"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapreduce.job.queuename</name>
                    <value>$queueName</value>
                </property>
        <property>
            <name>mapred.mapper.new-api</name>
            <value>true</value>
        </property>
        <property>
            <name>mapred.reducer.new-api</name>
            <value>true</value>
        </property>
                <property>
            <name>mapreduce.job.map.class</name>
                    <value>demo.fansy.WordCountMapper</value>
                </property>
                <property>
                    <name>mapreduce.job.reduce.class</name>
                    <value>demo.fansy.WordCountReducer</value>
                </property>
        <property>
            <name>mapreduce.job.inputformat.class</name>
            <value>org.apache.hadoop.mapreduce.lib.input.TextInputFormat</value>
        </property>
        <property>
            <name>mapreduce.job.outputformat.class</name>
            <value>org.apache.hadoop.mapreduce.lib.output.TextOutputFormat</value>
        </property>
        <property>
            <name>mapreduce.job.output.key.class</name>
            <value>org.apache.hadoop.io.Text</value>
        </property>
        <property>
            <name>mapreduce.job.output.value.class</name>
            <value>org.apache.hadoop.io.IntWritable</value>
        </property>
        <property>
            <name>mapreduce.job.reduces</name>
            <value>$mr_reducer_num</value>
        </property>
                <property>
                    <name>mapreduce.input.fileinputformat.inputdir</name>
                    <value>$mr_inputDir</value>
                </property>
                <property>
                    <name>mapreduce.output.fileoutputformat.outputdir</name>
                    <value>$mr_outputDir</value>
                </property>
        <property>
            <name>arg.num</name>
            <value>$mr_arg_num</value>
        </property>
            </configuration>
        </map-reduce>
        <ok to="pig-node"/>
        <error to="fail"/>
    </action>
    <action name="pig-node">
        <pig>
            <job-tracker>$resourceManager</job-tracker>
            <name-node>$nameNode</name-node>
            <prepare>
                <delete path="$pig_outputDir"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>$queueName</value>
                </property>
                <property>
                    <name>mapred.compress.map.output</name>
                    <value>true</value>
                </property>
            </configuration>
            <script>change_splitter.pig</script>
            <param>INPUT=$pig_inputDir</param>
            <param>OUTPUT=$pig_outputDir</param>
        </pig>
        <ok to="hive2-node"/>
        <error to="fail"/>
    </action>   
    <action name="hive2-node">
        <hive2 xmlns="uri:oozie:hive2-action:0.1">
            <job-tracker>$resourceManager</job-tracker>
            <name-node>$nameNode</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>$queueName</value>
                </property>
            </configuration>
            <jdbc-url>$jdbcURL</jdbc-url>
            <script>create_table.hive</script>
            <param>hive_inputDir=$hive_inputDir</param>
            <param>hive_tableName=$hive_tableName</param>
        </hive2>
        <ok to="spark-node"/>
        <error to="fail"/>
    </action>
    <action name='spark-node'>
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>$resourceManager</job-tracker>
            <name-node>$nameNode</name-node>
            <master>$master</master>
            <name>find top 10 words</name>
            <class>demo.TopTen</class>
            <jar>$nameNode/user/$wf:user()/$examplesRoot/apps/all/lib/demo.jar</jar>
            <spark-opts>--files $spark_hive_file</spark-opts>
        <arg>$spark_inputTable</arg>
            <arg>$spark_outputTable</arg>
            <arg>$spark_column</arg>
        </spark>
        <ok to="end" />
        <error to="fail" />
    </action>
    <kill name="fail">
        <message>Map/Reduce failed, error message[$wf:errorMessage(wf:lastErrorNode())]</message>
    </kill>
    <end name="end"/>
</workflow-app>
  1. change_splitter.pig
A = load '$INPUT' using PigStorage('\\t');
B = foreach A generate $1 as num,$0 as word;
store B into '$OUTPUT' USING PigStorage(',');
  1. create_table.hive
DROP TABLE IF EXISTS $hive_tableName;
CREATE TABLE $hive_tableName (num INT, word STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n' STORED AS TEXTFILE ;
LOAD DATA INPATH '$hive_inputDir' OVERWRITE INTO TABLE $hive_tableName;
  1. WordCountMapper.java:
package demo.fansy;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * @Author: fansy
 * @Time: 2018/8/9 17:22
 * @Email: fansy1990@foxmail.com
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> 
    private Logger logger = LoggerFactory.getLogger(WordCountMapper.class);
    private int num = 0;
    @Override
    protected void setup(Context context) throws IOException, InterruptedException 
        num = context.getConfiguration().getInt("arg.num",-1);
        logger.info("num:",num);
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
        String line = value.toString();

        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) 
            word.set(itr.nextToken());
            context.write(word, one);
        
    

  1. WordCountReducer.java:
package demo.fansy;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * @Author: fansy
 * @Time: 2018/8/9 17:27
 * @Email: fansy1990@foxmail.com
 */
public class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable> 
    private IntWritable result = new IntWritable();
    private Logger logger = LoggerFactory.getLogger(WordCountReducer.class);
    private int num = 0;
    @Override
    protected void setup(Context context) throws IOException, InterruptedException 
        num = context.getConfiguration().getInt("arg.num",-1);
        logger.info("reducer num:",num);
    
    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
    ) throws IOException, InterruptedException 
        int sum = 0;
        for (IntWritable val : values) 
            sum += val.get();
        
        if(sum> num) 
            result.set(sum);
            context.write(key, result);
        
    

TopTen.scala

package demo

import org.apache.spark.sql.SparkSession

/**
  * //@Author: fansy 
  * //@Time: 2018/8/9 17:32
  * //@Email: fansy1990@foxmail.com
  */
object TopTen 
  def main(args: Array[String]): Unit = 
    if (args.length < 3) 
      System.err.println("Usage: TopTen <inputTable> <outputTable> <column>" )
      System.exit(1)
    
    val input = args(0)
    val output = args(1)
    val column = args(2)
    val spark = SparkSession.builder().enableHiveSupport().appName("Top Ten :"+ input).enableHiveSupport().getOrCreate()
    val data = spark.sql("select * from " + input +" order by "+ column +" desc limit 10")
    val tmpTable = "tmp" + System.currentTimeMillis()
    data.createOrReplaceTempView(tmpTable)
    spark.sql("drop table if exists "+output)
    spark.sql("create table " + output + " as select * from " + tmpTable)
    System.out.println("read from " + input + " to " + output)
    spark.stop()
  

5.2.2 资源上传

  1. 把WordCountMapper、WordCountReducer、TopTen打成jar包,比如名字叫demo.jar ,如下:
  2. 把文件workflow.xml , create_table.hive , change_splitter.pig 上传到HDFS 路径:/user/root/examples/apps/all/下面;
  3. 把demo.jar 上传到HDFS的 /user/root/examples/apps/all/lib/下面;
  4. 把hive-site.xml(可以在Hive安装路径/conf下面找到)上传到HDFS的/user/root/examples/apps/all/路径下面(这个路径需要和job.properties中对应);
  5. 把oozie下面的oozie-4.3.1/examples/input-data/text/data.txt 上传到HDFS的/user/root/examples/input-data/all下面,作为整个流程的输入;

5.3 运行:

oozie job --oozie http://master:11000/oozie -config job.properties -run

5.4 监控:

5.4.1 oozie :


所有发起的MR/Spark任务:

5.4.2 Mapreduce

在MapReduce任务中,传入了一个参数2,用于指定最小的出现次数,可以在MapReduce日志中看到这个参数:

同时设置reduce的个数为2,也可以看出:

5.4.3 Pig

由于Pig输出的数据导入了Hive,所以数据会经过转移,所以可以查看其在Hive数据中的存储即可,如下:

5.4.4 Hive

hive主要是建表,可以通过查看表数据得到:

通过上面可以看到数据一共有12条记录,所以也可以通过对比spark的输出,来看spark任务做了哪些工作。

5.4.5

Spark 主要是求top10 ,所以这里有两个工作一个是排序,一个是前10,其结果也是存在Hive表的,所以直接查看Hive表即可。

以上是关于Oozie4.3.1各种Action及综合实例的主要内容,如果未能解决你的问题,请参考以下文章

Build Oozie4.3.1 on Hadoop3.X及Hadoop生态圈Action调用

Build Oozie4.3.1 on Hadoop3.X及Hadoop生态圈Action调用

C# WPF通过反射及Ioc容器综合实例

vuex基础详解及项目实例

shell综合实例

Android开发之Intent.Action Android中Intent的各种常见作用【转】