Hudi Learning Part 6: Writing to Hudi in Real Time with Structured Streaming


一、Overall Architecture

The overall flow: a Kafka producer writes JSON records to the hudi_kafka topic, a Structured Streaming job consumes them and upserts them into a Hudi Merge-on-Read table on HDFS, and the table is synced to Hive for querying.

 二、Create a Kafka Producer to Simulate Data Production

  1、Create the hudi_kafka topic

bin/kafka-topics.sh --zookeeper 192.168.74.100:2181 --create --replication-factor 1 --partitions 3 --topic hudi_kafka
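To confirm the topic was created with the expected partitions and replication factor, it can be described (a quick check against the same ZooKeeper address):

bin/kafka-topics.sh --zookeeper 192.168.74.100:2181 --describe --topic hudi_kafka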

  2、Simulate producing data

import java.util.Properties;
import java.util.Random;

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * @author oyl
 * @create 2022-06-18 17:35
 * @Description Simulate producing Kafka data
 */
public class HudiProducer {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop100:9092,hadoop101:9092,hadoop102:9092");
        props.put("acks", "-1");
        props.put("batch.size", "1048576");
        props.put("linger.ms", "5");
        props.put("compression.type", "snappy");
        props.put("buffer.memory", "33554432");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        Random random = new Random();
        for (int i = 0; i < 1000; i++) {
            // Build one JSON record per user id
            JSONObject model = new JSONObject();
            model.put("userid", i);
            model.put("username", "name" + i);
            model.put("age", 18);
            model.put("partition", "20210808");
            System.out.println("Record " + i);
            producer.send(new ProducerRecord<String, String>("hudi_kafka", model.toJSONString()));
        }

        producer.flush();
        producer.close();
    }
}

  3、Add the pom dependency

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <!--            <scope>provided</scope>-->
    <version>${spark.version}</version>
</dependency>
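The producer class above also needs the Kafka client and fastjson libraries on the classpath; a sketch of the additional dependencies, with the version properties left as placeholders to be set for your environment:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>${fastjson.version}</version>
</dependency>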

  4、Check the data in Kafka

bin/kafka-console-consumer.sh --bootstrap-server 192.168.74.100:9092 --topic hudi_kafka --from-beginning
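Each produced message should appear as one JSON record, for example (illustrative output matching the producer code above; fastjson does not guarantee field order):

{"age":18,"partition":"20210808","userid":0,"username":"name0"}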

 三、Consume Kafka Data with Structured Streaming and Write to Hudi

  1、Code

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}

/**
  * @author oyl
  * @create 2022-06-18 18:24
  * @Description Consume Kafka data with Structured Streaming and write it to Hudi
  */
object StructuredStreamingToHudi {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("StructuredStreamingToHudi")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .setMaster("local[2]")
      //.set("spark.sql.shuffle.partitions", "2")

    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

    // Read the Kafka stream
    val df = sparkSession.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "hadoop100:9092,hadoop101:9092,hadoop102:9092")
      .option("subscribe", "hudi_kafka")
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", "1000")
      .load()

    import sparkSession.implicits._

    val tableName = "kafka_hudi_mor_hive"
    val basePath = "/datas/hudi-warehouse/kafka_hudi_mor_hive"

    val query = df.selectExpr("cast (value as string)").as[String]
      .map(item => {
        // Parse each Kafka value (a JSON string) into the Model case class
        val jsonObj: JSONObject = JSON.parseObject(item)
        val userid = jsonObj.getString("userid").toLong
        val username = jsonObj.getString("username")
        val age = jsonObj.getString("age").toInt
        val partition = jsonObj.getString("partition").toLong
        val ts = System.currentTimeMillis()

        Model(userid, username, age, partition, ts)
      })
      .writeStream
      .foreachBatch { (batchDF: Dataset[Model], batchId: Long) =>
        batchDF.write.format("hudi")
          .option(TABLE_TYPE.key(), MOR_TABLE_TYPE_OPT_VAL)   // table type: MERGE_ON_READ
          .option(RECORDKEY_FIELD.key(), "userid")            // record key (primary key)
          .option(PRECOMBINE_FIELD.key(), "ts")               // pre-combine field (update timestamp)
          .option(PARTITIONPATH_FIELD.key(), "partition")     // Hudi partition column
          .option("hoodie.table.name", tableName)             // Hudi table name

          .option("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://hadoop100:10000") // HiveServer2 address
          .option("hoodie.datasource.hive_sync.username", "oyl")                         // HiveServer2 user
          .option("hoodie.datasource.hive_sync.password", "123123")                      // HiveServer2 password
          .option("hoodie.datasource.hive_sync.database", "hudi_hive")                   // Hive database to sync into
          .option("hoodie.datasource.hive_sync.table", tableName)                        // Hive table to sync into
          .option("hoodie.datasource.hive_sync.partition_fields", "partition")           // Hive partition column
          .option("hoodie.datasource.hive_sync.partition_extractor_class", classOf[MultiPartKeysValueExtractor].getName) // partition extractor: splits the partition path on /
          .option("hoodie.datasource.hive_sync.enable", "true")                          // register and sync the table to Hive
          .option("hoodie.insert.shuffle.parallelism", "2")
          .option("hoodie.upsert.shuffle.parallelism", "2")
          .mode(SaveMode.Append)
          .save(basePath)
      }
      .option("checkpointLocation", "/datas/checkpoint/kafka_hudi_mor_hive")
      //.trigger(Trigger.ProcessingTime(5, TimeUnit.MINUTES))
      .start()
    query.awaitTermination()
  }

  case class Model(
                    userid: Long,
                    username: String,
                    age: Int,
                    partition: Long,
                    ts: Long
                  )
}
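Once the stream is running, the Hudi table can also be read back in batch mode to confirm records are landing. A minimal sketch, assuming the same basePath as above and a Hudi version that supports loading the base path directly (older releases needed a "/*/*" glob):

val snapshot = sparkSession.read.format("hudi")
  .load("/datas/hudi-warehouse/kafka_hudi_mor_hive")
snapshot.groupBy("partition").count().show()   // record count per partition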

  2、View the data

        1)、HDFS files: the data keeps growing and the files keep being compacted

        The data in partition 20220609 is continuously being compacted.

         However, with the MOR table, partition 20220609 contains no log files (the reason is not entirely clear; likely because, with Hudi's default Bloom index, new inserts are written directly to parquet base files and log files only appear once existing file groups receive updates), while partition 20220608 does contain log files.

         2)、Query the data in Hive
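Because Hive sync is enabled for a Merge-on-Read table, Hudi registers two tables in the hudi_hive database: kafka_hudi_mor_hive_ro (read-optimized, base files only) and kafka_hudi_mor_hive_rt (real-time, base plus log files). Example queries, assuming the Hudi Hive bundle jar has been made available to Hive:

SELECT userid, username, age, `partition`, ts FROM hudi_hive.kafka_hudi_mor_hive_ro LIMIT 10;
SELECT userid, username, age, `partition`, ts FROM hudi_hive.kafka_hudi_mor_hive_rt LIMIT 10;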
