Structured Streaming 介绍(一)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Structured Streaming 介绍(一)相关的知识,希望对你有一定的参考价值。

参考技术A Spark2.2.0 在7月12号发布,这个版本的Structured Streaming 抛掉了试验的标签,可以正式在生产环境使用。
Structured Streaming 是基于Spark SQL 引擎的流式计算引擎,将流式计算应用于DataFrame.随着数据不断地到达,Spark SQL引擎会以一种增量的方式来执行这些操作,并且持续更新计算结果。其基本概念就是将输入数据流作为
“Input Table”,每次新收到的数据会成为该表新的一行。

每次针对数据的查询都会生成一个“Result Table”。每一次的
触发间隔(比如说1s),Input Table 新增的一行,最终都会在Result Table 进行更新。当result table 更新的
时候,我们可能会将改变的数据写入外部存储。

File source - 以文件流的形式读取目录中写入的文件。 支持的文件格式为text,csv,json,parquet。 有关更多最新列表,可以看下DataStreamReader界面的文档,并支持各种文件格式的选项。 请注意,文件必须是被移动到目录中的,比如用mv命令。
kafka source - 从kafka poll 数据,兼容 kafka broker 0.10.0 或更高版本。更多详情看
Kafka Integration Guide

Socket source (for testing )从socket 连接中读取 UTF8 数据,仅用于测试,不提供容错保证。

某些数据源是不支持容错的,因为它们不能保证在故障之后可以通过checkedpoint offsets 来重新消费数据。

Complete Mode - 更新后的整个Result Table将被写入外部存储。 由外部存储决定如何处理整个表的写入。
Append Mode - 在Result Table中,只有自上次触发后新增到result table中的数据将被写入外部存储。 这仅适用于不期望更改结果表中现有行的查询,也就是说,我们确定,result table中已有的数据是肯定不会被改变的,才使用这种模式。
Update Mode - 只有自上次触发以后在Result Table中更新的数据(包括新增的和修改的)将被写入外部存储(可用于Spark 2.1.1)。 这与完全模式不同,因为此模式仅输出自上次触发以来更改的行。 如果查询不包含聚合,它将等同于Append Mode。

lines 为DataFrame是input table,这个表包含了一个名为"value"的列,现在还没有开始收到任何数据,因为我们只是做了transformation操作。 接下来,我们使用.as [String]将DataFrame转换为String数据集,通过flatMap操作将每一行分割成多个单词。 最后,我们通过分组操作生成wordCounts DataFrame。通过start()方法开启流计算。

流数据生成的DataFrame经查询生成wordCounts与静态DataFrame完全相同。 但是,当该查询启动时,Spark将连续检查套接字连接中的新数据。 如果有新数据,Spark将运行一个“增量”查询,将以前的运行计数与新数据相结合,以计算更新的计数,如下所示。

数据湖(十六):Structured Streaming实时写入Iceberg

文章目录

Structured Streaming实时写入Iceberg

一、创建Kafka topic

二、编写向Kafka生产数据代码

三、编写Structured Streaming读取Kafka数据实时写入Iceberg

四、查看Iceberg中数据结果


​​​​​​​Structured Streaming实时写入Iceberg

目前Spark中Structured Streaming只支持实时向Iceberg中写入数据,不支持实时从Iceberg中读取数据,下面案例我们将使用Structured Streaming从Kafka中实时读取数据,然后将结果实时写入到Iceberg中。

一、创建Kafka topic

启动Kafka集群,创建“kafka-iceberg-topic”

[root@node1 bin]# ./kafka-topics.sh  --zookeeper node3:2181,node4:2181,node5:2181  --create  --topic kafka-iceberg-topic  --partitions 3 --replication-factor 3

二、编写向Kafka生产数据代码

/**
  * 向Kafka中写入数据
  */
object WriteDataToKafka 
  def main(args: Array[String]): Unit = 
    val props = new Properties()
    props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String,String](props)
    var counter = 0
    var keyFlag = 0
    while(true)
      counter +=1
      keyFlag +=1
      val content: String = userlogs()
      producer.send(new ProducerRecord[String, String]("kafka-iceberg-topic", content))
      //producer.send(new ProducerRecord[String, String]("kafka-iceberg-topic", s"key-$keyFlag", content))
      if(0 == counter%100)
        counter = 0
        Thread.sleep(5000)
      
    
    producer.close()
  

  def userlogs()=
    val userLogBuffer = new StringBuffer("")
    val timestamp = new Date().getTime();
    var userID = 0L
    var pageID = 0L

    //随机生成的用户ID
    userID = Random.nextInt(2000)

    //随机生成的页面ID
    pageID =  Random.nextInt(2000);

    //随机生成Channel
    val channelNames = Array[String]("Spark","Scala","Kafka","Flink","Hadoop","Storm","Hive","Impala","HBase","ML")
    val channel = channelNames(Random.nextInt(10))

    val actionNames = Array[String]("View", "Register")
    //随机生成action行为
    val action = actionNames(Random.nextInt(2))

    val dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
    userLogBuffer.append(dateToday)
      .append("\\t")
      .append(timestamp)
      .append("\\t")
      .append(userID)
      .append("\\t")
      .append(pageID)
      .append("\\t")
      .append(channel)
      .append("\\t")
      .append(action)
    System.out.println(userLogBuffer.toString())
    userLogBuffer.toString()
  

