013 - Kafka in Practice: Integrating Kafka with Spark Streaming
Posted by BearData
Integrating Spark Streaming with Kafka is mainly used for stream processing: based on the configured batch interval, incoming data is processed in micro-batches at fixed time intervals. This post walks through the integration of Spark Streaming and Kafka.
I. Environment preparation:
1. pom.xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
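The dependencies above reference ${scala.version} and ${spark.version} without defining them. A minimal sketch of the corresponding Maven properties block, assuming Scala 2.11 and a Spark 2.x release that matches spark-streaming-kafka-0-10_2.11 (adjust to your cluster):
<properties>
    <!-- Assumed versions, not from the original post -->
    <scala.version>2.11</scala.version>
    <spark.version>2.2.0</spark.version>
</properties>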
2. Configure kafka.properties:
bootstrap.servers=bootStrapServer
serializer.class=kafka.serializer.StringEncoder
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
enable.auto.commit=true
auto.commit.interval.ms=1000
group.id=beardata
3. The constants interface KafkaConfigConsts:
package com.colin.beardata.utils;
/**
* Created by zhangxiong on 18-3-29.
*/
public interface KafkaConfigConsts {
/**
* bootstrap.servers
*/
String BOOTSTRAP_SERVERS = "bootstrap.servers";
/**
* serializer.class
*/
String SERIALIZER_CLASS = "serializer.class";
/**
* value.deserializer
*/
String VALUE_DESERIALIZER = "value.deserializer";
/**
* key.deserializer
*/
String KEY_DESERIALIZER = "key.deserializer";
/**
* enable.auto.commit
*/
String ENABLE_AUTO_COMMIT = "enable.auto.commit";
/**
* auto.commit.interval.ms
*/
String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms";
/**
* group.id
*/
String GROUP_ID = "group.id";
}
4. The configuration reader utility KafkaPropertiesUtils:
package com.colin.beardata.utils;
import java.util.ResourceBundle;
/**
* Created by zhangxiong on 18-3-26.
*/
public class KafkaPropertiesUtils {
private static ResourceBundle bundle = ResourceBundle.getBundle("kafka");
/**
* Gets the value for the given key from kafka.properties.
*
* @param key the property key
* @return the property value
*/
public static String getValue(String key) {
return bundle.getString(key);
}
}
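As a quick, hypothetical usage sketch (not part of the original project), the two helpers above can be combined like this, assuming kafka.properties is on the runtime classpath:
import com.colin.beardata.utils.{KafkaConfigConsts, KafkaPropertiesUtils}

// Reads group.id from kafka.properties via the constants interface and the utility class above
object KafkaPropertiesUtilsExample {
  def main(args: Array[String]): Unit = {
    val groupId = KafkaPropertiesUtils.getValue(KafkaConfigConsts.GROUP_ID)
    println(s"group.id = $groupId")
  }
}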
5. The main class SparkKafkaApp:
package com.colin.beardata
import com.colin.beardata.utils.{KafkaConfigConsts, KafkaPropertiesUtils}
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.slf4j.{Logger, LoggerFactory}
/**
* Created by zhangxiong on 18-2-23.
*/
object SparkKafkaApp {
val logger: Logger = LoggerFactory.getLogger(this.getClass.getName)
def main(args: Array[String]): Unit = {
if (args.length < 4) {
logger.error("Usage:kafka01:port,kafka02:port time_interval topic1,topic2 local/cluster")
sys.exit()
}
// Command-line arguments
val Array(bootStrapServer, duration, topic, runType) = args
val sparkConf = new SparkConf().setAppName("SparkKafkaApp")
if (runType.equals("local")) {
sparkConf.setMaster("local[2]")
}
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Batch interval is given in milliseconds
val scc = new StreamingContext(sparkConf, Duration(duration.toLong))
// Note: a checkpoint directory is only required when stateful operations such as updateStateByKey are used
// Topics to consume from Kafka
val topicSet = topic.split(",").toSet
// Kafka consumer configuration
val kafkaParam = Map(
KafkaConfigConsts.BOOTSTRAP_SERVERS -> bootStrapServer, // Kafka bootstrap servers (broker list)
KafkaConfigConsts.SERIALIZER_CLASS -> KafkaPropertiesUtils.getValue(KafkaConfigConsts.SERIALIZER_CLASS), // legacy producer setting; not used by the 0.10 consumer API
KafkaConfigConsts.VALUE_DESERIALIZER -> KafkaPropertiesUtils.getValue(KafkaConfigConsts.VALUE_DESERIALIZER),
KafkaConfigConsts.KEY_DESERIALIZER -> KafkaPropertiesUtils.getValue(KafkaConfigConsts.KEY_DESERIALIZER),
KafkaConfigConsts.ENABLE_AUTO_COMMIT -> KafkaPropertiesUtils.getValue(KafkaConfigConsts.ENABLE_AUTO_COMMIT),
KafkaConfigConsts.AUTO_COMMIT_INTERVAL_MS -> KafkaPropertiesUtils.getValue(KafkaConfigConsts.AUTO_COMMIT_INTERVAL_MS),
KafkaConfigConsts.GROUP_ID -> KafkaPropertiesUtils.getValue(KafkaConfigConsts.GROUP_ID)
)
// Create the direct stream from Kafka
val stream = createStream(scc, kafkaParam, topicSet)
stream.foreachRDD(rdd => {
if (!rdd.isEmpty) {
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition(partitionRecords => {
val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
// Print the topic, partition, starting offset, and ending offset for this partition's range
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
try
partitionRecords.foreach(record => {
val key = record.key()
val value = record.value()
if (logger.isDebugEnabled)
logger.debug("topic:" + topic + ",key:" + key + " value:" + value)
})
catch {
case ex: Exception =>
logger.error(ex.getMessage)
}
// stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})
}
})
scc.start() // Actually start the streaming job
scc.awaitTermination() // Block and wait for termination
}
/**
* Creates a direct stream that pulls data from Kafka.
*
* @param scc the Spark Streaming context
* @param kafkaParam the Kafka consumer configuration
* @param topics the set of topics to consume
* @return the Kafka input DStream
*/
def createStream(scc: StreamingContext, kafkaParam: Map[String, String], topics: Set[String]) = {
KafkaUtils.createDirectStream(
scc,
LocationStrategies.PreferConsistent,
Subscribe[String, String](topics, kafkaParam)
)
}
}
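For reference, here is a hedged example of submitting the job; the jar name is hypothetical and the broker addresses, interval, and topic are placeholders. With "local" as the last argument the application sets its own master, so no --master flag is passed here:
spark-submit \
--class com.colin.beardata.SparkKafkaApp \
beardata-spark-kafka.jar \
kafka01:9092,kafka02:9092 5000 beardata local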
II. Testing
We send a message to the topic beardata: hello,beardata
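One way to send it is with the standard Kafka console producer (the broker address is a placeholder):
bin/kafka-console-producer.sh --broker-list kafka01:9092 --topic beardata
hello,beardata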
On the console we receive the message from beardata.
That covers the integration of Kafka with Spark Streaming. In the next post we will look at Kafka internals: log storage.