Flask - 拉取实时流 kafka 数据 - 将 Kafka 与 Python Flask 集成
Posted
技术标签:
【中文标题】Flask - 拉取实时流 kafka 数据 - 将 Kafka 与 Python Flask 集成【英文标题】:Flask - Pulling the live stream kafka data - Integrating Kafka with Python Flask 【发布时间】:2016-05-15 02:25:56 【问题描述】:这个项目是为了real time search engine - log analysis
的性能。
我有一个从 Spark 处理到 Kafka 的实时流数据。
现在有了 Kafka 输出,
我想get the data from the Kafka using Flask
.. 和visualize it using Chartjs
或其他一些可视化..
如何从Kafka using the python flask
获取直播数据?
知道如何开始吗?
任何帮助将不胜感激!
谢谢!
【问题讨论】:
你试过什么?社区无法解决这个问题,你的问题太板了。 【参考方案1】:我会查看适用于 python 的 Kafka 包:
http://kafka-python.readthedocs.org/en/master/usage.html
这应该让您设置为从 Kafka 流式传输数据。此外,我可能会查看这个项目:https://github.com/travel-intelligence/flasfka,它与一起使用 Flask 和 Kafka 相关(刚刚在谷歌搜索中找到)。
【讨论】:
【参考方案2】:我正在解决类似的问题(带有来自 Kafka 的实时流数据的小型 Flask 应用程序)。
您必须做几件事来设置它。首先,您需要KafkaConsumer 来获取消息:
from kafka import KafkaConsumer
consumer = KafkaConsumer(group_id='groupid', boostrap_servers=kafkakserver)
consumer.subscribe(topics=['topicid'])
try:
# this method should auto-commit offsets as you consume them.
# If it doesn't, turn on logging.DEBUG to see why it gets turned off.
# Not assigning a group_id can be one cause
for msg in consumer:
# TODO: process the kafka messages.
finally:
# Always close your producers/consumers when you're done
consumer.close()
这是关于最基本的KafkaConsumer。 for
循环阻塞线程并循环,直到它提交最后一条消息。还有consumer.poll()
方法可以在给定时间内获取您可以获取的消息,具体取决于您希望如何构建数据流。 Kafka 在设计时考虑了长期运行的消费者进程,但如果您正确提交消息,您也可以根据需要打开和关闭消费者。
现在你有了数据,所以你可以用 Flask 将它流式传输到浏览器。我对 ChartJS 不熟悉,但 live streaming from Flask 集中在调用一个 Python 函数,该函数在循环内以 yield
结尾,而不仅仅是处理结束时的 return
。
查看Michael Grinberg's blog 和his followup 上的流式传输作为使用 Flask 进行流式传输的实际示例。 (注意:任何实际在严肃的 Web 应用程序中流式传输视频的人都可能希望使用 ffmpy 将其编码为广泛使用的 H.264 之类的视频编解码器,并将其包装在 MPEG-DASH 中......或者可能选择一个可以执行更多操作的框架这个东西给你。)
【讨论】:
以上是关于Flask - 拉取实时流 kafka 数据 - 将 Kafka 与 Python Flask 集成的主要内容,如果未能解决你的问题,请参考以下文章
Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一
Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一