带有 Spring Boot 的 Kafka 流

Posted

技术标签:

【中文标题】带有 Spring Boot 的 Kafka 流【英文标题】:Kafka Streams with Spring Boot 【发布时间】:2019-01-14 21:55:07 【问题描述】:

我想在我的 Spring Boot 项目中使用 Kafka Streams 实时处理。所以我需要 Kafka Streams 配置,或者我想使用 KStreams 或 KTable,但我在互联网上找不到示例。

我做了生产者和消费者,现在我想实时流式传输。

【问题讨论】:

【参考方案1】:

首先让我说,如果您是 Kafka 流的新手,在其之上添加 spring-boot 会增加另一个级别的复杂性,并且 Kafka 流本身具有很大的学习曲线。以下是帮助您前进的基础知识: pom:

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>$spring.version</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>$kafka.version</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>$kafka.version</version>
</dependency>

现在是配置对象。下面的代码假设您正在创建两个流应用程序,请记住每个应用程序都代表自己的处理拓扑:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.core.StreamsBuilderFactoryBean;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaStreamConfig 

  @Value("$delivery-stats.stream.threads:1")
  private int threads;

  @Value("$delivery-stats.kafka.replication-factor:1")
  private int replicationFactor;

  @Value("$messaging.kafka-dp.brokers.url:localhost:9092")
  private String brokersUrl;


  @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
  public StreamsConfig kStreamsConfigs() 
    Map<String, Object> config = new HashMap<>();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "default");
    setDefaults(config);
    return new StreamsConfig(config);
  


  public void setDefaults(Map<String, Object> config) 
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokersUrl);
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, FailOnInvalidTimestamp.class);
  

  @Bean("app1StreamBuilder")
  public StreamsBuilderFactoryBean app1StreamBuilderFactoryBean() 
    Map<String, Object> config = new HashMap<>();
    setDefaults(config);
    config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
    config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
    config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, threads);
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, replicationFactor);
    return new StreamsBuilderFactoryBean(config);
  

  @Bean("app2StreamBuilder")
  public StreamsBuilderFactoryBean app2StreamBuilderFactoryBean() 
    Map<String, Object> config = new HashMap<>();
    setDefaults(config);
    config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app2");
    config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
    config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, threads);
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, replicationFactor);
    return new StreamsBuilderFactoryBean(config);
  

现在有趣的部分来了,使用 streamsBuilder 构建您的应用程序(本例中为 app1)。

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class App1 
  @SuppressWarnings("unchecked")
  @Bean("app1StreamTopology")
  public KStream<String, Long> startProcessing(@Qualifier("app1StreamBuilder") StreamsBuilder builder) 

    final KStream<String, Long> toSquare = builder.stream("toSquare", Consumed.with(Serdes.String(), Serdes.Long()));
    toSquare.map((key, value) ->  // do something with each msg, square the values in our case
      return KeyValue.pair(key, value * value);
    ).to("squared", Produced.with(Serdes.String(), Serdes.Long())); // send downstream to another topic

    return toSquare;
  

希望这会有所帮助。

Kafka 命令创建主题并将数据发送到主题

创建主题:

kafka-topics.bat --zookeeper localhost:2181 --create --topic toSquare --replication-factor 1 --partitions 1

向主题发送数据:

kafka-console-producer --broker-list localhost:9092 --topic testStreamsIn --property parse.key=true --property key.separator=,
test,12345678

【讨论】:

我能问一下为什么你需要返回应用程序中没有使用的 KStream 吗?为什么你不能在一些后期构造注释中使用它? 您定义了 bean app1StreamTopology,但是当应用程序启动时这个 bean 是如何被挂钩的。我看不到它在任何地方被注入,所以 Spring Kafka 是否收集了所有 KStream 类型的 bean,然后应用流式登录?【参考方案2】:

在 Spring Boot 上开始使用 Kafka Streams 的简单方法:

    使用https://start.spring.io 引导您的项目。选择 Cloud StreamSpring for Apache Kafka Streams 作为依赖项。这是预配置项目模板的链接:https://start.spring.io/#!language=java&dependencies=kafka-streams,cloud-stream

    在您的应用程序中定义 KStream bean。例如,这是一个非常基本的消费者应用程序。它只是将 KStream 中的数据和日志记录消费到标准输出中。

    @SpringBootApplication
    public class Application 
    
        public static void main(String[] args) 
            SpringApplication.run(Main.class, args);
        
    
        @Bean
        public java.util.function.Consumer<KStream<String, String>> process() 
            return stream -> stream.foreach((key, value) -> 
                System.out.println(key + ":" + value);
            );
        
    
    

    在这个应用程序中,我们定义了一个单一的输入绑定。 Spring 将使用名称 process-in-0 创建此绑定,即 bean 函数的名称后跟 -in-,然后是参数的序号位置。您可以使用此绑定名称来设置其他属性,例如主题名称。例如,spring.cloud.stream.bindings.process-in-0.destination=my-topic

    查看更多示例 here - Spring Cloud Stream Kafka Binder 参考,编程模型部分。

    配置application.yaml如下:

    spring:
      cloud:
        stream:
          bindings:
            process-in-0.destination: my-topic
          kafka:
            streams:
              binder:
                applicationId: my-app
                brokers: localhost:9092
                configuration:
                  default:
                    key:
                      serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                    value:
                      serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    

【讨论】:

process-inname of the method + input 因此它变成process-in 同样,一旦你完成处理转储数据到输出类将是process-out【参考方案3】:

您可以使用从头开始创建新的 Spring Boot 项目 https://start.spring.io/ 相应地选择必要的版本/依赖项并生成/下载项目。

您可以开始实施 kstream api 方法 (https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/kstream/KStream.html)

【讨论】:

以上是关于带有 Spring Boot 的 Kafka 流的主要内容,如果未能解决你的问题,请参考以下文章

带有spring boot和kafka的docker内部信任库的问题路径

带有 Kafka(和 Spring Boot)的分布式系统中的 Graphql 订阅

Kafka 入门和 Spring Boot 集成

Kafka 入门和 Spring Boot 集成

Kafka 生产者 TimeoutException: Expiring 1 record(s)

带有Spring Cloud Stream的Kafka Streams进程中的Serd错误