三、编写Structured Streaming读取Kafka数据实时写入Iceberg

object StructuredStreamingSinkIceberg 
  def main(args: Array[String]): Unit = 
    //1.准备对象
    val spark: SparkSession = SparkSession.builder().master("local").appName("StructuredSinkIceberg")
      //指定hadoop catalog,catalog名称为hadoop_prod
      .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
      .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/structuredstreaming")
      .getOrCreate()
//    spark.sparkContext.setLogLevel("Error")

    //2.创建Iceberg 表
    spark.sql(
      """
        |create table if not exists hadoop_prod.iceberg_db.iceberg_table (
        | current_day string,
        | user_id string,
        | page_id string,
        | channel string,
        | action string
        |) using iceberg
      """.stripMargin)

    val checkpointPath = "hdfs://mycluster/iceberg_table_checkpoint"
    val bootstrapServers = "node1:9092,node2:9092,node3:9092"
    //多个topic 逗号分开
    val topic = "kafka-iceberg-topic"

    //3.读取Kafka读取数据
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("auto.offset.reset", "latest")
      .option("group.id", "iceberg-kafka")
      .option("subscribe", topic)
      .load()

    import spark.implicits._
    import org.apache.spark.sql.functions._

    val resDF = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)].toDF("id", "data")

    val transDF: DataFrame = resDF.withColumn("current_day", split(col("data"), "\\t")(0))
      .withColumn("ts", split(col("data"), "\\t")(1))
      .withColumn("user_id", split(col("data"), "\\t")(2))
      .withColumn("page_id", split(col("data"), "\\t")(3))
      .withColumn("channel", split(col("data"), "\\t")(4))
      .withColumn("action", split(col("data"), "\\t")(5))
      .select("current_day", "user_id", "page_id", "channel", "action")

    //结果打印到控制台,Default trigger (runs micro-batch as soon as it can)
//    val query: StreamingQuery = transDF.writeStream
//      .outputMode("append")
//      .format("console")
//      .start()

    //4.流式写入Iceberg表
    val query = transDF.writeStream
      .format("iceberg")
      .outputMode("append")
      //每分钟触发一次Trigger.ProcessingTime(1, TimeUnit.MINUTES)
      //每10s 触发一次 Trigger.ProcessingTime(1, TimeUnit.MINUTES)
      .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
      .option("path", "hadoop_prod.iceberg_db.iceberg_table")
      .option("fanout-enabled", "true")
      .option("checkpointLocation", checkpointPath)
      .start()

    query.awaitTermination()

  

注意:以上代码执行时由于使用的Spark版本为3.1.2,其依赖的Hadoop版本为Hadoop3.2版本,所以需要在本地Window中配置Hadoop3.1.2的环境变量以及将对应的hadoop.dll放入window "C:\\Windows\\System32"路径下。

Structuerd Streaming向Iceberg实时写入数据有以下几个注意点:

  • 写Iceberg表写出数据支持两种模式:append和complete,append是将每个微批数据行追加到表中。complete是替换每个微批数据内容。
  • 向Iceberg中写出数据时指定的path可以是HDFS路径,可以是Iceberg表名,如果是表名,要预先创建好Iceberg表。
  • 写出参数fanout-enabled指的是如果Iceberg写出的表是分区表,在向表中写数据之前要求Spark每个分区的数据必须排序,但这样会带来数据延迟,为了避免这个延迟,可以设置“fanout-enabled”参数为true,可以针对每个Spark分区打开一个文件,直到当前task批次数据写完,这个文件再关闭。
  • 实时向Iceberg表中写数据时,建议trigger设置至少为1分钟提交一次,因为每次提交都会产生一个新的数据文件和元数据文件,这样可以减少一些小文件。为了进一步减少数据文件,建议定期合并“data files”(参照1.9.6.9)和删除旧的快照(1.9.6.10)。

 

四、查看Iceberg中数据结果

启动向Kafka生产数据代码,启动向Iceberg中写入数据的Structured Streaming程序,执行以下代码来查看对应的Iceberg结果:

//1.准备对象
val spark: SparkSession = SparkSession.builder().master("local").appName("StructuredSinkIceberg")
  //指定hadoop catalog,catalog名称为hadoop_prod
  .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
  .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/structuredstreaming")
  .getOrCreate()

//2.读取Iceberg 表中的数据结果
spark.sql(
  """
    |select * from hadoop_prod.iceberg_db.iceberg_table
  """.stripMargin).show()

  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

以上是关于Structured Streaming 介绍(一)的主要内容,如果未能解决你的问题,请参考以下文章

Structured Streaming教程 —— 基本概念与使用

大数据Spark Structured Streaming

Structured Streaming教程 —— 常用输入与输出

Structured Streaming系列-4集成 Kafka

Structured Streaming编程模型

Structured Streaming曲折发展史