怎样利用Spark Streaming和Hadoop实现近实时的会话连接

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了怎样利用Spark Streaming和Hadoop实现近实时的会话连接相关的知识,希望对你有一定的参考价值。

参考技术A 科普Spark,Spark是什么,如何使用Spark

1.Spark基于什么算法的分布式计算(很简单)
2.Spark与MapReduce不同在什么地方
3.Spark为什么比Hadoop灵活
4.Spark局限是什么
5.什么情况下适合使用Spark

什么是Spark
Spark是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法。其架构如下图所示:

Spark与Hadoop的对比
Spark的中间数据放到内存中,对于迭代运算效率更高。
Spark更适合于迭代运算比较多的ML和DM运算。因为在Spark里面,有RDD的抽象概念。
Spark比Hadoop更通用
Spark提供的数据集操作类型有很多种,不像Hadoop只提供了Map和Reduce两种操作。比如map, filter, flatMap, sample, groupByKey, reduceByKey, union, join, cogroup, mapValues, sort,partionBy等多种操作类型,Spark把这些操作称为Transformations。同时还提供Count, collect, reduce, lookup, save等多种actions操作。
这些多种多样的数据集操作类型,给给开发上层应用的用户提供了方便。各个处理节点之间的通信模型不再像Hadoop那样就是唯一的Data Shuffle一种模式。用户可以命名,物化,控制中间结果的存储、分区等。可以说编程模型比Hadoop更灵活。
不过由于RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合。
容错性
在分布式数据集计算时通过checkpoint来实现容错,而checkpoint有两种方式,一个是checkpoint data,一个是logging the updates。用户可以控制采用哪种方式来实现容错。
可用性
Spark通过提供丰富的Scala, Java,Python API及交互式Shell来提高可用性。
Spark与Hadoop的结合
Spark可以直接对HDFS进行数据的读写,同样支持Spark on YARN。Spark可以与MapReduce运行于同集群中,共享存储资源与计算,数据仓库Shark实现上借用Hive,几乎与Hive完全兼容。
Spark的适用场景
Spark是基于内存的迭代计算框架,适用于需要多次操作特定数据集的应用场合。需要反复操作的次数越多,所需读取的数据量越大,受益越大,数据量小但是计算密集度较大的场合,受益就相对较小(大数据库架构中这是是否考虑使用Spark的重要因素)
由于RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合。总的来说Spark的适用面比较广泛且比较通用。
运行模式
本地模式
Standalone模式
Mesoes模式
yarn模式
Spark生态系统
Shark ( Hive on Spark): Shark基本上就是在Spark的框架基础上提供和Hive一样的H iveQL命令接口,为了最大程度的保持和Hive的兼容性,Shark使用了Hive的API来实现query Parsing和 Logic Plan generation,最后的PhysicalPlan execution阶段用Spark代替Hadoop MapReduce。通过配置Shark参数,Shark可以自动在内存中缓存特定的RDD,实现数据重用,进而加快特定数据集的检索。同时,Shark通过UDF用户自定义函数实现特定的数据分析学习算法,使得SQL数据查询和运算分析能结合在一起,最大化RDD的重复使用。
Spark streaming: 构建在Spark上处理Stream数据的框架,基本的原理是将Stream数据分成小的时间片断(几秒),以类似batch批量处理的方式来处理这小部分数据。Spark Streaming构建在Spark上,一方面是因为Spark的低延迟执行引擎(100ms+)可以用于实时计算,另一方面相比基于Record的其它处理框架(如Storm),RDD数据集更容易做高效的容错处理。此外小批量处理的方式使得它可以同时兼容批量和实时数据处理的逻辑和算法。方便了一些需要历史数据和实时数据联合分析的特定应用场合。
Bagel: Pregel on Spark,可以用Spark进行图计算,这是个非常有用的小项目。Bagel自带了一个例子,实现了Google的PageRank算法。
End.

mac上搭建kafka并利用spark streaming实时消费

Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,是由Apache软件基金会开发的一个开源流处理]平台,由Scala和Java编写。

APP流行起来之后,企业就有了大量用户的行为数据,怎么有效的利用这些数据成了企业的重中之重。利用数据的第一步就是高效的采集数据,Kafka就是企业传输收集用户行为数据的常用系统。

程序员开发的算法都是部署在服务器上,要想在本地测试就得自己在本地搭建Kafka环境。通过模拟用户行为数据来营造真实环境,这样可以更方便地对代码进行调试。

mac上安装Kafka

采用mac下的安装利器homebrew,在终端输入brew install kafka即可,homebrew会自动安装kafka的依赖zookeeper。

