sparkStreaming

Posted by 漠小浅


Quick reference:

Reading data from a socket

JavaReceiverInputDStream<String> lines = jssc.socketTextStream("master", 9999);
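For orientation, a minimal sketch of the driver program around that call (the class name, local[2] master, 5-second batch interval and word-count body are assumptions for illustration; only the socketTextStream line comes from this handbook):

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class SocketWordCount {
    public static void main(String[] args) throws Exception {
        // two local threads: one for the receiver, one for processing
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("socketWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("master", 9999);
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });
        words.print();

        jssc.start();
        jssc.awaitTermination();
    }
}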

Monitoring an HDFS directory (only files newly added under the directory are detected; appending to or deleting the contents of existing files has no effect)

JavaDStream<String> lines = jssc.textFileStream("hdfs://Master:9000/sparkStreaming/data"); // watch for newly added files

Flume configuration for pushing local files to HDFS:

#agent1 is the agent name

agent1.sources=source1
agent1.sinks=sink1
agent1.channels=channel1

#The spooling directory source watches the given directory for new files; as soon as a new file appears, its content is parsed and written to the channel, and the file is then marked as completed or deleted.
#source1 configuration
agent1.sources.source1.type=spooldir
agent1.sources.source1.spoolDir=/flume_data
agent1.sources.source1.channels=channel1
agent1.sources.source1.fileHeader = false
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = timestamp


#sink1 configuration
agent1.sinks.sink1.type=hdfs
agent1.sinks.sink1.hdfs.path=hdfs://hadoop0:9000/flume
agent1.sinks.sink1.hdfs.fileType=DataStream
agent1.sinks.sink1.hdfs.writeFormat=TEXT
agent1.sinks.sink1.hdfs.rollInterval=1
agent1.sinks.sink1.channel=channel1
agent1.sinks.sink1.hdfs.filePrefix=%Y-%m-%d


#channel1 configuration
agent1.channels.channel1.type=file
agent1.channels.channel1.checkpointDir=/root/hmbbs_tmp/123
agent1.channels.channel1.dataDirs=/root/hmbbs_tmp/

Run: bin/flume-ng agent -n agent1 -c conf -f conf/example -Dflume.root.logger=DEBUG,console

Flume pushes data to Spark Streaming

#agent1 is the agent name

agent1.sources=source1
agent1.sinks=sink1
agent1.channels=channel1

#The spooling directory source watches the given directory for new files; as soon as a new file appears, its content is parsed and written to the channel, and the file is then marked as completed or deleted.
#source1 configuration
agent1.sources.source1.type=spooldir
agent1.sources.source1.spoolDir=/flume_data
agent1.sources.source1.channels=channel1
agent1.sources.source1.fileHeader = false
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = timestamp


#sink1 configuration (the HDFS sink from the previous example is commented out; an Avro sink pushes to Spark Streaming instead)
#agent1.sinks.sink1.type=hdfs
#agent1.sinks.sink1.hdfs.path=hdfs://hadoop0:9000/flume
#agent1.sinks.sink1.hdfs.fileType=DataStream
#agent1.sinks.sink1.hdfs.writeFormat=TEXT
#agent1.sinks.sink1.hdfs.rollInterval=1
#agent1.sinks.sink1.channel=channel1
#agent1.sinks.sink1.hdfs.filePrefix=%Y-%m-%d

agent1.sinks.sink1.type=avro

agent1.sinks.sink1.channel = channel1

agent1.sinks.sink1.hostname=master

agent1.sinks.sink1.port=9999


#channel1 configuration
agent1.channels.channel1.type=file
agent1.channels.channel1.checkpointDir=/root/hmbbs_tmp/123
agent1.channels.channel1.dataDirs=/root/hmbbs_tmp/


Add the dependency:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.10</artifactId>
    <version>1.6.0</version>
</dependency>

JavaReceiverInputDStream<SparkFlumeEvent> lines = FlumeUtils.createStream(jssc, "master", 9999);

JavaDStream<String> words = lines.flatMap(new FlatMapFunction<SparkFlumeEvent, String>() {
    @Override
    public Iterable<String> call(SparkFlumeEvent event) throws Exception {
        // each file corresponds to one event
        String line = new String(event.event().getBody().array());
        return Arrays.asList(line.split(" "));
    }
});

Spark Streaming pulls (polls) data from Flume

agent1.sinks.sink1.type=org.apache.spark.streaming.flume.sink.SparkSink

agent1.sinks.sink1.channel = channel1

agent1.sinks.sink1.hostname=master

agent1.sinks.sink1.port=9999

Download the custom sink jar and its dependencies into Flume's lib directory (the integration guide linked here lists spark-streaming-flume-sink_2.10, scala-library and commons-lang3): http://spark.apache.org/docs/latest/streaming-flume-integration.html


JavaReceiverInputDStream<SparkFlumeEvent> lines = FlumeUtils.createPollingStream(jssc, "master", 9999);

sparkStreamingOnKafkaReceiver
Summary: this approach uses a Receiver to obtain the data. The Receiver is implemented with Kafka's high-level consumer API. The data the receiver fetches from Kafka is stored in the memory of the Spark executors, and the jobs launched by Spark Streaming then process that data. With the default configuration, however, this approach can lose data when something fails underneath. To get high reliability with zero data loss, you must enable Spark Streaming's write-ahead log (WAL), which synchronously writes the data received from Kafka into a write-ahead log on a distributed file system such as HDFS, so that even if a node fails the data can be recovered from the log.
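As a hedged sketch, enabling the WAL only takes the standard configuration flag plus a checkpoint directory (the HDFS path and app name below are hypothetical):

// A checkpoint directory is required because the WAL is stored under it.
SparkConf conf = new SparkConf()
        .setAppName("kafkaReceiverWithWAL")
        // standard flag that turns on the receiver write-ahead log
        .set("spark.streaming.receiver.writeAheadLog.enable", "true");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
jssc.checkpoint("hdfs://master:9000/sparkStreaming/checkpoint");

With the log enabled the received data is already durable, so the input stream is usually persisted with StorageLevel.MEMORY_AND_DISK_SER rather than the default replicated storage level.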

KafkaUtils.createStream takes four parameters:

Parameter 1 is the StreamingContext instance;

Parameter 2 is the ZooKeeper quorum (while receiving Kafka data, metadata such as offsets is fetched from ZooKeeper);

Parameter 3 is the consumer group;

Parameter 4 maps each topic to consume to the number of threads used to read that topic's partitions concurrently.

Why is parameter 2 needed? Because Spark Streaming keeps the offset information in ZooKeeper. And why parameter 4? The topic itself is clearly required, but what is the Integer for? Spark Streaming lets you choose how many threads the receiver uses to read Kafka's data partitions in parallel. Note that this only sets the receiver's read parallelism; it does not set the number of RDD partitions, and the number of RDD partitions has no relationship to the number of Kafka partitions.

Map<String, Integer> topicConsumerConcurrency = new HashMap<String, Integer>();
topicConsumerConcurrency.put("helloKafkaFromSparkStreaming", 2);

JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(jssc,
        "master:2181,slave1:2181,slave2:2181", "MyFirstConsumerGroup", topicConsumerConcurrency);

JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
    @Override
    public Iterable<String> call(Tuple2<String, String> tuple) throws Exception {
        // tuple._1 is the Kafka message key, tuple._2 is the message payload
        return Arrays.asList(tuple._2.split(" "));
    }
});

Start Kafka (nohup, in the background)

nohup ./kafka-server-start.sh ../config/server.properties &

Create a topic

./kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 3 --partitions 1 --topic helloKafkaFromSparkStreaming

Start a console producer

./kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9092 --topic helloKafkaFromSparkStreaming

Start a console consumer

bin/kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic helloKafkaFromSparkStreaming --from-beginning

sparkStreaming on Kafka Direct
Summary: this newer approach does not use a Receiver at all. Instead of a Receiver pulling data in, it periodically queries Kafka for the latest offset of every topic+partition and uses those offsets to define the offset range of each batch; when the job that processes the data starts, it reads exactly that offset range from Kafka with the simple consumer API. Advantages:
1. Simplified parallelism: you no longer create multiple input DStreams and union them in order to read several partitions. Spark creates as many RDD partitions as there are Kafka partitions and reads from Kafka in parallel, so Kafka partitions and RDD partitions map one to one.
2. Higher performance: guaranteeing zero data loss with the receiver-based approach requires the WAL, which is inefficient because the data is effectively copied twice: Kafka already replicates it, and it is copied again into the WAL. The direct approach has no Receiver and needs no WAL; as long as Kafka retains replicated data, recovery is done from Kafka's own replicas.
3. Exactly-once semantics: the receiver-based approach uses Kafka's high-level API to store consumed offsets in ZooKeeper, the traditional way of consuming Kafka. Combined with the WAL it achieves zero data loss, but it cannot guarantee that each record is processed exactly once (a record may be processed twice), because Spark and ZooKeeper can get out of sync. The direct approach uses Kafka's simple API, and Spark Streaming itself tracks the consumed offsets and stores them in its checkpoint; since Spark is consistent with itself, each record is consumed once and only once.

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "master:9092,slave1:9092,slave2:9092");
HashSet<String> topics = new HashSet<String>();
topics.add("topics");
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc,
        String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

Here you need Kafka's metadata.broker.list and no longer need any ZooKeeper information. The reason is that Spark Streaming keeps the offsets itself and reads them directly from Kafka, so all it needs is the list of Kafka brokers.

The topics set also no longer carries the number of receiver threads, because Spark creates as many RDD partitions as there are Kafka partitions and reads from Kafka in parallel; Kafka partitions and RDD partitions map one to one.
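As an illustration of the direct approach exposing its offsets, here is a sketch that prints the offset range consumed by each batch via HasOffsetRanges (the messages stream is the one created above; the printing is purely illustrative):

// Requires org.apache.spark.streaming.kafka.HasOffsetRanges and OffsetRange.
messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
    @Override
    public Void call(JavaPairRDD<String, String> rdd) throws Exception {
        // The underlying KafkaRDD knows exactly which offsets this batch covers.
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        for (OffsetRange o : offsetRanges) {
            System.out.println(o.topic() + " partition " + o.partition()
                    + " offsets [" + o.fromOffset() + ", " + o.untilOffset() + ")");
        }
        return null;
    }
});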


Scala version: KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
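Since the direct approach keeps its consumed offsets in the checkpoint, driver restarts are usually wired up with JavaStreamingContext.getOrCreate; a minimal sketch, assuming a hypothetical checkpoint directory and eliding the stream construction:

// Requires org.apache.spark.api.java.function.Function0.
final String checkpointDir = "hdfs://master:9000/sparkStreaming/kafkaDirectCheckpoint"; // hypothetical path

JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointDir,
        new Function0<JavaStreamingContext>() {
            @Override
            public JavaStreamingContext call() throws Exception {
                SparkConf conf = new SparkConf().setAppName("kafkaDirectWithCheckpoint");
                JavaStreamingContext newJssc = new JavaStreamingContext(conf, Durations.seconds(5));
                newJssc.checkpoint(checkpointDir);
                // ... create the direct stream and define the processing here ...
                return newJssc;
            }
        });

jssc.start();
jssc.awaitTermination();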

updateStateByKey

jssc.checkpoint("            "); // a checkpoint directory must be specified

JavaPairDStream<String, Integer> wordsCount = pairs.updateStateByKey(
        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
            @Override
            public Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception {
                Integer updatedValue = 0;
                if (state.isPresent()) {
                    updatedValue = state.get();
                }
                for (Integer value : values) {
                    updatedValue += value;
                }
                return Optional.of(updatedValue);
            }
        });

transform: blacklist filtering

val blackList = Array(("hadoop", true), ("mahout", true))

val blackListRDD = ssc.sparkContext.parallelize(blackList, 8)

val adsClickStream = ssc.socketTextStream("master", 9999)

// input format: time name
// after the map: (name, original line)
val adsClickStreamFormatted = adsClickStream.map(ads => (ads.split(" ")(1), ads))

adsClickStreamFormatted.transform(userClickRDD => {

  val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD)

  // keep only clicks whose name is not flagged in the blacklist
  val validClicked = joinedBlackListRDD.filter(joinedItem =>
    if (joinedItem._2._2.getOrElse(false)) false else true
  )

  validClicked.map(validClick => validClick._2._1)

}).print()

reduceByKeyAndWindow: online hottest search terms

Note: the window length and the slide interval must both be integer multiples of the batch interval. The inverse-reduce variant used below also requires checkpointing to be enabled.

// input format: user item
val hottestStream = ssc.socketTextStream("master", 9999)

val searchPair = hottestStream.map(_.split(" ")(1)).map(item => (item, 1))

// without the inverse function:
// val hottestDStream = searchPair.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2, Seconds(60), Seconds(20))

// with the inverse function:
val hottestDStream = searchPair.reduceByKeyAndWindow(
  (v1: Int, v2: Int) => v1 + v2,
  (v1: Int, v2: Int) => v1 - v2,
  Seconds(60), Seconds(20))

hottestDStream.transform(hottestItemRDD => {

  val top3 = hottestItemRDD.map(pair => (pair._2, pair._1)).sortByKey(false)
    .map(pair => (pair._2, pair._1)).take(3)

  for (item <- top3)
    println(item)

  hottestItemRDD // the return value does not matter here; transform just has to return an RDD

}).print()

foreachRDD: writing data to an external data store

val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

wordCounts.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // one connection per partition, taken from the pool on the executor
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => {
      val sql = "insert into streaming_itemcount(item,count) values('" + record._1 + "'," + record._2 + ")"
      val stmt = connection.createStatement()
      stmt.executeUpdate(sql)
    })
    ConnectionPool.returnConnection(connection)
  }
}

A simple connection pool implementation:

public class ConnectionPool {

    private static LinkedList<Connection> connectionQueue;

    static {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    public synchronized static Connection getConnection() {
        try {
            if (connectionQueue == null) {
                connectionQueue = new LinkedList<Connection>();
                for (int i = 0; i < 5; i++) {
                    Connection conn = DriverManager.getConnection(
                            "jdbc:mysql://master:3306/sparkstreaming",
                            "root",
                            "root");
                    connectionQueue.push(conn);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connectionQueue.poll();
    }

    public static void returnConnection(Connection conn) {
        connectionQueue.push(conn);
    }
}

sparkStreaming and SQL

Use Spark Streaming together with SQL to compute, online and continuously, the ranking of the hottest items within each category of an e-commerce site.

// input format: user item category
val userClickLogsDStream = ssc.socketTextStream("master", 9999)

val formattedUserClickLogsDStream = userClickLogsDStream.map(clickLog =>
  (clickLog.split(" ")(2) + "_" + clickLog.split(" ")(1), 1))

val categoryUserClickLogsDStream =
  formattedUserClickLogsDStream.reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(20))

categoryUserClickLogsDStream.foreachRDD { rdd =>

  val categoryItemRow = rdd.map(reducedItem => {
    val category = reducedItem._1.split("_")(0)
    val item = reducedItem._1.split("_")(1)
    val click_count = reducedItem._2
    Row(category, item, click_count)
  })

  val structType = StructType(Array(
    StructField("category", StringType, true),
    StructField("item", StringType, true),
    StructField("click_count", IntegerType, true)
  ))

  val hiveContext = new HiveContext(rdd.sparkContext)
  val categoryItemDF = hiveContext.createDataFrame(categoryItemRow, structType)
  categoryItemDF.registerTempTable("categoryItemTable")

  val resultDataFrame = hiveContext.sql("select category,item,click_count from (select category,item,click_count,row_number()" +
    " over (partition by category order by click_count desc) rank from categoryItemTable) subquery " +
    " where rank <= 3")

  val resultRowRDD = resultDataFrame.rdd

  resultRowRDD.foreachPartition { partitionOfRecords =>
    if (partitionOfRecords.isEmpty) {
      // nothing to write for this partition
    } else {
      val connection = ConnectionPool.getConnection()
      partitionOfRecords.foreach(record => {
        val sql = "insert into categorytop3(category,item,click_count) values('" + record.getAs("category") +
          "','" + record.getAs("item") + "'," + record.getAs("click_count") + ")"
        val stmt = connection.createStatement()
        stmt.executeUpdate(sql)
      })
      ConnectionPool.returnConnection(connection)
    }
  }
}

Kafka Producer

/**
 * Generates forum data automatically; each record has the format:
 * date: date, formatted as yyyy-MM-dd
 * timestamp: timestamp
 * userID: user ID
 * pageID: page ID
 * channel: forum board ID
 * action: click (View) or register (Register)
 */
public class SparkStreamingDataManuallyProducerforKafka extends Thread {

    // forum boards
    static String[] channelNames = new String[]{
            "spark", "scala", "kafka", "Flink", "hadoop", "Storm",
            "Hive", "Impala", "Hbase", "ML"
    };
    static String[] actionNames = new String[]{"View", "Register"};

    private String topic; // the Kafka topic the data is sent to
    private Producer<Integer, String> producerForKafka;

    private static String dataToday;
    private static Random random;

    public SparkStreamingDataManuallyProducerforKafka(String topic) {
        dataToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
        this.topic = topic;
        random = new Random();
        Properties conf = new Properties();
        conf.put("metadata.broker.list", "master:9092,slave1:9092,slave2:9092");
        conf.put("serializer.class", "kafka.serializer.StringEncoder");
        producerForKafka = new Producer<Integer, String>(new ProducerConfig(conf));
    }

    @Override
    public void run() {
        int counter = 0;
        while (true) {
            counter++;
            String userLog = userLogs();
            System.out.println("product:" + userLog);
            producerForKafka.send(new KeyedMessage<Integer, String>(topic, userLog));

            if (500 == counter) {
                counter = 0;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        new SparkStreamingDataManuallyProducerforKafka("UserLogs").start();
    }

    private static String userLogs() {
        StringBuffer userLogBuffer = new StringBuffer("");
        long timestamp = new Date().getTime();
        long userID = 0L;
        long pageID = 0L;

        // randomly generated user ID
        userID = random.nextInt(2000);
        // randomly generated page ID
        pageID = random.nextInt(2000);
        // randomly picked board (channel)
        String channel = channelNames[random.nextInt(10)];
        // randomly picked action
        String action = actionNames[random.nextInt(2)];
        userLogBuffer.append(dataToday)
                .append("\t")
                .append(timestamp)
                .append("\t")
                .append(userID)
                .append("\t")
                .append(pageID)
                .append("\t")
                .append(channel)
                .append("\t")
                .append(action);
        return userLogBuffer.toString();
    }
}

sparkStreaming PV

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "master:9092,slave1:9092,slave2:9092");
HashSet<String> topics = new HashSet<String>();
topics.add("topics");
JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(jssc,
        String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

JavaPairDStream<String, String> logDStream = lines.filter(new Function<Tuple2<String, String>, Boolean>() {
    @Override
    public Boolean call(Tuple2<String, String> v1) throws Exception {
        String[] logs = v1._2.split("\\t");
        String action = logs[5];
        return "View".equals(action);
    }
});

JavaPairDStream<Long, Long> pairs = logDStream.mapToPair(new PairFunction<Tuple2<String, String>, Long, Long>() {
    @Override
    public Tuple2<Long, Long> call(Tuple2<String, String> t) throws Exception {
        String[] logs = t._2.split("\\t");
        Long pageId = Long.valueOf(logs[3]);
        return new Tuple2<Long, Long>(pageId, 1L);
    }
});

JavaPairDStream<Long, Long> wordsCount = pairs.reduceByKey(new Function2<Long, Long, Long>() {
    @Override
    public Long call(Long v1, Long v2) throws Exception {
        return v1 + v2;
    }
});

wordsCount.print();

sparkStreaming UV
logDStream.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> v1) throws Exception {
        String[] logs = v1._2.split("\\t");
        Long usrID = Long.valueOf(logs[2] != null ? logs[2] : "-1");
        Long pageID = Long.valueOf(logs[3]);
        // combine page and user so that repeat visits by the same user collapse after distinct()
        return pageID + "_" + usrID;
    }
}).transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
    @Override
    public JavaRDD<String> call(JavaRDD<String> v1) throws Exception {
        return v1.distinct();
    }
}).mapToPair(new PairFunction<String, Long, Long>() {
    @Override
    public Tuple2<Long, Long> call(String t) throws Exception {
        String[] logs = t.split("_");
        Long pageId = Long.valueOf(logs[0]);
        return new Tuple2<Long, Long>(pageId, 1L);
    }
}).reduceByKey(new Function2<Long, Long, Long>() {
    @Override
    public Long call(Long v1, Long v2) throws Exception {
        return v1 + v2;
    }
}).print();
Page bounces (users with only one click)
private static void onlineJumped(JavaPairInputDStream<String, String> lines) {
    lines.mapToPair(new PairFunction<Tuple2<String, String>, Long, Long>() {
        @Override
        public Tuple2<Long, Long> call(Tuple2<String, String> t) throws Exception {
            String[] logs = t._2().split("\\t");
            Long usrId = Long.valueOf(logs[2] != null ? logs[2] : "-1");
            return new Tuple2<Long, Long>(usrId, 1L);
        }
    }).reduceByKey(new Function2<Long, Long, Long>() {
        // aggregate clicks per user within the batch (needed so the count == 1 filter below is meaningful)
        @Override
        public Long call(Long v1, Long v2) throws Exception {
            return v1 + v2;
        }
    }).filter(new Function<Tuple2<Long, Long>, Boolean>() {
        @Override
        public Boolean call(Tuple2<Long, Long> v1) throws Exception {
            // a user with exactly one click in this batch counts as a bounce
            return 1 == v1._2();
        }
    }).count().print();
}
sparkStreaming broadcast accumulator
public class Test {
    private static volatile Broadcast<List<String>> broadcastList = null;
    private static volatile Accumulator<Integer> accumulator = null;

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("wordCountOnline");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        broadcastList = jssc.sparkContext().broadcast(java.util.Arrays.asList("Hadoop", "Mahout", "Hive"));
        accumulator = jssc.sparkContext().accumulator(0, "OnlineBlackCounter");

        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("master", 9999);
        JavaPairDStream<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        JavaPairDStream<String, Integer> wordCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        wordCount.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
            @Override
            public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws Exception {
                rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
                    @Override
                    public Boolean call(Tuple2<String, Integer> wordPair) throws Exception {
                        if (broadcastList.value().contains(wordPair._1())) {
                            // count blacklisted words via the accumulator and drop them
                            accumulator.add(wordPair._2());
                            return false;
                        } else {
                            return true;
                        }
                    }
                }).collect();
                System.out.println("BlackList appeared: " + accumulator.value() + " times");
                return null;
            }
        });

        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }
}