大数据(8p)SparkStreaming精准一次消费Kafka

Posted 小基基o_O

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据(8p)SparkStreaming精准一次消费Kafka相关的知识,希望对你有一定的参考价值。

1、数据容错语义

encn说明
at most once数据最多一条数据可能会丢,但不会重复
at least one数据至少一条数据绝不会丢,但可能重复
exactly once数据有且只有一条数据不会丢,也不会重复

2、SparkStreaming消费Kafka

2.1、Scala代码,设置自动提交消费者偏移量

如下代码所示:enable.auto.commit设置为true
自动提交消费者偏移量 会有 数据丢失的风险(可以自行模拟出错)

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建StreamingContext对象
    val c3: SparkConf = new SparkConf().setAppName("a3").setMaster("local[*]")
      // 下面这个设置用来解决序列化异常
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val ssc = new StreamingContext(c3, Seconds(9))
    // Kafka消费者配置
    val kafkaConsumerConfig: Map[String, Object] = Map[String, Object](
      // Kafka服务器
      "bootstrap.servers" -> "hadoop100:9092,hadoop101:9092,hadoop102:9092",
      // 反序列化类:字符串序列化器
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      // 消费者组
      "group.id" -> "g0",
      // 从最新处开始消费
      "auto.offset.reset" -> "latest",
      // 自动提交消费者偏移量
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    // 创建消费Kafka的Dstream
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc = ssc,
        // locationStrategy(位置策略):在Executor上,如何来调度 给定的主题分区的消费者
        // PreferConsistent:一致地在所有Executors中分布分区
        locationStrategy = PreferConsistent,
        // consumerStrategy(消费策略)
        consumerStrategy = Subscribe[String, String](
          // Subscribe(订阅)参数:1、订阅的主题;2、Kafka消费者配置
          topics = Set("t0"), kafkaConsumerConfig)
      )
    // SparkStreaming输出:打印
    kafkaDStream.print()
    // 启动任务和等待终止
    ssc.start()
    ssc.awaitTermination()
  }
}

2.2、创建主题并生产数据进行测试

kafka-topics.sh \\
--zookeeper hadoop100:2181/kafka \\
--create \\
--replication-factor 1 \\
--partitions 1 \\
--topic t0
kafka-producer-perf-test.sh --topic t0 \\
--record-size 1024 \\
--producer-props bootstrap.servers=hadoop100:9092 \\
--throughput -1 \\
--num-records 400

3、 消费者偏移量的存储

为了规避数据丢失的风险,我们要关闭自动提交偏移量(enable.auto.commit设为false),并且 把消费者偏移量存储起来,下面介绍两种主流方法:

3.1、存Kafka的主题

  • Kafka带有提交偏移量的API,可将偏移量存储在一个特殊的Kafka主题中。
    但这个容错语义是at least one,且Kafka不是事务性的,故障出现后会数据可能会重复。
    如要实现at least one,要求SparkStreaming输出形式是幂等的。
  • 幂等(idempotent、idempotence):
    在编程中一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。
  • 幂等函数:可以使用相同参数重复执行,并能获得相同结果的函数。
    例如setTrue()就是一个幂等函数。
  • SparkStreaming有哪些输出是幂等的?
    println不是幂等性的,因为我们看到了重复打印
    数据按Key写Redis是幂等性的,重复写结果相同
    数据按RowKey写HBase是幂等性的,重复写结果相同
ds.foreachRDD { rdd =>
  // 1、获取偏移量(如果有)
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // 2、处理数据并输出结果

  // 3、输出结果成功后,异步提交偏移量
  ds.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{
  CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}

object Hello {
  def main(args: Array[String]): Unit = {
    val streamingContext = new StreamingContext("local[2]", "a0", Seconds(9))
    // Kafka消费者的配置
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop100:9092,hadoop101:9092,hadoop102:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "g0",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> "false"
    )
    // 创建消费Kafka的Dstream
    val ds0: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        streamingContext,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Array("t0"), kafkaParams)
      )
    // 数据处理(此处不处理)
    val ds1 = ds0
    // SparkStreaming输出及提交偏移量
    ds1.foreachRDD { rdd =>
      // 1、获取Kafka消费者偏移量(如果有)
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // 2、输出结果(此处为打印,别的输出方式有:写Redis、写HBase…)
      rdd.foreach(println)
      // 3、手动提交offset
      ds1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }
    // 启动任务和等待终止
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

3.2、存数据库

例如,要将SparkStreaming结果输出到mysql,那么可以把偏移量也存储在MySQL,并利用MySQL的事务来确保exactly once,伪代码如下:

// 根据主题和分区从数据库获取消费者偏移量
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
  new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)

stream.foreachRDD { rdd =>
  // 1、获取偏移量(如果有)
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // 2、计算结果
  val results = yourCalculation(rdd)

  // 3.1、开始事务

  // 3.2、更新输出结果
  // 3.3、更新消费者偏移量

  // 3.4、结束事务
}

4、参考文献

以上是关于大数据(8p)SparkStreaming精准一次消费Kafka的主要内容,如果未能解决你的问题,请参考以下文章

大数据-SparkStreaming

大数据笔记(三十一)——SparkStreaming详细介绍

大数据-spark理论sparkSql,sparkStreaming,spark调优

第一次作业——大数据

戴琨受访《中国经济周刊》 分享如何用大数据精准挖掘二手车残值

大数据入门:Spark Streaming实际应用