将Kafka Streams代码迁移到Spring Cloud Stream吗?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了将Kafka Streams代码迁移到Spring Cloud Stream吗?相关的知识,希望对你有一定的参考价值。

Spring云流将支持下面的Kafka Streams应用程序。以下是Kafka示例应用程序摘录中的代码。任何反馈或支持,表示赞赏。

        ...
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        KStream<String, Purchase> purchaseKStream = streamsBuilder.stream.....
        KStream<String, PurchasePattern> patternKStream = purchaseKStream.mapValues...
        patternKStream.print(Printed.<String, PurchasePattern>toSysOut().withLabel("patterns"));
        patternKStream.to("patterns", Produced.with(stringSerde, purchasePatternSerde));
        purchaseKStream.print(Printed.<String, Purchase>toSysOut().withLabel("purchases"));
        purchaseKStream.to("purchases", Produced.with(stringSerde, purchaseSerde));

        // adding State to processor
        String rewardsStateStoreName = "rewardsPointsStore";
        RewardsStreamPartitioner streamPartitioner = new RewardsStreamPartitioner();
        KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore(rewardsStateStoreName);
        StoreBuilder<KeyValueStore<String, Integer>> storeBuilder = Stores.keyValueStoreBuilder(storeSupplier.....
        streamsBuilder.addStateStore(storeBuilder);
        KStream<String, Purchase> transByCustomerStream = purchaseKStream.through("customer_transactions",....
        KStream<String, RewardAccumulator> statefulRewardAccumulator = transByCustomerStream.transformValues(()
        statefulRewardAccumulator.print(Printed.<String, RewardAccumulator>toSysOut().withLabel("stateful-rewards"));
        statefulRewardAccumulator.to("rewards", Produced.with(stringSerde, rewardAccumulatorSerde));

        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), getProperties());
答案

假设您使用的是Spring Cloud Stream Horsham (3.0.0)版本,以下伪代码应该可以使用。我没有测试此代码,但是它应该与目标等的正确配置一起使用。请查看docs

@SpringBootApplicaiton
public class SampleApp 

  public static void main(String[] args) 
   SpringApplication.run(SampleApp.class, args);
  

   @Bean
   public Function<KStream<String, Purchase>, KStream<String, RewardAccumulator>> process() 
     return purchaseKStream -> 
           purchaseKStream.print(Printed.<String, Purchase>toSysOut().withLabel("purchases"));
           KStream<String, PurchasePattern> patternKStream = purchaseKStream.mapValues...
           patternKStream.print(Printed.<String, PurchasePattern>toSysOut().withLabel("patterns"));
           patternKStream.to("patterns", Produced.with(stringSerde, purchasePatternSerde));
                purchaseKStream.to("purchases", Produced.with(stringSerde, purchaseSerde));

           KStream<String, Purchase> transByCustomerStream = purchaseKStream.through("customer_transactions",....
           KStream<String, RewardAccumulator> statefulRewardAccumulator = transByCustomerStream.transformValues(()
                statefulRewardAccumulator.print(Printed.<String, RewardAccumulator>toSysOut().withLabel("stateful-rewards"));

           return statefulRewardAccumulator;
               
   

   @Bean
   public StoreBuilder storeBuilder() 
     String rewardsStateStoreName = "rewardsPointsStore";
     RewardsStreamPartitioner streamPartitioner = new RewardsStreamPartitioner();
     KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore(rewardsStateStoreName);
     StoreBuilder<KeyValueStore<String, Integer>> storeBuilder = Stores.keyValueStoreBuilder(storeSupplier.....
     return storeBuilder;
   


以上是关于将Kafka Streams代码迁移到Spring Cloud Stream吗?的主要内容,如果未能解决你的问题,请参考以下文章

Spring Cloud Stream Kafka Streams Binder KafkaException:无法启动流:“侦听器”不能为空

带有Spring Cloud Stream的Kafka Streams进程中的Serd错误

Spring Cloud Streams 没有在消息中设置 kafka 键?

Kafka Streams“Consumed.with()”与KafkaAvroDeserializer

如何发送时间窗口 KTable 的最终 kafka-streams 聚合结果?

Kafka Streams - 根据 Streams 数据发送不同的主题