pySpark Kafka Direct Streaming 更新 Zookeeper / Kafka Offset

Posted

技术标签:

【中文标题】pySpark Kafka Direct Streaming 更新 Zookeeper / Kafka Offset【英文标题】:pySpark Kafka Direct Streaming update Zookeeper / Kafka Offset 【发布时间】:2017-10-21 22:39:06 【问题描述】:

目前我正在使用 Kafka / Zookeeper 和 pySpark (1.6.0)。 我已经成功创建了一个使用 KafkaUtils.createDirectStream() 的 kafka 消费者。

所有流媒体都没有问题,但我认识到,在我消费了一些消息后,我的 Kafka 主题没有更新到当前偏移量。

由于我们需要更新主题才能在此处进行监控,这有点奇怪。

在 Spark 的文档中我发现了这条评论:

   offsetRanges = []

     def storeOffsetRanges(rdd):
         global offsetRanges
         offsetRanges = rdd.offsetRanges()
         return rdd

     def printOffsetRanges(rdd):
         for o in offsetRanges:
             print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)

     directKafkaStream\
         .transform(storeOffsetRanges)\
         .foreachRDD(printOffsetRanges)

如果您希望基于 Zookeeper 的 Kafka 监控工具显示流应用程序的进度,您可以使用它自己更新 Zookeeper。

这里是文档: http://spark.apache.org/docs/1.6.0/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers

我在 Scala 中找到了一个解决方案,但我找不到 python 的等价物。 这是 Scala 示例:http://geeks.aretotally.in/spark-streaming-kafka-direct-api-store-offsets-in-zk/

问题

但问题是,我如何才能从那时起更新 zookeeper?

【问题讨论】:

【参考方案1】:

我用 python kazoo 库编写了一些函数来保存和读取 Kafka 偏移量。

获取 Kazoo 客户端单例的第一个函数:

ZOOKEEPER_SERVERS = "127.0.0.1:2181"

def get_zookeeper_instance():
    from kazoo.client import KazooClient

    if 'KazooSingletonInstance' not in globals():
        globals()['KazooSingletonInstance'] = KazooClient(ZOOKEEPER_SERVERS)
        globals()['KazooSingletonInstance'].start()
    return globals()['KazooSingletonInstance']

然后函数读取和写入偏移量:

def read_offsets(zk, topics):
    from pyspark.streaming.kafka import TopicAndPartition

    from_offsets = 
    for topic in topics:
        for partition in zk.get_children(f'/consumers/topic'):
            topic_partion = TopicAndPartition(topic, int(partition))
            offset = int(zk.get(f'/consumers/topic/partition')[0])
            from_offsets[topic_partion] = offset
    return from_offsets

def save_offsets(rdd):
    zk = get_zookeeper_instance()
    for offset in rdd.offsetRanges():
        path = f"/consumers/offset.topic/offset.partition"
        zk.ensure_path(path)
        zk.set(path, str(offset.untilOffset).encode())

然后在开始流式传输之前,您可以从 zookeeper 读取偏移量并将它们传递给createDirectStream 对于fromOffsets 参数。:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def main(brokers="127.0.0.1:9092", topics=['test1', 'test2']):
    sc = SparkContext(appName="PythonStreamingSaveOffsets")
    ssc = StreamingContext(sc, 2)

    zk = get_zookeeper_instance()
    from_offsets = read_offsets(zk, topics)

    directKafkaStream = KafkaUtils.createDirectStream(
        ssc, topics, "metadata.broker.list": brokers,
        fromOffsets=from_offsets)

    directKafkaStream.foreachRDD(save_offsets)


if __name__ == "__main__":
    main()

【讨论】:

【参考方案2】:

我遇到了类似的问题。 你是对的,通过使用directStream,意味着直接使用kafka低级API,它没有更新阅读器偏移量。 有几个scala/java的例子,但python没有。 但是自己做很容易,你需要做的是:

从开头的偏移量读取 保存最后的偏移量

例如,我通过以下方式将每个分区的偏移量保存在 redis 中:

stream.foreachRDD(lambda rdd: save_offset(rdd))
def save_offset(rdd):
  ranges = rdd.offsetRanges()
  for rng in ranges:
     rng.untilOffset # save offset somewhere

然后在开始时,您可以使用:

fromoffset = 
topic_partition = TopicAndPartition(topic, partition)
fromoffset[topic_partition]= int(value) #the value of int read from where you store previously.

对于一些使用zk跟踪偏移量的工具,最好将偏移量保存在zookeeper中。 这一页: https://community.hortonworks.com/articles/81357/manually-resetting-offset-for-a-kafka-topic.html 描述如何设置偏移量,基本上zk节点是: /consumers/[consumer_name]/offsets/[topic name]/[partition id] 因为我们使用的是directStream,所以你必须组成一个消费者名称。

【讨论】:

感谢您的回答,但我仍然不确定我可以从 pySpark 使用哪个框架来更新 Kafka 分区上的偏移量。

以上是关于pySpark Kafka Direct Streaming 更新 Zookeeper / Kafka Offset的主要内容,如果未能解决你的问题,请参考以下文章

streaming kafka direct 详解

Spark Streaming 基于 Direct API 优化与 Kafka 集成

Spark Streaming 基于 Direct API 优化与 Kafka 集成

Spark Kafka 基于Direct自己管理offset

Spark Direct Stream 不会为每个 kafka 分区创建并行流

Spark 学习笔记之 Streaming和Kafka Direct