使用brew安装后,kafka和zookeeper的配置文件路径如下,通常情况下我们也不需要做任何修改。

/usr/local/etc/kafka/server.properties
/usr/local/etc/kafka/zookeeper.properties

安装完之后,终端会显示启动提示消息:

使用下面两个命令快速启动zookeeper和kafka:

brew services start zookeeper
brew services start kafka

模拟真实环境生产Kafka消息

虽然可以在终端生产消息,但要生产真实环境下的json格式的数据确很麻烦,所以写了脚本自动模拟真实环境下的数据。

先在idea上增加mavern-kafka依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.0</version>
</dependency>

下面的代码创建了两个topic,每200毫秒随机产生一些用户在电商APP上的行为数据。

import java.util.Properties
import org.codehaus.jettison.json.JSONObject
import kafka.javaapi.producer.Producer
import kafka.producer.KeyedMessage
import kafka.producer.ProducerConfig
import org.apache.log4j.{Level, Logger}
import scala.util.Random

object KafkaEventProducer {
  private val goodsid = Array("1100", "1975", "24724", "4542")
  private val userid = Array("20ad3455", "47c4ea08","F4727214", "J9FE7E52")
  private val siteid = Array("67800", "60902")
  private val eventkey = Array("open", "goods_view", "addtobag", "impression", "checkout")
  private val pagename = Array("categories", "search", "goodsdetail", "Home")
  private val sid = Array("xie", "chen", "long")

  private val goodsLength = goodsid.length
  private val userdLength = userid.length
  private  val siteidLength = siteid.length
  private val eventkeyLength = eventkey.length
  private  val pagenameLength = pagename.length
  private val sidLength = sid.length

  private val random = new Random()
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    val topic = Array("impression", "event")
    val brokers = "127.0.0.1:9092"
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    val kafkaConfig = new ProducerConfig(props)
    val producer = new Producer[String, String](kafkaConfig)

    while(true) {
      val event = new JSONObject()
      event
        .put("userid", userid(random.nextInt(useridLength)))
        .put("eventtime", System.currentTimeMillis.toString)
        .put("siteid", siteid(random.nextInt(siteidLength)))
        .put("goodsid", goodsid(random.nextInt(goodsid.length)))
        .put("pagename", pagename(random.nextInt(pagenameLength)))
        .put("sid", sid(random.nextInt(sidLength)))
        .put("eventkey", eventkey(random.nextInt(eventkeyLength)))

      // produce event message
      producer.send(new KeyedMessage[String, String](topic(random.nextInt(2)), event.toString))
      println("Message sent: " + event)

      Thread.sleep(200)
    }
  }
}

在终端输入以下命令,接收topic-impression的消息.

kafka-console-consumer --bootstrap-server localhost:9092 --topic impression --from-beginning

spark streaming消费消息

Spark streaming是Spark核心API的一个扩展,它对实时流式数据的处理具有可扩展性、高吞吐量、可容错性等特点。

下面的程序实现spark streaming每6秒打印一次从kafka收集到数据。

import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.broadcast.Broadcast

/**
  * @author XieChenlong
  * 流处理例子
  */
object streamingMab {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("StreamingMab").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Durations.seconds(GlobalConfig.KAFKA_INTERVAL_TIME))

    val KAFKA_TOPIC_NAME = Array("impression", "event")
    val KAFKA_BROKERS = "127.0.0.1:9092"
    val KAFKA_PARAMS = Map[String, Object](
        "bootstrap.servers" -> "127.0.0.1:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> “xieTest”,
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](KAFKA_TOPIC_NAME, GlobalConfig.KAFKA_PARAMS)
    )

    val eventDStream = stream.map(record => record.value)

    eventDStream.foreachRDD(
      rdd => if (!rdd.isEmpty()) {
        rdd.foreach {x =    >
            println(x)
        }
      }
    )
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}

参考资料

  1. 不停游动的鱼, mac 本地安装kafka)

  2. lusecond,mac环境下使用brew安装kafka

  3. spark, Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)

  4. wiki, kafka

  5. Andi Ai, Spark编程指南 - 简体中文版

以上是关于怎样利用Spark Streaming和Hadoop实现近实时的会话连接的主要内容,如果未能解决你的问题,请参考以下文章

mac上搭建kafka并利用spark streaming实时消费

[Spark]-Streaming-调优

实战Spark streaming与kafka

如何将 Spark Streaming 检查点位置存储到 S3 中?

大数据之Spark:Spark Streaming

Spark Streaming实时计算