Kafka 和 Pyspark 集成
Posted
技术标签:
【中文标题】Kafka 和 Pyspark 集成【英文标题】:Kafka and Pyspark Integration 【发布时间】:2018-07-31 21:07:38 【问题描述】:我对大数据很天真,我正在尝试将 kafka 连接到 spark。 这是我的生产者代码
import os
import sys
import pykafka
def get_text():
## This block generates my required text.
text_as_bytes=text.encode(text)
producer.produce(text_as_bytes)
if __name__ == "__main__":
client = pykafka.KafkaClient("localhost:9092")
print ("topics",client.topics)
producer = client.topics[b'imagetext'].get_producer()
get_text()
当我这样做时,这是在控制台使用者上打印我生成的文本 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic imagetext --from-beginning
现在我希望使用 Spark 使用此文本,这是我的 Jupyter 代码
import findspark
findspark.init()
import os
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /spark-2.1.1-bin-hadoop2.6/spark-streaming-kafka-0-8-assembly_2.11-2.1.0.jar pyspark-shell'
conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc,5)
print('ssc =================== ')
kstream = KafkaUtils.createDirectStream(ssc, topics = ['imagetext'],
kafkaParams = "metadata.broker.list": 'localhost:9092')
print('contexts =================== ')
lines = kstream.map(lambda x: x[1])
lines.pprint()
ssc.start()
ssc.awaitTermination()
ssc.stop(stopGraceFully = True)
但这会在我的 Jupyter 上产生输出为
Time: 2018-02-21 15:03:25
-------------------------------------------
-------------------------------------------
Time: 2018-02-21 15:03:30
-------------------------------------------
不是我的控制台消费者上的文本.. 请帮忙,无法找出错误。
【问题讨论】:
Kafka 客户端的数据是否流入?因为如果它发送单个文件并且此后什么都不做(在该文件之后不发送任何数据),则没有流数据到达 spark 流。 数据已经在控制台消费者上,目前没有新数据。 我认为这就是问题所在。 Spark 流式传输需要流式传输的数据。 Kafka 应该不断地向 spark 发送数据。 尝试一次将 get_text() 置于无限循环中以连续发送数据。我使用 KafkaProducer 并将发送放在一个循环中。 producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVER,acks=ACKS)\nr = requests.get("stream.meetup.com/2/rsvps",stream=True)\n for line in r.iter_lines():\n producer.send(TOPIC_KAFKA,line,key=TOPIC_KAFKA,partition =0)\n 打印行 非常感谢。它似乎工作! 【参考方案1】:我找到了另一个解决方案。虽然将get_text()
放入循环中的解决方案有效,但它不是正确的解决方案。当您在 Kafka 中发送数据时,您的数据并不是连续的。因此,Spark 流不应该以这种方式获得它。
Kafka-python 库提供了get(timeout)
功能,以便 Kafka 等待请求。
producer.send(topic,data).get(timeout=10)
由于您使用的是pykafka
,我不确定它是否会起作用。不过,您仍然可以尝试一次,不要将get_text()
放入循环中。
【讨论】:
【参考方案2】:只需将消费者中的端口从 9092 更改为 2181,因为它是 Zookeeper。从生产者端,它必须连接到端口号为 9092 的 Kafka。从流器端,它必须连接到端口号为 2181 的 Zookeeper。
【讨论】:
以上是关于Kafka 和 Pyspark 集成的主要内容,如果未能解决你的问题,请参考以下文章
从 Docker 容器将 PySpark 连接到 Kafka
PySpark 和 Kafka “Set 已消失。某些数据可能已丢失……”