学习笔记Kafka—— Kafka 与Spark集成 —— 原理介绍与开发环境配置实战

Posted 别呀

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了学习笔记Kafka—— Kafka 与Spark集成 —— 原理介绍与开发环境配置实战相关的知识,希望对你有一定的参考价值。

一、环境

1.1、Hadoop环境


1.2、Spark环境

1.3、Spark Streaming


1.4、Add Maven Dependencies & 开发流程

Add Scala Framework Support

添加依赖(在pom.xml添加)

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.4.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.4.3</version>
</dependency>

开发流程

二、实战

实战一:Kafka消息输出到Console|本地测试|集群测试

package demo01

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SecondKafkaWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("SecondKafkaWordCount")
    val ssc = new StreamingContext(conf,Seconds(3))

    ssc.sparkContext.setLogLevel("WARN")

    val kafkaProperties = Map("bootstrap.servers"->"node100:9092,node101:9092,node102:9092")

    val data = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaProperties,Set("test_02_02"))

    //transformation
    val result = data.map(_._2).map(x=>(x,1)).reduceByKey((x,y)=>x+y)
    //action
    result.print()
    //启动
    ssc.start()
    ssc.awaitTermination()
  }
}


编译打包,并上传jar文件

执行TimerProducer.java

结果:

实战二:Kafka消息输出到HDFS|本地测试|集群测试

package demo01

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaWordCountHDFS {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("spark://master:7077")
      .set("spark.executor.memory","512m")
      .setAppName("KafkaWordCountConsole")
    val ssc = new StreamingContext(conf,Seconds(3))

    ssc.sparkContext.setLogLevel("WARN")

    val kafkaProperties = Map("bootstrap.servers"->"node110:9092,node111:9092,node112:9092")

    val data = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaProperties,Set("test_02_02"))

    //transformation
    val result = data.map(_._2).map(x=>(x,1)).reduceByKey((x,y)=>x+y)
    //action
    result.saveAsTextFiles("/tmp/wordcount","txt")
    //启动
    ssc.start()
    ssc.awaitTermination()
  }
}


编译打包,上传jar文件

spark提交

运行TimerProducer.java文件

结果:

以上是关于学习笔记Kafka—— Kafka 与Spark集成 —— 原理介绍与开发环境配置实战的主要内容,如果未能解决你的问题,请参考以下文章

学习笔记Kafka—— Kafka 与Spark集成 —— 原理介绍与开发环境配置实战

学习笔记Kafka—— Kafka 与Spark集成 —— 原理介绍与开发环境配置实战

Spark基础学习笔记31:Kafka分布式消息系统

Spark 学习笔记之 Streaming和Kafka Direct

Spark Streaming 基于 Direct API 优化与 Kafka 集成

Spark Streaming 基于 Direct API 优化与 Kafka 集成