Kafka Streams 表转换

Posted

技术标签:

【中文标题】Kafka Streams 表转换【英文标题】:Kafka Streams table transformations 【发布时间】:2018-02-28 10:44:30 【问题描述】:

我在 SQL Server 中有一个表,我想流式传输到 Kafka 主题,结构如下:

(UserID, ReportID)

此表将不断更改(添加、插入、无更新记录)

我想把这个变成这样的结构,放到 Elasticsearch 中:


  "UserID": 1,
  "Reports": [1, 2, 3, 4, 5, 6]

到目前为止,我看到的示例是日志或点击流,它们在我的情况下不起作用。

这种用例可能吗?我总是可以只看UserID 更改和查询数据库,但这似乎很幼稚,并不是最好的方法。

更新

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.util.ArrayList;
import java.util.Properties;

public class MyDemo 
  public static void main(String... args) 
    System.out.println("Hello KTable!");

    final Serde<Long> longSerde = Serdes.Long();

    KStreamBuilder builder = new KStreamBuilder();

    KStream<Long, Long> reportPermission = builder.stream(TOPIC);

    KTable<Long, ArrayList<Long>> result = reportPermission
        .groupByKey()
        .aggregate(
            new Initializer<ArrayList<Long>>() 
              @Override
              public ArrayList<Long> apply() 
                return null;
              
            ,
            new Aggregator<Long, Long, ArrayList<Long>>() 
              @Override
              public ArrayList<Long> apply(Long key, Long value, ArrayList<Long> aggregate) 
                aggregate.add(value);
                return aggregate;
              
            ,
            new Serde<ArrayList<Long>>() 
              @Override
              public void configure(Map<String, ?> configs, boolean isKey) 

              @Override
              public void close() 

              @Override
              public Serializer<ArrayList<Long>> serializer() 
                return null;
              

              @Override
              public Deserializer<ArrayList<Long>> deserializer() 
                return null;
              
            );

    result.to("report-aggregated-topic");

    KafkaStreams streams = new KafkaStreams(builder, createStreamProperties());
    streams.cleanUp();
    streams.start();

    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  

  private static final String TOPIC = "report-permission";

  private static final Properties createStreamProperties() 
    Properties props = new Properties();

    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "report-permission-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");

    return props;
  

我实际上陷入了聚合阶段,因为我无法为 ArrayList&lt;Long&gt; 编写合适的 SerDe(还没有足够的技能),lambda 似乎不适用于聚合器 - 它不知道 agg 的类型是什么:

KTable<Long, ArrayList<Long>> sample = builder.stream(TOPIC)
    .groupByKey()
    .aggregate(
        () -> new ArrayList<Long>(),
        (key, val, agg) -> agg.add(val),
        longSerde
    );

【问题讨论】:

使用 Lambda 的聚合应该可以工作。看看这个例子:github.com/confluentinc/kafka-streams-examples/blob/3.3.0-post/… 对于 Serds:您当然可以挑剔地重用现有的 LongSerde。基本模式是首先序列化数组列表的大小,然后是每个单独的长值。在反序列化中你也这样做。首先,反序列化大小,然后知道预期有多少条目并单独反序列化所有条目。或者你从头开始写一个列表序列化器***.com/questions/23793885/… 【参考方案1】:

您可以使用 Kafka 的 Connect API 将数据从 SQL Server 获取到 Kafka。我不知道 SQL Server 的任何特定连接器,但您可以使用任何基于 JDBC 的通用连接器:https://www.confluent.io/product/connectors/

要处理数据,您可以使用 Kafka 的 Streams API。您可以简单地 aggregate() all 报告每个用户。像这样的:

KTable<UserId, List<Reports>> result =
    builder.stream("topic-name")
           .groupByKey()
           // init a new empty list and
           // `add` the items to the list in the actual aggregation
           .aggregate(...);

result.to("result-topic");

查看文档以了解有关 Streams API 的更多详细信息:https://docs.confluent.io/current/streams/index.html

请注意,您需要确保报告列表不会无限增长。 Kafka 有一些(可配置的)最大消息大小,整个列表将包含在单个消息中。因此,您应该在投入生产之前估计最大消息大小并应用相应的配置 (-> max.message.bytes)。在网页上查看配置:http://kafka.apache.org/documentation/#brokerconfigs

最后,您使用 Connect API 将数据推送到 Elastic Search。有多种不同的连接器可用(我当然会推荐 Confluent 一个)。有关 Connect API 的更多详细信息:https://docs.confluent.io/current/connect/userguide.html

【讨论】:

我一定会去看看的。明天将回复结果如何。 你能帮忙多一点吗?我只是因为缺乏 Java 技能而陷入困境。已更新我的问题。【参考方案2】:

在SQL和Kafka Streams中直接不允许这种方法,但是用例是可能的,可以实现如下:

1) 使用 SOLRJ API 在 SQL 服务器上编写自定义应用程序,每当在 SQL 中执行 DML(插入、更新、删除等)操作时,该应用程序将命中 Solr 实例。 https://wiki.apache.org/solr/Solrj

2) 使用 Solr SQL 数据导入处理程序,只要 SQL 中发生 DML(插入、更新、删除等)操作,SQL Server 就会自动通知 solr。 https://wiki.apache.org/solr/DataImportHandler

【讨论】:

以上是关于Kafka Streams 表转换的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Streams - 根据 Streams 数据发送不同的主题

Akka Stream Kafka vs Kafka Streams

Kafka Streams 开发单词计数应用

带有Spring Cloud Stream的Kafka Streams进程中的Serd错误

Kafka Streams入门指南

Kafka streams概览