Hudi Learning Part 6: Writing to Hudi in Real Time with Structured Streaming
I. Overall Architecture
II. Create a Kafka Producer to Simulate Data
1. Create the hudi_kafka topic
bin/kafka-topics.sh --zookeeper 192.168.74.100:2181 --create --replication-factor 1 --partitions 3 --topic hudi_kafka
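To confirm the topic was created with the expected partition count and replication factor, you can describe it. A quick check, assuming the same ZooKeeper address as above:

bin/kafka-topics.sh --zookeeper 192.168.74.100:2181 --describe --topic hudi_kafka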
2. Simulate producing data
import java.util.Properties;

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * @author oyl
 * @create 2022-06-18 17:35
 * @Description Simulate producing Kafka data
 */
public class HudiProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop100:9092,hadoop101:9092,hadoop102:9092");
        props.put("acks", "-1");
        props.put("batch.size", "1048576");
        props.put("linger.ms", "5");
        props.put("compression.type", "snappy");
        props.put("buffer.memory", "33554432");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 1000; i++) {
            // Build one JSON record per user id
            JSONObject model = new JSONObject();
            model.put("userid", i);
            model.put("username", "name" + i);
            model.put("age", 18);
            model.put("partition", "20210808");
            System.out.println("Record " + i);
            producer.send(new ProducerRecord<String, String>("hudi_kafka", model.toJSONString()));
        }
        producer.flush();
        producer.close();
    }
}
3. Add POM dependencies
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <!-- <scope>provided</scope> -->
    <version>${spark.version}</version>
</dependency>
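Besides the Spark-Kafka connector, the producer code above also needs the Kafka client and fastjson libraries on the classpath. A minimal sketch of those dependencies; the version properties are placeholders, so match them to your broker and existing build:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- placeholder: use a client version compatible with your brokers -->
    <version>${kafka.version}</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <!-- placeholder version -->
    <version>${fastjson.version}</version>
</dependency>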
4. View the Kafka data
bin/kafka-console-consumer.sh --bootstrap-server 192.168.74.100:9092 --topic hudi_kafka --from-beginning
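If the producer ran successfully, the consumer should print one JSON document per record, roughly like the lines below (field order may vary depending on the JSON serializer):

{"age":18,"partition":"20210808","userid":0,"username":"name0"}
{"age":18,"partition":"20210808","userid":1,"username":"name1"}
...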
III. Consuming Kafka Data with Structured Streaming and Writing to Hudi
1. Code
import java.util.concurrent.TimeUnit

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}

/**
 * @author oyl
 * @create 2022-06-18 18:24
 * @Description Consume Kafka data with Structured Streaming and write it to Hudi
 */
object StructuredStreamingToHudi {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("StructuredStreamingToHudi")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .setMaster("local[2]")
      //.set("spark.sql.shuffle.partitions", "2")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

    // Read from the Kafka source
    val df = sparkSession.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "hadoop100:9092,hadoop101:9092,hadoop102:9092")
      .option("subscribe", "hudi_kafka")
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", "1000")
      .load()

    import sparkSession.implicits._

    val tableName = "kafka_hudi_mor_hive"
    val basePath = "/datas/hudi-warehouse/kafka_hudi_mor_hive"

    val query = df.selectExpr("cast (value as string)").as[String]
      .map { item =>
        // Parse each Kafka value (JSON string) into a typed Model record
        val jsonObj: JSONObject = JSON.parseObject(item)
        val userid = jsonObj.getString("userid").toLong
        val username = jsonObj.getString("username")
        val age = jsonObj.getString("age").toInt
        val partition = jsonObj.getString("partition").toLong
        val ts = System.currentTimeMillis()
        Model(userid, username, age, partition, ts)
      }
      .writeStream
      .foreachBatch { (batchDF: Dataset[Model], batchId: Long) =>
        batchDF.write.format("hudi")
          .option(TABLE_TYPE.key(), MOR_TABLE_TYPE_OPT_VAL)            // table type: MERGE_ON_READ
          .option(RECORDKEY_FIELD.key(), "userid")                     // record key
          .option(PRECOMBINE_FIELD.key(), "ts")                        // precombine field (update timestamp)
          .option(PARTITIONPATH_FIELD.key(), "partition")              // Hudi partition column
          .option("hoodie.table.name", tableName)                      // Hudi table name
          .option("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://hadoop100:10000") // HiveServer2 address
          .option("hoodie.datasource.hive_sync.username", "oyl")       // HiveServer2 user
          .option("hoodie.datasource.hive_sync.password", "123123")    // HiveServer2 password
          .option("hoodie.datasource.hive_sync.database", "hudi_hive") // Hive database to sync to
          .option("hoodie.datasource.hive_sync.table", tableName)      // Hive table to sync to
          .option("hoodie.datasource.hive_sync.partition_fields", "partition") // Hive partition columns
          .option("hoodie.datasource.hive_sync.partition_extractor_class", classOf[MultiPartKeysValueExtractor].getName) // partition extractor (splits the path on /)
          .option("hoodie.datasource.hive_sync.enable", "true")        // register and sync the dataset to Hive
          .option("hoodie.insert.shuffle.parallelism", "2")
          .option("hoodie.upsert.shuffle.parallelism", "2")
          .mode(SaveMode.Append)
          .save(basePath)
      }
      .option("checkpointLocation", "/datas/checkpoint/kafka_hudi_mor_hive")
      // .trigger(Trigger.ProcessingTime(5, TimeUnit.MINUTES))
      .start()

    query.awaitTermination()
  }

  case class Model(
    userid: Long,
    username: String,
    age: Int,
    partition: Long,
    ts: Long
  )
}
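The job above runs with setMaster("local[2]"), so it can be started directly from the IDE. To submit it to a cluster instead, the Hudi Spark bundle and the Kafka connector need to be on the classpath. A rough sketch, assuming Spark 3.x with Scala 2.12; the versions and jar name are illustrative placeholders to adapt to your environment:

spark-submit \
  --class StructuredStreamingToHudi \
  --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.10.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  your-app.jar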
2. View the data
1) HDFS files: data keeps arriving, and the files keep being compacted
Data in partition 20220609 is being compacted continuously.
However, with the MOR table type, partition 20220609 has no log files (it is not entirely clear why), while partition 20220608 does have log files.
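To inspect this directly, you can list each partition directory and check whether .log files appear alongside the parquet base files, using the basePath from the job above:

hdfs dfs -ls /datas/hudi-warehouse/kafka_hudi_mor_hive/20220608
hdfs dfs -ls /datas/hudi-warehouse/kafka_hudi_mor_hive/20220609

One possible explanation, offered as a guess rather than a confirmed diagnosis: in a MERGE_ON_READ table, log files are written for updates to existing file groups, while fresh inserts typically land in new base parquet files, so a partition that has only ever received inserts can legitimately contain no log files.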
2) Querying the data in Hive
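With Hive sync enabled, a MOR table is normally exposed in Hive as two tables: a read-optimized view with the _ro suffix and a real-time view with the _rt suffix. A query sketch, assuming the sync configured above succeeded:

-- read-optimized view: reads only the compacted base files
SELECT * FROM hudi_hive.kafka_hudi_mor_hive_ro LIMIT 10;
-- real-time view: merges base files with log files at query time
SELECT * FROM hudi_hive.kafka_hudi_mor_hive_rt LIMIT 10;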