Kafka——SpringBoot整合(生产者)

Posted 6。

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka——SpringBoot整合(生产者)相关的知识,希望对你有一定的参考价值。

生产者根据不同的业户需求和性能需求,选择合适的发送方式,可以达到最大效率。
具体选择哪一种发送方式,要考虑实际的业务场景。
简单来说:
1、保存用户点击情况、不重要的日志分析等大频率储存,但不担心丢失的情况下使用异步发送
2、订单、支付信息,信息量不一定很多,但是非常重要,就需要采取同步发送或者异步带回调发送,对发送结果进行必要处理。

pom

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.2</version>
        </dependency>

生产者配置文件

这里也可以在application.xxx进行参数配置,不过使用java配置文件的方式使配置更加直观,建议使用java配置
具体的参数配置可以查看以前的博客文章kafka生产者参数配置。https://www.cnblogs.com/luckyhui28/p/12001798.html

application.properties

# 指定kafka 代理地址,可以多个
kafka.bootstrap-servers=192.168.32.11:9092

javaConfig

@Configuration
public class KafkaProducerConfig {
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * 生产者配置
     * @return
     */
    private Map<String, Object> configs() {
        Map<String, Object> props = new HashMap<>();
        // 连接地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // 键的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 值的序列化方式
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 重试,0为不启用重试机制
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        // 控制批处理大小,单位为字节
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024000);
        return props;
    }


    @Bean
    public ProducerFactory<String, String> producerFactory() {
        DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(configs());
        return factory;
    }
    @Primary
    @Bean("producertemplate")
    public KafkaTemplate<String, String> producertemplate() {
        KafkaTemplate template = new KafkaTemplate<String, String>(producerFactory());
        return template;
    }

}

事务性javaConfig配置

@Configuration
public class KafkaTransactionalProducerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;


    /**
     * 生产者配置
     * @return
     */
    private Map<String, Object> configs() {
        Map<String, Object> props = new HashMap<>();
        // 连接地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // 键的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 值的序列化方式
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 重试,0为不启用重试机制
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        // 控制批处理大小,单位为字节
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024000);
        return props;
    }



    @Bean
    public ProducerFactory<String, String> transactionalProducerFactory() {
        DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(configs());
        // 开启事务
        factory.transactionCapable();
        // 用来生成Transactional.id的前缀
        factory.setTransactionIdPrefix("tran-");
        return factory;
    }


    /**
     * 事务管理器
     * @param transactionalProducerFactory
     * @return
     */
    @Bean
    public KafkaTransactionManager transactionManager(ProducerFactory transactionalProducerFactory) {
        KafkaTransactionManager manager = new KafkaTransactionManager(transactionalProducerFactory);
        return manager;
    }

    @Bean("transactionalTemplate")
    public KafkaTemplate<String, String> transactionalTemplate() {
        KafkaTemplate template = new KafkaTemplate<String, String>(transactionalProducerFactory());
        return template;
    }
}

Controller

@Controller
public class KafkaTestController {

	private static Logger logger = LoggerFactory.getLogger(KafkaTestController.class);

	@Autowired
	@Qualifier("producertemplate")
	private KafkaTemplate<String, String> producertemplate;

	@Autowired
	@Qualifier("transactionalTemplate")
	private KafkaTemplate<String, String> transactionalTemplate;

	private Gson gson = new GsonBuilder().create();
}

异步不回调发送方式

发送既忘,只管消息发送,不论结果。多使用不重要的业务信息

	@RequestMapping("/testSendMsg1")
	@ResponseBody
	public String testSendMsg1(){ //发送既忘方式发送消息,在发送消息后并不关心消息是否正确到达。这种方法方式的性能最高,可靠性最差。
		Message message = new Message();
		message.setId(1);
		message.setMsg("testSendMsg1");
		message.setSendTime(new Date());
		logger.info("发送消息(发送既忘) ----->>>>>  message = {}", gson.toJson(message));
		producertemplate.send("hello", gson.toJson(message));
		return "testSendMsg1";
	}

异步带回调信息

