通过Spark Streaming处理交易数据

Posted weishao-lsv

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了通过Spark Streaming处理交易数据相关的知识,希望对你有一定的参考价值。

Apache Spark 是加州大学伯克利分校的 AMPLabs 开发的开源分布式轻量级通用计算框架。

由于 Spark 基于内存设计,使得它拥有比 Hadoop 更高的性能(极端情况下可以达到 100x),并且对多语言(Scala、Java、Python)提供支持。

其一栈式设计特点使得我们的学习和维护成本大大地减少,而且其提供了很好的容错解决方案

 

业务场景

我们每天都有来自全国各地的天然气购气数据,并根据用户的充气,退气,核销等实时计算分析的是用户订单数数据,由于数据量比较大,单台机器处理已经达到了瓶颈;综合业务场景分析,我们选用 Spark Streaming + Kafka+Flume+Hbase+kudu 来处理这些日志;又因为业务系统不统一,先通过Spark Streaming对数据进行清洗后再回写kafka集群,因为会有其他业务也需要kafka的数据;通过通过不同的程序对kafka数据进行消费,用户记录以多版本方式记录到hbase;需要经常统计的指标业务数据写入kudu

 

业务代码:

  创建DStream

val sparkConf = new SparkConf().setAppName("OrderSpark")

val sc = new SparkContext(sparkConf)

val ssc = new StreamingContext(sc, Seconds(10))

val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerAddress,"group.id" -> groupId)

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,StringDecoder](ssc, kafkaParams, Set(topic))

返回的messages 是一个 DStream,它是对 RDD 的封装,其上的很多操作都类似于 RDD;

createDirectStream 函数是 Spark 1.3.0 开始引入的,其内部实现是调用 Kafka 的低层次 API,Spark 本身维护 Kafka 偏移量等信息,所以可以保证数据零丢失

但是机器一旦宕机或者重启时,可能会存在重复消费;因此我们可以通过自己对offset进行checkpoint

 

  获取kafkaoffset

   val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    var offsetRanges = Array[OffsetRange]()
    kafkaStream.transform{ rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.foreachRDD(rdd=>{
      for(o <- offsetRanges) {
        println(s"@@@@@@ topic  ${o.topic}  partition ${o.partition}  fromoffset ${o.fromOffset}  untiloffset ${o.untilOffset} #######")
      }
}

为了能够在 Spark Streaming 程序挂掉后又能从断点处恢复,我们每个批次进行向zookeeper进行 Checkpoint;

这里我们没有采用spark自带的checkpoint,是因为一旦程序修改,之前序列化的checkpoint数据会冲突报错,

当然checkpoint到文件也会随之越大。(读者可以自己搜索spark 文件checkpoint的弊端)

 

  启动实时程序

    ssc.start()
    ssc.awaitTermination()

 

  因业务所需需要向kafka回写数据

  

rdd.foreachPartition(partition=>{
        val props = new Properties()
        props.put("bootstrap.servers",Constans.brokers)
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String,String](props)
        partition.foreach(r=>{
          val record = new ProducerRecord[String, String](Constans.topic_kc, new Random().nextInt(3), "", msg)
      producer.send(record,new Callback() {
       override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
       if (null != e) {
       println("发送消息失败=>"+msg)
       }
      }
      })
  }) producer.close() })

 

监控

系统部署上线之后,我们无法保证系统 7x24 小时都正常运行,即使是在运行着,我们也无法保证 Job 不堆积、是否及时处理 Kafka 中的数据;
而且 Spark Streaming 系统本身就不很稳定。所以我们需要实时地监控系统,包括监控Kafka 集群、Spark Streaming 程序。
我们所有的监控都是CDH自带监控管理和Ganglia以及nagios,一旦检测到异常,系统会自己先重试是否可以自己恢复,如果不行,就会给我们发送报警邮件和打电话。

 

 

以上是关于通过Spark Streaming处理交易数据的主要内容,如果未能解决你的问题,请参考以下文章

Spark-Spark Streaming例子整理

spark配置-----Spark Streaming

spark知识体系04-Spark Streaming

Spark(14)——spark streaming 监控方案

Spark Streaming 2.2.0 性能调优

Spark Streaming 2.2.0 性能调优