[Spark][Streaming] An example of Spark reading network input

Posted by 健哥的数据花园


An example of Spark reading network input:

I experimented by following these URLs:

https://stackoverflow.com/questions/46739081/how-to-get-record-in-string-format-from-sockettextstream
http://www.cnblogs.com/FG123/p/5324743.html

I found that if you run nc -lk 9999 first and then launch the Spark program, stopping nc afterwards makes the Spark program report an error similar to:

-------------------------------------------
Time: 2017-10-28 19:32:02
-------------------------------------------

17/10/28 19:32:23 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to localhost:9999 - java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at java.net.Socket.connect(Socket.java:538)
at java.net.Socket.<init>(Socket.java:434)
at java.net.Socket.<init>(Socket.java:211)
at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:73)
at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:59)

 

This shows that the two sides had established a connection (the error appears only after nc is killed). However, the expected word count output never showed up. My guess was that not enough threads were available for the computation; note that the socket receiver itself permanently occupies one local thread, so with local[2] only a single thread is left for batch processing. I therefore made the following change:

sc = SparkContext("local[2]", "streamwordcount")

to:

sc = SparkContext("local[3]", "streamwordcount")

The full program:

[training@localhost ab]$ cat test.py
# showing remote messages

from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":

    sc = SparkContext("local[3]", "streamwordcount")
    # Create the local SparkContext with 3 worker threads

    ssc = StreamingContext(sc, 2)
    # Create the local StreamingContext with a batch interval of 2 seconds

    lines = ssc.socketTextStream("localhost", 9999)

    words = lines.flatMap(lambda line: line.split(" "))
    # Use flatMap and split to tokenize the strings received within each 2-second batch

    pairs = words.map(lambda word: (word, 1))
    wordCounts = pairs.reduceByKey(lambda x, y: x + y)

    wordCounts.pprint()

    ssc.start()
    # Start the Spark Streaming application

    ssc.awaitTermination()
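ssc.awaitTermination() blocks the driver forever; when nc is stopped, the receiver simply keeps retrying, as in the error shown earlier. If you prefer the job to give up after a while, PySpark also provides awaitTerminationOrTimeout; a minimal sketch (the 60-second timeout is illustrative, not part of the original program):

# Returns True if the context stopped within the timeout;
# otherwise stop it ourselves, finishing in-flight batches first.
# (PySpark spells the keyword argument stopGraceFully.)
if not ssc.awaitTerminationOrTimeout(60):
    ssc.stop(stopSparkContext=True, stopGraceFully=True)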

Run the nc program again:

[training@localhost ~]$ nc -lk 9999

Run the Spark program:

[training@localhost ~]$ spark-submit /home/training/ab/test.py

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

 

Type some data into the nc window:

aaa bbb ccc
ddd aaa sss
sss bbb bbb

kkk jjj mmm
ooo kkk jjj
mmm ccc ddd
eee fff sss
rrr nnn ooo
ppp sss zzz
mmm sss ttt
kkk sss ttt
rrr ooo ppp
kkk qqq kkk
lll nnn jjj
rrr ooo sss
kkk aaa ddd
aaa aaa fff
eee sss nnn
ooo ppp qqq
qqq sss eee
sss mmm nnn

 

After a short while, the Spark program's window shows output like:

------------------------------------------- 
Time: 2017-10-28 19:33:50
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:33:52
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:33:54
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:33:56
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:33:58
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:34:00
-------------------------------------------
(u'', 1)
(u'mmm', 2)
(u'bbb', 3)
(u'nnn', 1)
(u'ccc', 2)
(u'rrr', 1)
(u'sss', 3)
(u'fff', 1)
(u'aaa', 2)
(u'ooo', 2)
...

------------------------------------------- 
Time: 2017-10-28 19:34:02
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:34:04
-------------------------------------------
(u'ppp', 1)
(u'sss', 1)
(u'zzz', 1)

------------------------------------------- 
Time: 2017-10-28 19:34:06
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:34:08
-------------------------------------------
(u'mmm', 1)
(u'sss', 1)
(u'ttt', 1)

------------------------------------------- 
Time: 2017-10-28 19:34:10
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:34:12
-------------------------------------------
(u'sss', 1)
(u'ttt', 1)
(u'kkk', 1)

------------------------------------------- 
Time: 2017-10-28 19:34:14
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:34:16
-------------------------------------------
(u'ppp', 1)
(u'rrr', 1)
(u'ooo', 1)

------------------------------------------- 
Time: 2017-10-28 19:34:18
-------------------------------------------
(u'qqq', 1)
(u'kkk', 2)

------------------------------------------- 
Time: 2017-10-28 19:34:20
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:34:22
-------------------------------------------
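Batches in which nothing new arrived print an empty block, as seen above; each batch counts only the words received in its own 2-second interval. To aggregate over a longer span instead, the pairs DStream from test.py could use a sliding window; a minimal sketch (a 10-second window sliding every 4 seconds, both multiples of the 2-second batch interval; the durations are illustrative):

# Count words over the last 10 seconds, recomputed every 4 seconds.
# With no inverse function (None), no checkpoint directory is required.
windowedCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, None, 10, 4)
windowedCounts.pprint()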

 
