PySpark direct streaming from Kafka

Posted: 2016-02-16 21:20:54

Goal

My goal is to get a simple Spark Streaming example working that uses the direct approach for interacting with Kafka, but I cannot get past a particular error.

The ideal outcome is to have two console windows open: in one I type sentences, and the other displays a "live" word count across all the sentences typed so far.

Console 1

the cat likes bacon

my cat ate bacon

Console 2

Time: ..

[("the", 2), ("cat", 1), ("likes", 1), ("bacon", 1)]

Time: ..

[("the", 3), ("cat", 2), ("likes", 1), ("bacon", 2), ("my", 1), ("ate", 1 )]

Steps taken

Downloaded and unpacked:

kafka_2.10-0.8.2.0
spark-1.5.2-bin-hadoop2.6

Start ZooKeeper and the Kafka server in separate screen sessions.

screen -S zk
bin/zookeeper-server-start.sh config/zookeeper.properties

"Ctrl-a" "d" to detach from the screen

screen -S kafka
bin/kafka-server-start.sh config/server.properties

"Ctrl-a" "d"

Start the Kafka producer

Use a separate console window and type words into it to simulate the stream.

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Start PySpark

Using the Spark Streaming Kafka package.

bin/pyspark --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.2

Run a simple word count

Based on the example in the docs.

from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

ssc = StreamingContext(sc, 2)
topic = "test"
brokers = "localhost:9092"
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()

Error

Typing words into the Kafka producer console produces results exactly once; then the following error is thrown once and no further results appear (although the "Time" sections continue to print).

-------------------------------------------
Time: 2015-11-15 18:39:52
-------------------------------------------

15/11/15 18:42:57 ERROR PythonRDD: Error while sending iterator
java.net.SocketTimeoutException: Accept timed out
        at java.net.PlainSocketImpl.socketAccept(Native Method)
        at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398)
        at java.net.ServerSocket.implAccept(ServerSocket.java:530)
        at java.net.ServerSocket.accept(ServerSocket.java:498)
        at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:645)
Traceback (most recent call last):
  File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/streaming/util.py", line 62, in call
    r = self.func(t, *rdds)
  File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/streaming/dstream.py", line 171, in takeAndPrint
    taken = rdd.take(num + 1)
  File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/rdd.py", line 1299, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/context.py", line 917, in runJob
    return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
  File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/rdd.py", line 142, in _load_from_socket
    for item in serializer.load_stream(rf):
  File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/serializers.py", line 139, in load_stream
    yield self._read_with_length(stream)
  File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/serializers.py", line 156, in _read_with_length
    length = read_int(stream)
  File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/serializers.py", line 543, in read_int
    length = stream.read(4)
  File "/usr/lib/python2.7/socket.py", line 380, in read
    data = self._sock.recv(left)
error: [Errno 104] Connection reset by peer

Any help or suggestions would be greatly appreciated.


Answer 1:

Try running:

spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.1 your_python_file_name.py

You can set other parameters as well (--deploy-mode, etc.).
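
For reference, a minimal sketch of what your_python_file_name.py might contain when submitted this way (the app name here is an assumption; unlike the pyspark shell, spark-submit does not create sc for you, so the script builds its own contexts and reuses the word count from the question):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    # spark-submit does not provide an interactive sc, so create the contexts explicitly
    sc = SparkContext(appName="KafkaWordCount")
    ssc = StreamingContext(sc, 2)

    topic = "test"
    brokers = "localhost:9092"
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})

    # Same word count as in the question
    counts = kvs.map(lambda x: x[1]) \
        .flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()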


Answer 2:

After creating the DStream, we should use foreachRDD to iterate over each RDD (an illustrative handler follows the code below).

from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Define the handler before passing it to foreachRDD
def handler(message):
    records = message.collect()
    for record in records:
        # <Data processing whatever you want >
        pass

ssc = StreamingContext(sc, 2)
topic = "test"
brokers = "localhost:9092"
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
kvs.foreachRDD(handler)
ssc.start()
ssc.awaitTermination()
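
As an illustration only (the processing logic below is assumed from the question's word-count goal, not part of this answer), a handler might collect each batch and count its words; note that createDirectStream yields (key, value) pairs, so the message text is the second element:

def handler(message):
    # Pull this batch's (key, value) records back to the driver
    records = message.collect()
    counts = {}
    for _, value in records:
        for word in value.split(" "):
            counts[word] = counts.get(word, 0) + 1
    print(counts)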

