HADOOP MapReduce Processing Hive Data Extracted by Spark: Solution One
Posted by 辉常努腻
How it started:
Let's start with the problem. After a few days of working through practice questions, we found a new question type in a certain question bank, and it knocked us flat. So yesterday and today we did exactly one thing: get back up.
What was the question?
Use Java MapReduce to clean the data in Hive, then submit a screenshot of the driver code after the cleaning is done.
Pitfall #1:
The data Spark had extracted earlier was in .parquet format, which is not very friendly to MapReduce, so I decided to re-extract it, still with Spark, but into a different file format.
Pitfall #2:
When sinking with the new approach, I had simply created the target table with CREATE TABLE ... LIKE an existing table, and afterwards the Hive field delimiters were a complete mess. Let's take a look.
Requirements:
1. Use Scala + Spark to extract the data from MySQL into Hive.
2. Use Java + MapReduce to clean the Hive data.
Problems encountered:
- MapReduce could not read the Hive data properly.
- MapReduce could not sink its results back into Hive properly.
Solution path:
Start from the Spark side.
To fix the problem of Spark writing .parquet files into Hive, we first need to create the table ourselves. The reason for not letting Spark auto-create the table is that an auto-created table stores its data in the .parquet file format.
hive> CREATE TABLE `ods.region2`(
> `regionkey` string,
> `name` string,
> `comment` string)
> PARTITIONED BY (
> `etldate` string
> )
> row format delimited
> fields terminated by '|' ;
OK
Time taken: 0.055 seconds
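The sink code below selects from a temp view named data. The code that reads the MySQL source and registers that view is not shown in this post; purely as a rough sketch, here is what that read could look like using Spark's Java API (the JDBC URL, database, table name, user, password and driver class are all placeholders rather than values from the actual environment, and the real pipeline uses the Scala API):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MysqlToHive {
    public static void main(String[] args) {
        // Hive support is required so that insertInto("ods.region2") can resolve the metastore table
        SparkSession spark = SparkSession.builder()
                .appName("extract-mysql-to-hive")
                .enableHiveSupport()
                .getOrCreate();

        // all JDBC connection details below are placeholders
        Dataset<Row> region = spark.read()
                .format("jdbc")
                .option("url", "jdbc:mysql://mysql-host:3306/source_db")
                .option("driver", "com.mysql.jdbc.Driver")
                .option("dbtable", "region")
                .option("user", "root")
                .option("password", "******")
                .load();

        // register the temp view that the sink code below selects from
        region.createOrReplaceTempView("data");
    }
}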
Part of the Spark code that sinks into Hive:
spark.sql("select *,'20220616' as etldate from data ")
.write
.partitionBy("etldate")
.mode(saveMode = SaveMode.Overwrite)
.format("hive")
.option("delimiter","|")
.insertInto("ods.region2")
The key points are these two calls:
.format("hive")
.insertInto("ods.region2")
Let's look at the data that was written:
hdfs dfs -cat /user/hive/warehouse/ods.db/region2/etldate=20220616/*
3|EUROPE|ly final courts cajole furiously final excuse
4|MIDDLE EAST|uickly special accounts cajole carefully blithely close requests. carefully final asymptotes haggle furiousl
0|AFRICA|lar deposits. blithely final packages cajole. regular waters are final requests. regular accounts are according to
1|AMERICA|hs use ironic, even requests. s
2|ASIA|ges. thinly even pinto beans ca
Now the Java MapReduce code can be written and run normally.
I won't paste every class one by one; here is the pom excerpt and the driver (a sketch of what the Mapper and Reducer might look like follows the driver).
<groupId>org.li</groupId>
<artifactId>mapreduce_06-21</artifactId>
<version>1.0</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<parquet.version>1.8.1</parquet.version>
<!-- JDateTime dependency -->
<jodd.version>3.3.8</jodd.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.6</version>
</dependency>
<!-- parquet-hadoop -->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>${parquet.version}</version>
</dependency>
<!-- jodd -->
<dependency>
<groupId>org.jodd</groupId>
<artifactId>jodd</artifactId>
<version>${jodd.version}</version>
</dependency>
</dependencies>
package com.li.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.parquet.hadoop.ParquetInputFormat;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class HiveDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
        System.setProperty("HADOOP_USER_NAME", "root");
        System.out.println("deleted local output dir: " + new File("/home/rjxy/output").delete());

        // Hadoop configuration
        Configuration configuration = new Configuration();
        configuration.set("dfs.client.use.datanode.hostname", "true");

        // get the Job instance
        Job instance = Job.getInstance(configuration);

        // set the driver class
        instance.setJarByClass(HiveDriver.class);

        // set the Mapper and Reducer classes
        instance.setMapperClass(HiveMapper.class);
        instance.setReducerClass(HiveReduce.class);

        // set the map output key/value types
        instance.setMapOutputKeyClass(LongWritable.class);
        instance.setMapOutputValueClass(Text.class);

        // set the final output key/value types
        instance.setOutputKeyClass(NullWritable.class);
        instance.setOutputValueClass(Text.class);

        // Parquet input format (no longer needed once the source table is stored as delimited text)
        // instance.setInputFormatClass(ParquetInputFormat.class);

        // set the input and output paths
        FileInputFormat.setInputPaths(instance, new Path("hdfs://master:9000/user/hive/warehouse/ods.db/" + "region2" + "/*/*"));
        Path outputDir = new Path("hdfs://master:9000/test4");
        // Path outputDir = new Path("/home/rjxy/output");
        FileOutputFormat.setOutputPath(instance, outputDir);

        // submit the job and wait for completion
        boolean result = instance.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
The driver above already contains my source data location and the sink location.
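The HiveMapper and HiveReduce classes referenced by the driver are not pasted in this post. As a rough sketch only, a minimal pair consistent with the key/value types set in the driver could look like the following; it simply passes the '|'-delimited lines through and drops blank lines, since the actual cleaning rules are not spelled out here:

package com.li.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical HiveMapper: forwards non-empty '|'-delimited lines keyed by their byte offset,
// matching the LongWritable/Text map output types configured in the driver.
public class HiveMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        if (!value.toString().trim().isEmpty()) {   // placeholder "cleaning" rule: drop blank lines
            context.write(key, value);
        }
    }
}

// Hypothetical HiveReduce: writes the surviving lines with a NullWritable key, so the output files
// contain plain '|'-delimited rows that LOAD DATA can load into the table unchanged.
class HiveReduce extends Reducer<LongWritable, Text, NullWritable, Text> {
    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            context.write(NullWritable.get(), value);
        }
    }
}

In a real project each class would go in its own file under com.li.mapreduce, and the map/reduce bodies would hold whatever cleaning logic the task actually requires.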
Now let's look at how to load the HDFS data into the Hive table.
First make sure the target table exists (this is the same delimited-text DDL shown earlier):
hive> CREATE TABLE `ods.region2`(
> `regionkey` string,
> `name` string,
> `comment` string)
> PARTITIONED BY (
> `etldate` string
> )
> row format delimited
> fields terminated by '|' ;
OK
Time taken: 0.055 seconds
LOAD DATA INPATH '/test2' INTO TABLE ods.region2 partition(etldate="20220622");
Check the cleaned data in Hive:
hive (default)> select * from ods.region2 where etldate="20220622";
OK
region2.regionkey region2.name region2.comment region2.etldate
3 EUROPE ly final courts cajole furiously final excuse 20220622
3 EUROPE ly final courts cajole furiously final excuse 20220622
4 MIDDLE EAST uickly special accounts cajole carefully blithely close requests. carefully final asymptotes haggle furiousl 20220622
4 MIDDLE EAST uickly special accounts cajole carefully blithely close requests. carefully final asymptotes haggle furiousl 20220622
0 AFRICA lar deposits. blithely final packages cajole. regular waters are final requests. regular accounts are according to 20220622
0 AFRICA lar deposits. blithely final packages cajole. regular waters are final requests. regular accounts are according to 20220622
1 AMERICA hs use ironic, even requests. s 20220622
1 AMERICA hs use ironic, even requests. s 20220622
2 ASIA ges. thinly even pinto beans ca 20220622
2 ASIA ges. thinly even pinto beans ca 20220622
Time taken: 0.194 seconds, Fetched: 10 row(s)