Spark Streaming: Building a General-Purpose Stream Processing Platform with Spark Streaming, Flume, and Kafka
Posted by 怒上王者
A General-Purpose Stream Processing Platform
Project Flow Diagram
Detailed Steps
------ log4j -> Flume ------
1. Write the log4j.properties configuration file (the hostname and port that the Flume source listens on are configured in log4j.properties)
2. Add the flume-ng-log4jappender dependency
3. Write a Java program that keeps generating log messages (LoggerData.java)
4. Write the Flume configuration file and start Flume (streaming.conf, flume-ng start command)
------ Flume -> Kafka ------
5. Start ZooKeeper
6. Start Kafka
7. List the Kafka topics
8. Create a Kafka topic
9. Write the Flume configuration file (streaming2.conf)
10. Start Flume
11. Start a Kafka console consumer
12. Start the log4j log-generating program (LoggerData.java)
------ Kafka -> Spark Streaming ------
13. Write the Spark Streaming program (Receiver-based approach)
14. Run the Spark Streaming program (KafkaStreamingApp.scala). Before running it, start Kafka and Flume first, then the log4j log generator, and finally the Spark Streaming program.
Integrating log4j Output with Flume
1. pom.xml
<!-- Required when integrating log4j with Flume -->
<dependency>
<groupId>org.apache.flume.flume-ng-clients</groupId>
<artifactId>flume-ng-log4jappender</artifactId>
<version>1.7.0</version>
</dependency>
Without this dependency you will get the following error:
log4j:ERROR Could not instantiate class [org.apache.flume.clients.log4jappender.Log4jAppender].
java.lang.ClassNotFoundException: org.apache.flume.clients.log4jappender.Log4jAppender
2. Generating logs with log4j

import org.apache.log4j.Logger;

/**
 * Simulates log generation: emits one log line per second.
 */
public class LoggerData {

    private static Logger logger = Logger.getLogger(LoggerData.class.getName());

    public static void main(String[] args) throws Exception {
        int index = 0;
        while (true) {
            Thread.sleep(1000);
            logger.info("current value is : " + index++);
        }
    }
}
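The Logger class above comes from log4j 1.x. The flume-ng-log4jappender artifact normally pulls log4j in transitively, but if your build does not already have it, a dependency along these lines can be added (the version shown is an assumption):
<!-- log4j 1.x, used by LoggerData; version is an assumption -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>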
3. Write the Flume configuration file streaming.conf
agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=log-sink
#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=192.168.118.151
agent1.sources.avro-source.port=41414
#define channel
agent1.channels.logger-channel.type=memory
#define sink
agent1.sinks.log-sink.type=logger
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.log-sink.channel=logger-channel
4. Start Flume
flume-ng agent \
--conf $FLUME_HOME/job \
--conf-file $FLUME_HOME/job/streaming.conf \
--name agent1 \
-Dflume.root.logger=INFO,console
5. Configure log4j.properties
The hostname and port that the Flume source listens on are configured in log4j.properties:
# Set root category priority to INFO and its only appender to CONSOLE.
log4j.rootCategory=INFO, CONSOLE, flume
#log4j.rootCategory=INFO, CONSOLE, LOGFILE
# CONSOLE is set to be a ConsoleAppender using a PatternLayout.
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.Threshold=INFO
log4j.appender.CONSOLE.Target=System.out
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{MM-dd HH:mm:ss.SSS} [%-5p] %c - %m%n
# Flume
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = hadoop01
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true
Note that flume must be appended to log4j.rootCategory; otherwise the log events will not reach the Flume appender and nothing will show up on the Flume agent's console.
6. Run the program from IDEA and check that Flume receives the logs.
Integrating Flume with Kafka
1. Start ZooKeeper
This test uses the ZooKeeper bundled with Kafka, so it cannot be started with zkServer.sh start; use the script shipped with Kafka instead:
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties >./logs/zk.log &
2. Start Kafka
$KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
3. List the Kafka topics
$KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper hadoop01:2181
4. Create a new topic
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic streamingtopic
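Optionally, verify that the topic exists before wiring it into Flume (same old-style --zookeeper flag as above):
$KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper hadoop01:2181 --topic streamingtopic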
5. Write the Flume configuration file (streaming2.conf)
vim streaming2.conf
agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink
#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=192.168.118.151
agent1.sources.avro-source.port=41414
#define channel
agent1.channels.logger-channel.type=memory
agent1.channels.logger-channel.capacity = 1000
agent1.channels.logger-channel.transactionCapacity = 100
#define sink
agent1.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.kafka.bootstrap.servers = 192.168.118.151:9092
agent1.sinks.kafka-sink.kafka.topic = streamingtopic
agent1.sinks.kafka-sink.kafka.producer.acks = 1
agent1.sinks.kafka-sink.kafka.flumeBatchSize = 10
agent1.sinks.kafka-sink.kafka.producer.linger.ms = 10000
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel
The sink definition above uses the Flume 1.7 Kafka Sink property names. For Flume 1.6, the equivalent sink section looks like this:
#define sink
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic = streamingtopic
agent1.sinks.kafka-sink.brokerList = hadoop000:9092
agent1.sinks.kafka-sink.requiredAcks = 1
agent1.sinks.kafka-sink.batchSize = 20
6. Start the Flume agent
flume-ng agent \
--conf $FLUME_HOME/job \
--conf-file $FLUME_HOME/job/streaming2.conf \
--name agent1 \
-Dflume.root.logger=INFO,console
7. Start a Kafka console consumer
# Kafka 0.8 / 0.9: use --zookeeper hadoop01:2181
$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper hadoop01:2181 --topic streamingtopic
# Newer versions (0.9 and later): use --bootstrap-server hadoop01:9092
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic streamingtopic
8. Run the log-generating program from IDEA
Start LoggerData in IDEA, then check whether the Kafka consumer prints any data. The data arrives in batches: the batch size is controlled by the batchSize setting in the Flume configuration; for example, a batch size of 20 means Flume sends its data to Kafka only after it has collected 20 records.
Integrating Kafka with Spark Streaming
Write the Spark Streaming program (Receiver-based approach):
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Integrates Kafka with Spark Streaming (Receiver-based approach).
 */
object KafkaStreamingApp {

  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.println("Usage: <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    val Array(zkQuorum, group, topics, numThreads) = args
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    val sparkConf = new SparkConf().setAppName("KafkaStreamingApp").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Receive messages from Kafka
    val messages: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

    // Each record is a (key, value) pair; count the values in every 5-second batch
    messages.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
Program arguments: hadoop01:2181 test streamingtopic 1
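KafkaUtils.createStream lives in the separate spark-streaming-kafka integration module, not in spark-streaming itself, so a dependency roughly like the following is needed in pom.xml. The artifact and version are assumptions: for Spark 2.x with Scala 2.11 the Receiver-based module is spark-streaming-kafka-0-8_2.11, while Spark 1.x uses spark-streaming-kafka_2.10 or _2.11.
<!-- Receiver-based Kafka integration; artifact/version are assumptions, match them to your Spark and Scala versions -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.2.0</version>
</dependency>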
Summary
All of the code above was tested locally: run LoggerData in IDEA to generate log data, then wire it through Flume, Kafka, and Spark Streaming.
In production the flow is:
1. Package LoggerData into a jar and run it on the server to generate data.
2. Flume and Kafka are used exactly as in the test above; simply start flume-ng.
3. Package the Kafka / Spark Streaming integration code into a jar and submit it with spark-submit (a sketch follows below).
Choose the deployment mode that fits your production environment: local / YARN / Standalone / Mesos.
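As a rough sketch, a submission could look like the following; the jar path, class name, and --packages coordinates are assumptions and must match your own build and Spark version. Note that the hard-coded setMaster("local[2]") should be removed or made configurable before running on a cluster:
spark-submit \
--master yarn \
--name KafkaStreamingApp \
--class KafkaStreamingApp \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
/path/to/streaming-app.jar \
hadoop01:2181 test streamingtopic 1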