013 - Kafka Applications: Integrating Kafka with Spark Streaming

Posted by BearData


Integrating Spark Streaming with Kafka is mainly used for stream processing: data is consumed and processed in micro-batches, one batch per configured batch interval. In this post we walk through integrating Spark Streaming with Kafka.

I. Environment Setup

1. pom.xml

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
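The ${scala.version} and ${spark.version} placeholders above are expected to be declared in the POM's <properties> block, which the article does not show. A minimal sketch follows; the concrete versions are assumptions chosen to match the spark-streaming-kafka-0-10_2.11 artifact, not values from the original post:

<properties>
    <scala.version>2.11</scala.version>
    <spark.version>2.2.0</spark.version>
</properties>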

2. Configure kafka.properties:

bootstrap.servers=bootStrapServer
serializer.class=kafka.serializer.StringEncoder
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
enable.auto.commit=true
auto.commit.interval.ms=1000
group.id=beardata
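Because the utility class in step 4 loads this file with ResourceBundle.getBundle("kafka"), it must be named kafka.properties and sit on the classpath (for a Maven project, under src/main/resources). A minimal sanity-check sketch that prints every key in the bundle (the object name KafkaPropertiesCheck is purely illustrative, not part of the article's code):

import java.util.ResourceBundle
import scala.collection.JavaConverters._

object KafkaPropertiesCheck {
  def main(args: Array[String]): Unit = {
    // Loads kafka.properties from the classpath, exactly as KafkaPropertiesUtils does in step 4
    val bundle = ResourceBundle.getBundle("kafka")
    bundle.keySet().asScala.foreach { key =>
      println(s"$key = ${bundle.getString(key)}")
    }
  }
}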

3. The constants interface KafkaConfigConsts:

package com.colin.beardata.utils;

/**
 * Created by zhangxiong on 18-3-29.
 */
public interface KafkaConfigConsts {

    /** bootstrap.servers */
    String BOOTSTRAP_SERVERS = "bootstrap.servers";

    /** serializer.class */
    String SERIALIZER_CLASS = "serializer.class";

    /** value.deserializer */
    String VALUE_DESERIALIZER = "value.deserializer";

    /** key.deserializer */
    String KEY_DESERIALIZER = "key.deserializer";

    /** enable.auto.commit */
    String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    /** auto.commit.interval.ms */
    String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms";

    /** group.id */
    String GROUP_ID = "group.id";
}

4. The configuration-reading utility class KafkaPropertiesUtils:

package com.colin.beardata.utils;

import java.util.ResourceBundle;

/**
 * Created by zhangxiong on 18-3-26.
 */
public class KafkaPropertiesUtils {

    // Loads kafka.properties from the classpath (e.g. src/main/resources/kafka.properties)
    private static ResourceBundle bundle = ResourceBundle.getBundle("kafka");

    /**
     * Get the value for the given key.
     *
     * @param key the property key
     * @return the property value
     */
    public static String getValue(String key) {
        return bundle.getString(key);
    }
}
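Putting steps 3 and 4 together, any consumer setting can be read through a constant plus the utility class; a minimal usage sketch (the object name is illustrative only):

import com.colin.beardata.utils.{KafkaConfigConsts, KafkaPropertiesUtils}

object KafkaConfigDemo {
  def main(args: Array[String]): Unit = {
    // Reads "group.id" from kafka.properties; with the file above this prints "beardata"
    val groupId = KafkaPropertiesUtils.getValue(KafkaConfigConsts.GROUP_ID)
    println(s"group.id = $groupId")
  }
}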

5. The main class SparkKafkaApp:

package com.colin.beardata

import com.colin.beardata.utils.{KafkaConfigConsts, KafkaPropertiesUtils}
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.slf4j.{Logger, LoggerFactory}

/**
 * Created by zhangxiong on 18-2-23.
 */
object SparkKafkaApp {

  val logger: Logger = LoggerFactory.getLogger(this.getClass.getName)

  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      logger.error("Usage: kafka01:port,kafka02:port time_interval_ms topic1,topic2 local/cluster")
      sys.exit()
    }

    // Command-line arguments
    val Array(bootStrapServer, duration, topic, runType) = args

    val sparkConf = new SparkConf().setAppName("SparkKafkaApp")
    if (runType.equals("local")) {
      sparkConf.setMaster("local[2]")
    }
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    // Batch interval is given in milliseconds
    val scc = new StreamingContext(sparkConf, Duration(duration.toLong))

    // Note: stateful operations such as updateStateByKey would also require a
    // checkpoint directory to be set here; this example does not use them.

    // Kafka topics to consume
    val topicSet = topic.split(",").toSet

    val kafkaParam = Map(
      KafkaConfigConsts.BOOTSTRAP_SERVERS -> bootStrapServer, // Kafka broker list (from the command line)
      // serializer.class is a legacy old-producer setting; the 0.10 consumer ignores it
      KafkaConfigConsts.SERIALIZER_CLASS -> KafkaPropertiesUtils.getValue(KafkaConfigConsts.SERIALIZER_CLASS),
      KafkaConfigConsts.VALUE_DESERIALIZER -> KafkaPropertiesUtils.getValue(KafkaConfigConsts.VALUE_DESERIALIZER),
      KafkaConfigConsts.KEY_DESERIALIZER -> KafkaPropertiesUtils.getValue(KafkaConfigConsts.KEY_DESERIALIZER),
      KafkaConfigConsts.ENABLE_AUTO_COMMIT -> KafkaPropertiesUtils.getValue(KafkaConfigConsts.ENABLE_AUTO_COMMIT),
      KafkaConfigConsts.AUTO_COMMIT_INTERVAL_MS -> KafkaPropertiesUtils.getValue(KafkaConfigConsts.AUTO_COMMIT_INTERVAL_MS),
      KafkaConfigConsts.GROUP_ID -> KafkaPropertiesUtils.getValue(KafkaConfigConsts.GROUP_ID)
    )

    // Create the input stream from Kafka
    val stream = createStream(scc, kafkaParam, topicSet)

    stream.foreachRDD(rdd => {
      if (!rdd.isEmpty) {
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.foreachPartition(partitionRecords => {
          val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
          // Print the topic, partition, starting offset and ending offset for this partition
          println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
          try
            partitionRecords.foreach(record => {
              val key = record.key()
              val value = record.value()
              if (logger.isInfoEnabled)
                logger.info("topic:" + topic + ",key:" + key + "  value:" + value)
            })
          catch {
            case ex: Exception =>
              logger.error(ex.getMessage)
          }
          // stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        })
      }
    })

    scc.start()            // Start the streaming job
    scc.awaitTermination() // Block until termination
  }

  /**
   * Create a stream that reads data from Kafka.
   *
   * @param scc        the Spark Streaming context
   * @param kafkaParam the Kafka configuration
   * @param topics     the set of topics to consume
   * @return the direct input stream
   */
  def createStream(scc: StreamingContext, kafkaParam: Map[String, String], topics: Set[String]) = {
    KafkaUtils.createDirectStream(
      scc,
      LocationStrategies.PreferConsistent,
      Subscribe[String, String](topics, kafkaParam)
    )
  }
}
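The commented-out commitAsync line hints at manual offset management. It is unnecessary here because kafka.properties sets enable.auto.commit=true, and it also sits inside foreachPartition, which runs on the executors, whereas CanCommitOffsets commits are issued from the driver on the DStream returned by createDirectStream. A hedged sketch of the driver-side pattern, assuming enable.auto.commit is switched to false (an assumption, not what the article configures); it would replace the foreachRDD block in main above:

// Sketch only: commit offsets from the driver once the batch has been processed.
// Assumes enable.auto.commit=false in the Kafka consumer parameters.
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch here ...
  // commitAsync is called on the original DStream, in the driver
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}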

II. Testing

We send the message hello,beardata to the topic beardata:
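The article does not show how the test message is produced; one option is a small standalone producer, sketched below. The broker address localhost:9092 and the object name are assumptions; point bootstrap.servers at your own cluster:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object BearDataTestProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // assumption: adjust to your brokers
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // Send the article's test message to the "beardata" topic
    producer.send(new ProducerRecord[String, String]("beardata", "hello,beardata"))
    producer.flush()
    producer.close()
  }
}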


In the console we receive the message from beardata.

That concludes the integration of Kafka with Spark Streaming. In the next post we will cover Kafka internals: log storage.



