Kafka和Pyspark整合

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka和Pyspark整合相关的知识,希望对你有一定的参考价值。

我在大数据中天真,我试图将kafka连接到火花。这是我的生产者代码

import os
import sys
import pykafka
def get_text():
    ## This block generates my required text. 
      text_as_bytes=text.encode(text)
      producer.produce(text_as_bytes)


if __name__ == "__main__":
    client = pykafka.KafkaClient("localhost:9092")
    print ("topics",client.topics)
    producer = client.topics[b'imagetext'].get_producer()

    get_text() 

当我执行bin / kafka-console-consumer.sh时,这是在控制台消费者上打印我生成的文本--zookeeper localhost:2181 --topic imagetext --from-beginning

现在我希望使用Spark来使用这个文本,这是我的Jupyter代码

import findspark
findspark.init()
import os
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /spark-2.1.1-bin-hadoop2.6/spark-streaming-kafka-0-8-assembly_2.11-2.1.0.jar pyspark-shell'



conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
sc = SparkContext(conf=conf)

ssc = StreamingContext(sc,5)
print('ssc =================== {} {}')

kstream = KafkaUtils.createDirectStream(ssc, topics = ['imagetext'], 
     kafkaParams = {"metadata.broker.list": 'localhost:9092'})

print('contexts =================== {} {}')
lines = kstream.map(lambda x: x[1])
lines.pprint()

ssc.start()
ssc.awaitTermination()
ssc.stop(stopGraceFully = True)

但这正在我的Jupyter产生输出

Time: 2018-02-21 15:03:25
-------------------------------------------

-------------------------------------------
Time: 2018-02-21 15:03:30
-------------------------------------------

不是我的控制台消费者的文字..请帮助,无法弄清楚错误。

答案

我找到了另一个解决方案。虽然将get_text()放入循环中的解决方案起作用,但它不是正确的解决方案。在Kafka发送数据时,您的数据不是连续的。因此,Spark流媒体不应该以这种方式获得它。

Kafka-python库提供get(timeout)功能,以便Kafka等待请求。

producer.send(topic,data).get(timeout=10)

既然你使用pykafka,我不确定它是否会起作用。尽管如此,你仍然可以尝试一次,不要把get_text()放在循环中。

以上是关于Kafka和Pyspark整合的主要内容,如果未能解决你的问题,请参考以下文章

Kafka 和 Pyspark 集成

从 Docker 容器将 PySpark 连接到 Kafka

运行 pyspark kafka steam 时出错

整合Kafka到Spark Streaming——代码示例和挑战

在 PySpark 作业上打印 Kafka 调试消息

整合Kafka到Spark Streaming——代码示例和挑战