Kafka Streams - 根据 Streams 数据发送不同的主题
Posted
技术标签:
【中文标题】Kafka Streams - 根据 Streams 数据发送不同的主题【英文标题】:Kafka Streams - Send on different topics depending on Streams Data 【发布时间】:2018-08-03 16:06:06 【问题描述】:我有一个 kafka 流应用程序等待在主题 user_activity
上发布记录。它将接收 json 数据,并根据我想将该流推送到不同主题的键的值。
这是我的流应用代码:
KStream<String, String> source_user_activity = builder.stream("user_activity");
source_user_activity.flatMapValues(new ValueMapper<String, Iterable<String>>()
@Override
public Iterable<String> apply(String value)
System.out.println("value: " + value);
ArrayList<String> keywords = new ArrayList<String>();
try
JSONObject send = new JSONObject();
JSONObject received = new JSONObject(value);
send.put("current_date", getCurrentDate().toString());
send.put("activity_time", received.get("CreationTime"));
send.put("user_id", received.get("UserId"));
send.put("operation_type", received.get("Operation"));
send.put("app_name", received.get("Workload"));
keywords.add(send.toString());
// apply regex to value and for each match add it to keywords
catch (Exception e)
// TODO: handle exception
System.err.println("Unable to convert to json");
e.printStackTrace();
return keywords;
).to("user_activity_by_date");
在这段代码中,我想检查操作类型,然后根据我想将流推送到相关主题。
我怎样才能做到这一点?
编辑:
我已将我的代码更新为:
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source_o365_user_activity = builder.stream("o365_user_activity");
KStream<String, String>[] branches = source_o365_user_activity.branch(
(key, value) -> (value.contains("Operation\":\"SharingSet") && value.contains("ItemType\":\"File")),
(key, value) -> (value.contains("Operation\":\"AddedToSecureLink") && value.contains("ItemType\":\"File")),
(key, value) -> true
);
branches[0].to("o365_sharing_set_by_date");
branches[1].to("o365_added_to_secure_link_by_date");
branches[2].to("o365_user_activity_by_date");
【问题讨论】:
你更新的代码有效吗? @panoet 是的 谢谢。它节省了我的时间。 【参考方案1】:您可以使用branch
方法来拆分您的流。此方法采用谓词将源流拆分为多个流。
以下代码取自kafka-streams-examples:
KStream<String, OrderValue>[] forks = ordersWithTotals.branch(
(id, orderValue) -> orderValue.getValue() >= FRAUD_LIMIT,
(id, orderValue) -> orderValue.getValue() < FRAUD_LIMIT);
forks[0].mapValues(
orderValue -> new OrderValidation(orderValue.getOrder().getId(), FRAUD_CHECK, FAIL))
.to(ORDER_VALIDATIONS.name(), Produced
.with(ORDER_VALIDATIONS.keySerde(), ORDER_VALIDATIONS.valueSerde()));
forks[1].mapValues(
orderValue -> new OrderValidation(orderValue.getOrder().getId(), FRAUD_CHECK, PASS))
.to(ORDER_VALIDATIONS.name(), Produced
.with(ORDER_VALIDATIONS.keySerde(), ORDER_VALIDATIONS.valueSerde()));
【讨论】:
什么是(id, orderValue
)?我正在查看 Kafka Streams 文档及其类似 (key, value) -> predicate()
的内容。但是我在值中有一个 Json 对象,然后在那个 Json 对象中我有多个键和值。那么我该如何根据它进行分支呢?
这只是一个例子。看看这个:kafka.apache.org/0100/javadoc/org/apache/kafka/streams/kstream/… 好的,我明白了。谓词的数量是静态的,因此我猜您可以将源流拆分为一些预定义数量的子流。如果您需要一些动态拆分,恐怕您必须重新设计逻辑。关于您的问题:您可以定义谓词列表,例如 _hasField_(...) 或其他东西。每个谓词都会检查json中的字段是否。
假设我要将所有记录发送到第三个主题。分支允许吗?
@EL323 是的,您只需为每条记录创建一个评估为 true 的谓词。
就像我在编辑中所做的那样?但它没有得到其他日志。它只获取前两个谓词评估为假的日志。【参考方案2】:
原始的KStream.branch
方法不方便,因为混合了数组和泛型,并且因为它强制使用“幻数”从结果中提取正确的分支(参见例如KAFKA-5488 问题)。从spring-kafka 2.2.4 开始,KafkaStreamBrancher 类可用。有了它,更方便的分支是可能的:
new KafkaStreamBrancher<String, String>()
.branch((key, value) -> value.contains("A"), ks->ks.to("A"))
.branch((key, value) -> value.contains("B"), ks->ks.to("B"))
.defaultBranch(ks->ks.to("C"))
.onTopOf(builder.stream("source"))
//onTopOf returns the provided stream so we can continue with method chaining
//and do something more with the original stream
还有KIP-418,所以在后续版本中,Kafka 本身的分支也有可能得到改进。
【讨论】:
【参考方案3】:另一种可能性是使用 TopicNameExtractor 动态路由事件:
https://www.confluent.io/blog/putting-events-in-their-place-with-dynamic-routing
不过,您需要提前创建主题,
val outputTopic: TopicNameExtractor[String, String] = (_, value: String, _) => defineOutputTopic(value)
builder
.stream[String, String](inputTopic)
.to(outputTopic)
和 defineOutputTopic 可以返回给定值(或键或记录上下文)的一组已定义主题之一。 PD:对不起 scala 代码,在链接中有一个 Java 示例。
【讨论】:
以上是关于Kafka Streams - 根据 Streams 数据发送不同的主题的主要内容,如果未能解决你的问题,请参考以下文章
将Kafka Streams代码迁移到Spring Cloud Stream吗?
Spring Cloud Stream Kafka Streams Binder KafkaException:无法启动流:“侦听器”不能为空
Kafka Streams - 根据 Streams 数据发送不同的主题