Setting up Kafka on a Mac and consuming it in real time with Spark Streaming
Kafka is a fast, scalable, high-throughput, fault-tolerant distributed publish-subscribe messaging system. It is an open-source stream-processing platform developed by the Apache Software Foundation and written in Scala and Java.
Once mobile apps took off, companies found themselves with large volumes of user behavior data, and putting that data to effective use became a top priority. The first step is collecting it efficiently, and Kafka is a common system for transporting and collecting user behavior data.
The algorithms developers write are deployed on servers, so to test locally you need to set up your own Kafka environment. By simulating user behavior data you can approximate the production environment and debug your code much more conveniently.
Installing Kafka on macOS
Use Homebrew, the go-to package manager on macOS: just run brew install kafka in a terminal, and Homebrew will automatically install ZooKeeper, which Kafka depends on.
After installing with brew, the Kafka and ZooKeeper configuration files live at the paths below; in most cases no changes are needed:
/usr/local/etc/kafka/server.properties
/usr/local/etc/kafka/zookeeper.properties
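If you do need to tweak the broker, the settings most commonly touched in server.properties are the listener address and the log directory. The lines below only illustrate what a typical Homebrew layout looks like; treat the exact values as assumptions, not required changes:
listeners=PLAINTEXT://127.0.0.1:9092
log.dirs=/usr/local/var/lib/kafka-logs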
After the install finishes, Homebrew prints hints in the terminal about how to start the services. The following two commands quickly start ZooKeeper and Kafka:
brew services start zookeeper
brew services start kafka
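If you would rather run both in the foreground to watch the logs while debugging, the Homebrew install also puts the standard start scripts on the PATH, and they can be pointed at the config files above:
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
kafka-server-start /usr/local/etc/kafka/server.properties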
Producing Kafka messages that simulate a real environment
You can produce messages from the terminal, but producing realistic JSON-formatted data that way is tedious, so I wrote a script to simulate production-like data automatically.
First, add the Maven dependency for Spark's Kafka integration in IDEA:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
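Note that the producer below uses the old kafka.javaapi.producer API, which lives in the Kafka core artifact rather than in the Spark integration above. If it does not end up on your classpath transitively, you may also need a dependency along these lines (the artifact and version here are an assumption chosen to match the 0.10 integration):
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.0.1</version>
</dependency>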
The code below produces to two topics, randomly generating user behavior events from an e-commerce app every 200 milliseconds.
import java.util.Properties
import org.codehaus.jettison.json.JSONObject
import kafka.javaapi.producer.Producer
import kafka.producer.KeyedMessage
import kafka.producer.ProducerConfig
import org.apache.log4j.{Level, Logger}
import scala.util.Random

object KafkaEventProducer {
  // Candidate values for each field of a simulated event
  private val goodsid = Array("1100", "1975", "24724", "4542")
  private val userid = Array("20ad3455", "47c4ea08", "F4727214", "J9FE7E52")
  private val siteid = Array("67800", "60902")
  private val eventkey = Array("open", "goods_view", "addtobag", "impression", "checkout")
  private val pagename = Array("categories", "search", "goodsdetail", "Home")
  private val sid = Array("xie", "chen", "long")

  private val goodsLength = goodsid.length
  private val useridLength = userid.length
  private val siteidLength = siteid.length
  private val eventkeyLength = eventkey.length
  private val pagenameLength = pagename.length
  private val sidLength = sid.length
  private val random = new Random()

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    val topic = Array("impression", "event")
    val brokers = "127.0.0.1:9092"
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    val kafkaConfig = new ProducerConfig(props)
    val producer = new Producer[String, String](kafkaConfig)
    while (true) {
      // Build one random event as a JSON object
      val event = new JSONObject()
      event
        .put("userid", userid(random.nextInt(useridLength)))
        .put("eventtime", System.currentTimeMillis.toString)
        .put("siteid", siteid(random.nextInt(siteidLength)))
        .put("goodsid", goodsid(random.nextInt(goodsLength)))
        .put("pagename", pagename(random.nextInt(pagenameLength)))
        .put("sid", sid(random.nextInt(sidLength)))
        .put("eventkey", eventkey(random.nextInt(eventkeyLength)))
      // Send the event to one of the two topics at random
      producer.send(new KeyedMessage[String, String](topic(random.nextInt(2)), event.toString))
      println("Message sent: " + event)
      Thread.sleep(200)
    }
  }
}
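The two topics do not have to exist beforehand if the broker allows automatic topic creation (auto.create.topics.enable=true, which is commonly the default); the first send creates them. If your broker has it disabled, create them up front, for example:
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic impression
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic event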
Enter the following command in a terminal to receive messages from the impression topic:
kafka-console-consumer --bootstrap-server localhost:9092 --topic impression --from-beginning
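Each line the consumer prints should look roughly like the JSON the producer builds; the values below are just one illustrative combination of the arrays defined above:
{"userid":"20ad3455","eventtime":"1520000000000","siteid":"67800","goodsid":"1100","pagename":"Home","sid":"xie","eventkey":"open"}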
Consuming the messages with Spark Streaming
Spark Streaming is an extension of the core Spark API that provides scalable, high-throughput, fault-tolerant processing of live data streams.
The program below has Spark Streaming print the data collected from Kafka every 6 seconds.
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

/**
 * @author XieChenlong
 * Stream processing example
 */
object StreamingMab {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("StreamingMab").setMaster("local[2]")
    // Batch interval of 6 seconds: each micro-batch covers 6 seconds of data
    val ssc = new StreamingContext(sparkConf, Durations.seconds(6))
    val KAFKA_TOPIC_NAME = Array("impression", "event")
    val KAFKA_BROKERS = "127.0.0.1:9092"
    val KAFKA_PARAMS = Map[String, Object](
      "bootstrap.servers" -> KAFKA_BROKERS,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "xieTest",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    // Direct stream: one Kafka partition maps to one Spark partition
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](KAFKA_TOPIC_NAME, KAFKA_PARAMS)
    )
    // Keep only the message payload (the JSON string)
    val eventDStream = stream.map(record => record.value)
    eventDStream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        rdd.foreach { x =>
          println(x)
        }
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
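Printing raw messages is enough to verify the pipeline end to end, but in practice you usually aggregate them. As a minimal sketch, assuming the jettison JSONObject used by the producer is also on the consumer's classpath, the snippet below could replace the foreachRDD block above to count events per eventkey in every 6-second batch:
import org.codehaus.jettison.json.JSONObject
// Hypothetical extension: per-batch counts by event type
val eventCounts = eventDStream
  .map(json => new JSONObject(json).getString("eventkey"))
  .countByValue()
eventCounts.print()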