sparkStreaming
Posted by 漠小浅
Quick reference:
Reading data from a socket
JavaReceiverInputDStream lines = jssc.socketTextStream("master", 9999);
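For context, a minimal driver skeleton around this call (a sketch only; the local[2] master, 5-second batch interval, and class name are illustrative choices, not part of the original snippet):
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class SocketWordCount {
    public static void main(String[] args) throws Exception {
        // local two-thread master and a 5-second batch interval are illustrative
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SocketWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        // the one-liner from the cheat sheet: read lines from host "master", port 9999
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("master", 9999);

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });
        words.print();

        jssc.start();
        jssc.awaitTermination();
    }
}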
Monitoring an HDFS directory (only files newly added to the directory are detected; appending to or deleting content of existing files has no effect)
JavaDStream lines = jssc.textFileStream("hdfs://Master:9000/sparkStreaming/data");//monitors newly added files
Flume configuration for pushing local files to HDFS:
#agent1 is the agent name
agent1.sources=source1
agent1.sinks=sink1
agent1.channels=channel1
#The spooldir source watches the specified directory for new files; when a new file appears, its contents are parsed and written to the channel. Once written, the file is marked as completed or deleted.
#configure source1
agent1.sources.source1.type=spooldir
agent1.sources.source1.spoolDir=/flume_data
agent1.sources.source1.channels=channel1
agent1.sources.source1.fileHeader = false
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = timestamp
#configure sink1
agent1.sinks.sink1.type=hdfs
agent1.sinks.sink1.hdfs.path=hdfs://hadoop0:9000/flume
agent1.sinks.sink1.hdfs.fileType=DataStream
agent1.sinks.sink1.hdfs.writeFormat=TEXT
agent1.sinks.sink1.hdfs.rollInterval=1
agent1.sinks.sink1.channel=channel1
agent1.sinks.sink1.hdfs.filePrefix=%Y-%m-%d
#configure channel1
agent1.channels.channel1.type=file
agent1.channels.channel1.checkpointDir=/root/hmbbs_tmp/123
agent1.channels.channel1.dataDirs=/root/hmbbs_tmp/
Run the agent with: bin/flume-ng agent -n agent1 -c conf -f conf/example -Dflume.root.logger=DEBUG,console
Flume pushing data to Spark Streaming
#agent1 is the agent name
agent1.sources=source1
agent1.sinks=sink1
agent1.channels=channel1
#The spooldir source watches the specified directory for new files; when a new file appears, its contents are parsed and written to the channel. Once written, the file is marked as completed or deleted.
#configure source1
agent1.sources.source1.type=spooldir
agent1.sources.source1.spoolDir=/flume_data
agent1.sources.source1.channels=channel1
agent1.sources.source1.fileHeader = false
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = timestamp
#configure sink1 (the HDFS sink settings are kept below, commented out)
#agent1.sinks.sink1.type=hdfs
#agent1.sinks.sink1.hdfs.path=hdfs://hadoop0:9000/flume
#agent1.sinks.sink1.hdfs.fileType=DataStream
#agent1.sinks.sink1.hdfs.writeFormat=TEXT
#agent1.sinks.sink1.hdfs.rollInterval=1
#agent1.sinks.sink1.channel=channel1
#agent1.sinks.sink1.hdfs.filePrefix=%Y-%m-%d
agent1.sinks.sink1.type=avro
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink1.hostname=master
agent1.sinks.sink1.port=9999
#configure channel1
agent1.channels.channel1.type=file
agent1.channels.channel1.checkpointDir=/root/hmbbs_tmp/123
agent1.channels.channel1.dataDirs=/root/hmbbs_tmp/
Add the dependency:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.10</artifactId>
<version>1.6.0</version>
</dependency>
JavaReceiverInputDStream<SparkFlumeEvent> lines = FlumeUtils.createStream(jssc, "master", 9999);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<SparkFlumeEvent, String>() {
    @Override
    public Iterable<String> call(SparkFlumeEvent event) throws Exception {
        //one file corresponds to one event
        String line = new String(event.event().getBody().array());
        return Arrays.asList(line.split(" "));
    }
});
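For completeness, a sketch of how the word count might continue from the words stream above (standard Spark 1.6 Java DStream API; JavaPairDStream, PairFunction, Function2 and Tuple2 imports are assumed):
JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String word) throws Exception {
        return new Tuple2<String, Integer>(word, 1);
    }
});
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
    }
});
wordCounts.print();
jssc.start();
jssc.awaitTermination();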
Spark Streaming pulling (polling) data from Flume
agent1.sinks.sink1.type=org.apache.spark.streaming.flume.sink.SparkSink
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink1.hostname=master
agent1.sinks.sink1.port=9999
Download the required jars into Flume's lib directory; see http://spark.apache.org/docs/latest/streaming-flume-integration.html
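According to that integration guide, the pull-based approach needs the custom Spark sink plus its dependencies on Flume's classpath: the spark-streaming-flume-sink jar, a matching scala-library jar (e.g. 2.10.5), and commons-lang3 (e.g. 3.3.2). The exact versions below are assumptions and should be matched to your Spark and Scala builds; the custom sink's Maven coordinates look like this:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume-sink_2.10</artifactId>
<version>1.6.0</version>
</dependency>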
JavaReceiverInputDStream lines = FlumeUtils.createPollingStream(jssc,"master",9999);
sparkStreamingOnKafkaReceiver
Summary: this approach uses a Receiver to fetch the data. The Receiver is implemented with Kafka's high-level consumer API. The data the receiver pulls from Kafka is stored in the Spark executors' memory, and the jobs that Spark Streaming launches then process that data. Under the default configuration, however, this approach can lose data when the underlying infrastructure fails. To get a high-reliability, zero-data-loss guarantee you must enable Spark Streaming's Write Ahead Log (WAL): the data received from Kafka is then synchronously written to a write-ahead log on a distributed file system such as HDFS, so even if a node fails, the data can be recovered from the log.
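A minimal sketch of enabling that WAL (spark.streaming.receiver.writeAheadLog.enable is the standard Spark property; the application name and checkpoint path are illustrative):
SparkConf conf = new SparkConf()
    .setAppName("KafkaReceiverWordCount")
    // required for zero data loss with the receiver-based approach
    .set("spark.streaming.receiver.writeAheadLog.enable", "true");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
// the WAL is written under the checkpoint directory, so use a reliable (e.g. HDFS) path
jssc.checkpoint("hdfs://master:9000/sparkStreaming/checkpoint");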
Parameter 1 is the StreamingContext instance.
Parameter 2 is the ZooKeeper quorum (when receiving Kafka data, offsets and other metadata are fetched from ZooKeeper).
Parameter 3 is the consumer group.
Parameter 4 is the topics to consume, together with the number of threads used to read each topic's partitions concurrently.
Why is parameter 2 needed? Because with this approach Spark Streaming stores the offset information in ZooKeeper. And why parameter 4? The topic itself is clearly required, and the Integer value lets you set how many threads the receiver uses to read the Kafka partitions in parallel. Note that this only controls the receiver's read parallelism; it does not set the number of RDD partitions, which in this mode has no relationship to the number of Kafka partitions.
Map<String,Integer> topicConsumerConcurrency = new HashMap<String,Integer>();
topicConsumerConcurrency.put("helloKafkaFromSparkStreaming", 2);
JavaPairReceiverInputDStream<String,String> lines = KafkaUtils.createStream(jssc, "master:2181,slave1:2181,slave2:2181", "MyFirstConsumerGroup", topicConsumerConcurrency);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String,String>, String>() {
    @Override
    public Iterable<String> call(Tuple2<String,String> tuple) throws Exception {
        return Arrays.asList(tuple._2.split(" "));
    }
});
Start Kafka (with nohup, in the background)
nohup ./kafka-server-start.sh ../config/server.properties &
Create a topic
./kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 3 --partitions 1 --topic helloKafkaFromSparkStreaming
Start a console producer
./kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9092 --topic helloKafkaFromSparkStreaming
Start a console consumer
bin/kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic helloKafkaFromSparkStreaming --from-beginning
sparkStreaming on Kafka Direct
Summary: this newer, receiver-free direct approach periodically queries Kafka for the latest offset of each topic+partition and uses that to define the offset range of each batch. When the job that processes the data starts, it uses Kafka's simple consumer API to read exactly that offset range from Kafka. Advantages: 1. Simplified parallel reads: to read multiple partitions you no longer need to create several input DStreams and union them. Spark creates as many RDD partitions as there are Kafka partitions and reads from Kafka in parallel, so there is a one-to-one mapping between Kafka partitions and RDD partitions. 2. Higher performance: to guarantee zero data loss, the receiver-based approach needs the WAL, which is inefficient because the data is effectively copied twice. Kafka already replicates the data, and then it is copied again into the WAL. The direct approach has no receiver and needs no WAL: as long as Kafka replicates the data, it can be recovered from Kafka's replicas. 3. Exactly-once semantics: the receiver-based approach uses Kafka's high-level API to store consumed offsets in ZooKeeper, the traditional way of consuming Kafka. Combined with the WAL it guarantees zero data loss, but it cannot guarantee that each record is processed exactly once; records may be processed twice, because Spark and ZooKeeper can get out of sync. The direct approach uses Kafka's simple API, and Spark Streaming itself tracks the consumed offsets and saves them in the checkpoint. Since Spark is consistent with itself, each record is consumed exactly once.
HashMap<String,String> KafkaParams = new HashMap<String,String>();
KafkaParams.put("metadata.borker.list","master:9092,slave1:9092,slave2:9092");
HashSet<String> topics = new HashSet<String>();
topics.add("topics");
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc,String.class,String.class, StringDecoder.class,StringDecoder.class,KafkaParams,topics);
Here you supply Kafka's metadata.broker.list and no longer need any ZooKeeper information; the reason is that Spark Streaming keeps the offsets itself and reads them directly from Kafka, so it needs the list of Kafka brokers.
Nor do you set a per-topic thread count for a receiver, because Spark creates as many RDD partitions as there are Kafka partitions and reads from Kafka in parallel; Kafka partitions and RDD partitions correspond one-to-one.
Scala version: KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams, topicsSet)
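Because the direct approach leaves offset tracking to Spark Streaming and its checkpoint, it can be useful to inspect the offset range of each batch. A sketch using the HasOffsetRanges/OffsetRange classes from the same Kafka integration package, applied to the messages stream created above (VoidFunction and JavaPairRDD imports are assumed):
messages.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {
    @Override
    public void call(JavaPairRDD<String, String> rdd) throws Exception {
        // each batch RDD of a direct stream implements HasOffsetRanges
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        for (OffsetRange o : offsetRanges) {
            System.out.println(o.topic() + " partition " + o.partition()
                    + " offsets [" + o.fromOffset() + ", " + o.untilOffset() + ")");
        }
    }
});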
updateStateByKey
jssc.checkpoint(" ");//a checkpoint directory must be set (path left as a placeholder here)
JavaPairDStream<String,Integer> wordsCount = pairs.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
    @Override
    public Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception {
        Integer updatedValue = 0;
        if (state.isPresent())
            updatedValue = state.get();
        for (Integer value : values)
            updatedValue += value;
        return Optional.of(updatedValue);
    }
});
transform: blacklist filtering
val blackList = Array(("hadoop", true), ("mahout", true))
val blackListRDD = ssc.sparkContext.parallelize(blackList, 8)
val adsClickStream = ssc.socketTextStream("master", 9999)
//ad log format: time name
//after the map: (name, whole record)
val adsClickStreamFormatted = adsClickStream.map(ads => (ads.split(" ")(1), ads))
adsClickStreamFormatted.transform(userClickRDD => {
  val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD)
  val validClicked = joinedBlackListRDD.filter(joinedItem => {
    if (joinedItem._2._2.getOrElse(false))
      false
    else
      true
  })
  validClicked.map(validClick => validClick._2._1)
}).print()
reduceByKeyAndWindow: online top search terms
Note: the window length and the slide interval must both be integer multiples of the batch interval. The variant with an inverse reduce function used below also requires a checkpoint directory to be set.
//data format: user item
val hottestStream = ssc.socketTextStream("master", 9999)
val searchPair = hottestStream.map(_.split(" ")(1)).map(item => (item, 1))
//val hottestDStream = searchPair.reduceByKeyAndWindow((v1:Int,v2:Int) => v1+v2, Seconds(60), Seconds(20))
val hottestDStream = searchPair.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2, (v1: Int, v2: Int) => v1 - v2, Seconds(60), Seconds(20))
hottestDStream.transform(hottestItemRDD => {
  val top3 = hottestItemRDD.map(pair => (pair._2, pair._1)).sortByKey(false).map(pair => (pair._2, pair._1)).take(3)
  for (item <- top3)
    println(item)
  hottestItemRDD //the return value does not matter here; transform just has to return an RDD
}).print()
foreachRDD: writing data to an external data store
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => {
      val sql = "insert into streaming_itemcount(item,count) values('" + record._1 + "'," + record._2 + ")"
      val stmt = connection.createStatement()
      stmt.executeUpdate(sql)
    })
    ConnectionPool.returnConnection(connection)
  }
}
A simple connection pool implementation:
public class ConnectionPool {
    private static LinkedList<Connection> connectionQueue;

    static {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    public synchronized static Connection getConnection() {
        try {
            if (connectionQueue == null) {
                connectionQueue = new LinkedList<Connection>();
                for (int i = 0; i < 5; i++) {
                    Connection conn = DriverManager.getConnection(
                            "jdbc:mysql://master:3306/sparkstreaming",
                            "root",
                            "root");
                    connectionQueue.push(conn);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connectionQueue.poll();
    }

    public static void returnConnection(Connection conn) {
        connectionQueue.push(conn);
    }
}
sparkStreaming And SQL
Use Spark Streaming together with Spark SQL to compute, online and continuously, the hottest items in each category of an e-commerce site.
//data format: user item category
val userClickLogsDStream = ssc.socketTextStream("master", 9999)
val formattedUserClickLogsDStream = userClickLogsDStream.map(clickLog =>
  (clickLog.split(" ")(2) + "_" + clickLog.split(" ")(1), 1))
val categoryUserClickLogsDStream = formattedUserClickLogsDStream.reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(20))
categoryUserClickLogsDStream.foreachRDD { rdd =>
  val categoryItemRow = rdd.map(reducedItem => {
    val category = reducedItem._1.split("_")(0)
    val item = reducedItem._1.split("_")(1)
    val click_count = reducedItem._2
    Row(category, item, click_count)
  })
  val structType = StructType(Array(
    StructField("category", StringType, true),
    StructField("item", StringType, true),
    StructField("click_count", IntegerType, true)
  ))
  val hiveContext = new HiveContext(rdd.sparkContext)
  val categoryItemDF = hiveContext.createDataFrame(categoryItemRow, structType)
  categoryItemDF.registerTempTable("categoryItemTable")
  val resultDataFrame = hiveContext.sql("select category,item,click_count from (select category,item,click_count,row_number()" +
    " over (partition by category order by click_count desc) rank from categoryItemTable) subquery " +
    " where rank <= 3")
  val resultRowRDD = resultDataFrame.rdd
  resultRowRDD.foreachPartition { partitionOfRecords =>
    if (partitionOfRecords.isEmpty) {
      //nothing to write for this partition
    } else {
      val connection = ConnectionPool.getConnection()
      partitionOfRecords.foreach(record => {
        val sql = "insert into categorytop3(category,item,click_count) values('" + record.getAs("category") + "','" + record.getAs("item") + "'," + record.getAs("click_count") + ")"
        val stmt = connection.createStatement()
        stmt.executeUpdate(sql)
      })
      ConnectionPool.returnConnection(connection)
    }
  }
}
Kafka Producer
/**
 * Generator of simulated forum log data. Each record has the format:
 * date: date, formatted as yyyy-MM-dd
 * timestamp: timestamp
 * userID: user ID
 * pageID: page ID
 * channel: forum board ID
 * action: View (click) or Register
 */
public class SparkStreamingDataManuallyProducerforKafka extends Thread {
    //forum boards
    static String[] channelNames = new String[]{"spark", "scala", "kafka", "Flink", "hadoop", "Storm", "Hive", "Impala", "Hbase", "ML"};
    static String[] actionNames = new String[]{"View", "Register"};
    private String topic;//the Kafka topic the data is sent to
    private Producer<Integer, String> producerForKaka;
    private static String dataToday;
    private static Random random;

    public SparkStreamingDataManuallyProducerforKafka(String topic) {
        dataToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
        this.topic = topic;
        random = new Random();
        Properties conf = new Properties();
        conf.put("metadata.broker.list", "master:9092,slave1:9092,slave2:9092");
        conf.put("serializer.class", "kafka.serializer.StringEncoder");
        producerForKaka = new Producer<Integer, String>(new ProducerConfig(conf));
    }

    @Override
    public void run() {
        int counter = 0;
        while (true) {
            counter++;
            String userLog = userLogs();
            System.out.println("product:" + userLog);
            producerForKaka.send(new KeyedMessage<Integer, String>(topic, userLog));
            if (500 == counter) {
                counter = 0;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        new SparkStreamingDataManuallyProducerforKafka("UserLogs").start();
    }

    private static String userLogs() {
        StringBuffer userLogBuffer = new StringBuffer("");
        long timestamp = new Date().getTime();
        long userID = 0L;
        long pageID = 0L;
        //randomly generated user ID
        userID = random.nextInt(2000);
        //randomly generated page ID
        pageID = random.nextInt(2000);
        //randomly chosen channel
        String channel = channelNames[random.nextInt(10)];
        //randomly chosen action
        String action = actionNames[random.nextInt(2)];
        userLogBuffer.append(dataToday)
            .append("\t")
            .append(timestamp)
            .append("\t")
            .append(userID)
            .append("\t")
            .append(pageID)
            .append("\t")
            .append(channel)
            .append("\t")
            .append(action);
        return userLogBuffer.toString();
    }
}
sparkStreaming PV
HashMap<String,String> KafkaParams = new HashMap<String,String>();
KafkaParams.put("metadata.broker.list","master:9092,slave1:9092,slave2:9092");
HashSet<String> topics = new HashSet<String>();
topics.add("topics");
JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, KafkaParams, topics);
JavaPairDStream<String, String> logDStream = lines.filter(new Function<Tuple2<String, String>, Boolean>() {
    @Override
    public Boolean call(Tuple2<String, String> v1) throws Exception {
        String[] logs = v1._2.split("\\t");
        String action = logs[5];
        if ("View".equals(action))
            return true;
        else
            return false;
    }
});
JavaPairDStream<Long, Long> pairs = logDStream.mapToPair(new PairFunction<Tuple2<String, String>, Long, Long>() {
    @Override
    public Tuple2<Long, Long> call(Tuple2<String, String> t) throws Exception {
        String[] logs = t._2.split("\\t");
        Long pageId = Long.valueOf(logs[3]);
        return new Tuple2<Long, Long>(pageId, 1L);
    }
});
JavaPairDStream<Long, Long> wordsCount = pairs.reduceByKey(new Function2<Long, Long, Long>() {
    @Override
    public Long call(Long v1, Long v2) throws Exception {
        return v1 + v2;
    }
});
wordsCount.print();
sparkStreaming UV
logDStream.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> v1) throws Exception {
        String[] logs = v1._2.split("\\t");
        Long usrID = Long.valueOf(logs[2] != null ? logs[2] : "-1");
        Long pageID = Long.valueOf(logs[3]);
        return pageID + "_" + usrID;
    }
}).transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
    @Override
    public JavaRDD<String> call(JavaRDD<String> v1) throws Exception {
        return v1.distinct();
    }
}).mapToPair(new PairFunction<String, Long, Long>() {
    @Override
    public Tuple2<Long, Long> call(String t) throws Exception {
        String[] logs = t.split("_");
        Long pageId = Long.valueOf(logs[0]);
        return new Tuple2<Long, Long>(pageId, 1L);
    }
}).reduceByKey(new Function2<Long, Long, Long>() {
    @Override
    public Long call(Long v1, Long v2) throws Exception {
        return v1 + v2;
    }
}).print();
sparkStreaming page bounce
private static void onlineJumped(JavaPairInputDStream<String, String> lines) {
    lines.mapToPair(new PairFunction<Tuple2<String, String>, Long, Long>() {
        @Override
        public Tuple2<Long, Long> call(Tuple2<String, String> t) throws Exception {
            String[] logs = t._2().split("\\t");
            Long usrId = Long.valueOf(logs[2] != null ? logs[2] : "-1");
            return new Tuple2<Long, Long>(usrId, 1L);
        }
    }).reduceByKey(new Function2<Long, Long, Long>() {
        //aggregate clicks per user first; without this step every record would trivially pass the filter below
        @Override
        public Long call(Long v1, Long v2) throws Exception {
            return v1 + v2;
        }
    }).filter(new Function<Tuple2<Long, Long>, Boolean>() {
        @Override
        public Boolean call(Tuple2<Long, Long> v1) throws Exception {
            if (1 == v1._2())
                return true;
            else
                return false;
        }
    }).count().print();
}
sparkStreaming broadcast accumulator
public class Test {
    private static volatile Broadcast<List<String>> broadcastList = null;
    private static volatile Accumulator<Integer> accumulator = null;

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("wordCountOnline");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
        broadcastList = jssc.sparkContext().broadcast(java.util.Arrays.asList("Hadoop", "Mahout", "Hive"));
        accumulator = jssc.sparkContext().accumulator(0, "OnlineBlackCounter");

        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("master", 9999);
        JavaPairDStream<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });
        JavaPairDStream<String, Integer> wordCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        wordCount.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
            @Override
            public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws Exception {
                rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
                    @Override
                    public Boolean call(Tuple2<String, Integer> wordPair) throws Exception {
                        if (broadcastList.value().contains(wordPair._1())) {
                            accumulator.add(wordPair._2());
                            return false;
                        } else {
                            return true;
                        }
                    }
                }).collect();
                System.out.println("BlackList appeared: " + accumulator.value() + " times");
                return null;
            }
        });

        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }
}