带有回调结果,根据发送结果进行后期处理

	@RequestMapping("/testSendMsg3")
	@ResponseBody
	public String testSendMsg3(){ //异步发送,异步发送一般是在sned()方法里指定一个CallBack的回调函数,Kafka在返回响应时调用该函数来实现异步的发送确认。
		Message message = new Message();
		message.setId(1);
		message.setMsg("testSendMsg1");
		message.setSendTime(new Date());
		logger.info("发送消息(异步发送) ----->>>>>  message = {}", gson.toJson(message));
		producertemplate.send("hello", gson.toJson(message)).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
			@Override
			public void onFailure(Throwable throwable) {
				logger.error("发送发生错误:" + throwable.getMessage());
			}

			@Override
			public void onSuccess(SendResult<String, String> stringStringSendResult) {
				logger.info("消息发送成功");
				//生产者发送的元数据信息
				ProducerRecord<String, String> producerRecord = stringStringSendResult.getProducerRecord();
				logger.info("生产者发送元信息 :" );
				String topic = producerRecord.topic();
				logger.info("主题 :" + topic );
				String value = producerRecord.value();
				logger.info("信息 :" + value );
				Integer partition = producerRecord.partition();
				logger.info("分区 :" + partition );
				String key = producerRecord.key();
				logger.info("键 :" + key );
				Long timestamp = producerRecord.timestamp();
				logger.info("时间戳 :" + timestamp );
				//返回结果元信息
				RecordMetadata recordMetadata = stringStringSendResult.getRecordMetadata();
				logger.info("生产者返回结果元信息 :" );
				String topic1 = recordMetadata.topic();
				logger.info("主题 :" + topic1 );
				int partition1 = recordMetadata.partition();
				logger.info("分区 :" + partition1 );
				long offset = recordMetadata.offset();
				logger.info("位移 :" + offset );
				boolean b = recordMetadata.hasOffset();
				logger.info("是否返回位移信息 :" + b );
				long timestamp1 = recordMetadata.timestamp();
				logger.info("时间戳 :" + timestamp1 );
				boolean b1 = recordMetadata.hasTimestamp();
				logger.info("是否返回时间戳 :" + b1 );
				int i = recordMetadata.serializedKeySize();
				logger.info("序列化后键大小 :" + i );
				int i1 = recordMetadata.serializedValueSize();
				logger.info("序列化后值大小 :" + i1 );
			}
		});
		return "testSendMsg3";
	}

同步发送

阻塞式,同步发送方式发送消息,利用返回的Fature对象实现。

	@RequestMapping("/testSendMsg2")
	@ResponseBody
	public String testSendMsg2(){ //同步发送方式发送消息,利用返回的Fature对象实现。。
		Message message = new Message();
		message.setId(1);
		message.setMsg("testSendMsg2");
		message.setSendTime(new Date());
		logger.info("发送消息(同步发送) ----->>>>>  message = {}", gson.toJson(message));

		SendResult<String, String> sendResult = null;
		try {
			sendResult = producertemplate.send("hello", gson.toJson(message)).get();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}


		//生产者发送的元数据信息
		ProducerRecord<String, String> producerRecord = sendResult.getProducerRecord();
		logger.info("生产者发送元信息 :" );
		String topic = producerRecord.topic();
		logger.info("主题 :" + topic );
		String value = producerRecord.value();
		logger.info("信息 :" + value );
		Integer partition = producerRecord.partition();
		logger.info("分区 :" + partition );
		String key = producerRecord.key();
		logger.info("键 :" + key );
		Long timestamp = producerRecord.timestamp();
		logger.info("时间戳 :" + timestamp );



		//返回结果元信息
		RecordMetadata recordMetadata = sendResult.getRecordMetadata();
		logger.info("生产者返回结果元信息 :" );
		String topic1 = recordMetadata.topic();
		logger.info("主题 :" + topic1 );
		int partition1 = recordMetadata.partition();
		logger.info("分区 :" + partition1 );
		long offset = recordMetadata.offset();
		logger.info("位移 :" + offset );
		boolean b = recordMetadata.hasOffset();
		logger.info("是否返回位移信息 :" + b );
		long timestamp1 = recordMetadata.timestamp();
		logger.info("时间戳 :" + timestamp1 );
		boolean b1 = recordMetadata.hasTimestamp();
		logger.info("是否返回时间戳 :" + b1 );
		int i = recordMetadata.serializedKeySize();
		logger.info("序列化后键大小 :" + i );
		int i1 = recordMetadata.serializedValueSize();
		logger.info("序列化后值大小 :" + i1 );


		return "testSendMsg2";
	}

事务式发送

多用于发送多条信息,若有异常全部失败

	@RequestMapping("/testSendMsg4")
	@ResponseBody
	@Transactional
	public String testSendMsg5(){ //事务发送
		Message message = new Message();
		message.setId(1);
		message.setMsg("testSendMsg4");
		message.setSendTime(new Date());
		logger.info("发送消息(事务发送) ----->>>>>  message = {}", gson.toJson(message));
		
		transactionalTemplate.send("hello", gson.toJson(message));

		int i = 1/0;


		return "testSendMsg4";
	}

以上是关于Kafka——SpringBoot整合(生产者)的主要内容,如果未能解决你的问题,请参考以下文章

kafka整合springboot以及核心参数的使用

SpringBoot整合Kafka下

四.Kafka入门到精通-SpringBoot整合Kafka(Producer拦截器&Producer监听器)

四.Kafka入门到精通-SpringBoot整合Kafka(Producer拦截器&Producer监听器)

springboot整合kafka

三.Kafka入门到精通-SpringBoot整合Kafka(上)