通过 Observable(RxJava) 使用 Kafka

Posted

技术标签:

【中文标题】通过 Observable(RxJava) 使用 Kafka【英文标题】:Using Kafka through Observable(RxJava) 【发布时间】:2017-10-11 23:36:32 【问题描述】:

我有一个生产者(使用 Kafka)和多个消费者。所以我在主题中发布一条消息,然后我的消费者接收并处理该消息。

我需要在生产者中至少收到一个消费者的响应(如果是第一个消费者会更好)。我正在尝试使用 RxJava 来做到这一点(可观察)。

这样可以吗?谁有例子?

【问题讨论】:

【参考方案1】:

您最好先分享您的解决方案...

由于 Spring Cloud Stream 是一个 mh 的流解决方案,而不是请求/回复,因此没有示例与您分享。

您也可以考虑让您的消费者成为生产者。并且在原始生产者中有一个消费者从回复主题中读取。最后,您必须将回复数据与请求数据相关联。

与 RxJava 或任何其他实现细节无关。

【讨论】:

【参考方案2】:

您可以按如下方式使用它:

val consumer = new RxConsumer("zookeeper:2181", "consumer-group")

consumer.getRecordStream("cool-topic-(x|y|z)")
  .map(deserialize)
  .take(42 seconds)
  .foreach(println)

  consumer.shutdown()

有关详细信息,请参阅: https://github.com/cjdev/kafka-rx

【讨论】:

kafka-rx 还活着吗?我正在考虑使用它,但它自 2015 年 10 月以来就没有提交过。此外,我从未使用过 EPL v.1 许可证。不确定我的项目赞助商是否可以接受。 看来 kafka-rx 正在使用旧的 kafka 和 Java 8 版本。这个解决方案不应该是公认的答案。理想的答案应该解决如何单独使用 rxjava IMO。【参考方案3】:

以下是我如何使用 rxjava '2.2.6' 来处理 Kafka 事件,而无需任何额外的依赖:

import io.reactivex.Observable;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

...

// Load consumer props 
Properties props = new Properties();  
props.load(KafkaUtils.class.getClassLoader().getResourceAsStream("kafka-client.properties")); 

// Create a consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// Subscribe to topics
consumer.subscribe(Arrays.asList(props.getProperty("kafkaTopics").split("\\s*,\\s*")));

// Create an Observable for topic events
Observable<ConsumerRecords<String, String>> observable = Observable.fromCallable(() -> 
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSecond(10);
    return records;
);

// Process Observable events
observable.subscribe(records -> 
    if ((records != null) && (!records.isEmpty())) 
        for (ConsumerRecord<String, String> record : records) 
            System.out.println(record.offset() + ": " + record.value());
        
    
);

【讨论】:

以上是关于通过 Observable(RxJava) 使用 Kafka的主要内容,如果未能解决你的问题,请参考以下文章

Rxjava 流程分析

RxJava使用入门

RxJava

RxJava- 操作符之组合Observable

RxJava- 操作符之转换Observable

Android RxJava使用介绍 RxJava的操作符