如何使用kafka流处理块/批处理数据?

Posted

技术标签:

【中文标题】如何使用kafka流处理块/批处理数据?【英文标题】:how to process data in chunks/batches with kafka streams? 【发布时间】:2019-02-21 07:29:45 【问题描述】:

对于大数据中的许多情况,最好一次使用少量记录缓冲区,而不是一次处理一条记录。

最自然的例子是调用一些支持批处理以提高效率的外部 API。

我们如何在 Kafka Streams 中做到这一点?我在 API 中找不到任何我想要的东西。

到目前为止,我有:

builder.stream[String, String]("my-input-topic")
.mapValues(externalApiCall).to("my-output-topic")

我想要的是:

builder.stream[String, String]("my-input-topic")
.batched(chunkSize = 2000).map(externalBatchedApiCall).to("my-output-topic")

在 Scala 和 Akka Streams 中,该函数称为 groupedbatch。在 Spark Structured Streaming 中,我们可以使用 mapPartitions.map(_.grouped(2000).map(externalBatchedApiCall))

【问题讨论】:

为什么不直接安排处理,然后从流中读取到 chunkSize 记录? 您问题的一个旁注是,从流处理器调用外部 API 并不总是最好的模式。有时您会发现,最好将外部数据作为自己的主题引入 Kafka 本身(例如来自数据库、大型机等的 CDC),然后轻松加入流处理本身。 Spark 中的 mapPartitions 不保证分区大小。只有流式传输的持续时间会影响窗口大小 正如@RobinMoffatt 提到的,将外部数据加载到 Kafka 主题中,将其作为 KTable 读取到您的应用程序中并执行流表连接而不是外部 API 调用可能会更好。 除此之外,您可以使用 transform() 和附加的 state 并手动建立批次。如果状态大小小于 200,则将记录放入存储中。如果达到 200 条记录,提取所有数据,进行外部 API 调用---注意,您需要同步进行---,然后清除存储。 【参考方案1】:

您可以使用队列。如下所示,

@Component
@Slf4j
public class NormalTopic1StreamProcessor extends AbstractStreamProcessor<String> 

    public NormalTopic1StreamProcessor(KafkaStreamsConfiguration configuration) 
        super(configuration);
    

    @Override
    Topology buildTopology() 
        KStream<String, String> kStream = streamsBuilder.stream("normalTopic", Consumed.with(Serdes.String(), Serdes.String()));
        // .peek((key, value) -> log.info("message received by stream 0"));
        kStream.process(() -> new AbstractProcessor<String, String>() 
            final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(100);
            final List<String> collection = new ArrayList<>();

            @Override
            public void init(ProcessorContext context) 
                super.init(context);
                context.schedule(Duration.of(1, ChronoUnit.MINUTES), WALL_CLOCK_TIME, timestamp -> 
                    processQueue();
                    context().commit();
                );
            

            @Override
            public void process(String key, String value) 
                queue.add(value);
                if (queue.remainingCapacity() == 0) 
                    processQueue();
                
            

            public void processQueue() 
                queue.drainTo(collection);
                long count = collection.stream().peek(System.out::println).count();
                if (count > 0) 
                    System.out.println("count is " + count);
                    collection.clear();
                
            
        );
        kStream.to("normalTopic1");
        return streamsBuilder.build();
    


【讨论】:

值得注意的是,即使处理器从不调用context.commit(),也会发生提交,因此此实现可能会提交processQueue()从未处理过的偏移量,请参阅:***.com/q/54075610/1011662【参考方案2】:

似乎还不存在。关注这个空间https://issues.apache.org/jira/browse/KAFKA-7432

【讨论】:

【参考方案3】:

我怀疑 Kafka 流目前是否像其他工具一样支持固定大小的窗口。 但是有基于时间的窗口,由 kafka 流支持。 https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#windowing

您可以随时间定义窗口大小,而不是记录数。

    翻滚时间窗口 滑动时间窗口 会话窗口 跳跃时间窗口

在您的情况下,可以选择使用 Tumbling Time Window。这些是不重叠的、固定大小的时间窗口。

例如,大小为 5000 毫秒的翻转窗口具有可预测性 窗口边界 [0;5000),[5000;10000),... - 而不是 [1000;6000),[6000;11000),... 甚至是“随机”的东西 [1452;6452),[6452;11452),....

【讨论】:

似乎我们在这种方法上走在了正确的轨道上,但窗口化似乎只与连接和聚合有关。我们想做一张地图。如何通过聚合进行地图操作??

以上是关于如何使用kafka流处理块/批处理数据?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 kafka 流在 kafka 中进行请求响应?

如何构建安全的Kafka集群

kafka基本概念

如何将火花流 DF 写入 Kafka 主题

Spark2.x(六十):在Structured Streaming流处理中是如何查找kafka的DataSourceProvider?

如何将 Spark 结构化流与 Kafka Direct Stream 结合使用?