A producer gets the best results when the send mode matches both the business requirements and the performance requirements.
Which mode to choose depends on the actual business scenario.
In short:
1. For high-volume data that can tolerate loss, such as user click tracking or low-priority log collection, use asynchronous (fire-and-forget) sending.
2. For order or payment messages, which may be low in volume but must not be lost, use synchronous sending or asynchronous sending with a callback, and handle the send result explicitly.
pom.xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    <exclusions>
        <exclusion>
            <groupId>org.junit.vintage</groupId>
            <artifactId>junit-vintage-engine</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.2</version>
</dependency>
Producer configuration
These parameters could also be set in application.properties, but a Java configuration class makes the settings more explicit, so Java config is used and recommended here.
The individual producer parameters are explained in an earlier post on Kafka producer configuration: https://www.cnblogs.com/luckyhui28/p/12001798.html
application.properties
# Kafka broker address; more than one can be listed, comma-separated
kafka.bootstrap-servers=192.168.32.11:9092
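As mentioned above, the same producer settings can instead be declared entirely in application.properties through Spring Boot's Kafka auto-configuration. A minimal sketch (the spring.kafka.* keys below come from Spring Boot's auto-configuration, not from the original post; verify them against your Boot version):
spring.kafka.bootstrap-servers=192.168.32.11:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.retries=1
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=1024000
# producer properties without a dedicated Boot key go under "properties"
spring.kafka.producer.properties.linger.ms=1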
javaConfig
@Configuration
public class KafkaProducerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Common producer configuration.
     * @return the producer properties map
     */
    private Map<String, Object> configs() {
        Map<String, Object> props = new HashMap<>();
        // Broker address(es)
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Key serializer
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Value serializer
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Number of retries; 0 disables the retry mechanism
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        // Batch size, in bytes
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // Linger for 1 ms before sending a batch; this reduces the number of requests
        // the producer makes and therefore improves throughput
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // Total memory, in bytes, the producer may use to buffer records waiting to be sent
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024000);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(configs());
    }

    @Primary
    @Bean("producertemplate")
    public KafkaTemplate<String, String> producertemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Transactional javaConfig
@Configuration
public class KafkaTransactionalProducerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Producer configuration for the transactional producer.
     * @return the producer properties map
     */
    private Map<String, Object> configs() {
        Map<String, Object> props = new HashMap<>();
        // Broker address(es)
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Key serializer
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Value serializer
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Number of retries; 0 disables the retry mechanism
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        // Batch size, in bytes
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // Linger for 1 ms before sending a batch; this reduces the number of requests
        // the producer makes and therefore improves throughput
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // Total memory, in bytes, the producer may use to buffer records waiting to be sent
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024000);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> transactionalProducerFactory() {
        DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(configs());
        // Setting a transaction id prefix is what enables transactions for this factory;
        // it is used to generate each producer's transactional.id
        factory.setTransactionIdPrefix("tran-");
        return factory;
    }

    /**
     * Transaction manager for Kafka transactions.
     * @param transactionalProducerFactory the transactional producer factory
     * @return the Kafka transaction manager
     */
    @Bean
    public KafkaTransactionManager<String, String> transactionManager(ProducerFactory<String, String> transactionalProducerFactory) {
        return new KafkaTransactionManager<>(transactionalProducerFactory);
    }

    @Bean("transactionalTemplate")
    public KafkaTemplate<String, String> transactionalTemplate() {
        return new KafkaTemplate<>(transactionalProducerFactory());
    }
}
Controller
@Controller
public class KafkaTestController {

    private static Logger logger = LoggerFactory.getLogger(KafkaTestController.class);

    // Plain (non-transactional) template used by the fire-and-forget, callback and
    // synchronous examples below
    @Autowired
    @Qualifier("producertemplate")
    private KafkaTemplate<String, String> producertemplate;

    // Transactional template used by the transactional example below
    @Autowired
    @Qualifier("transactionalTemplate")
    private KafkaTemplate<String, String> transactionalTemplate;

    private Gson gson = new GsonBuilder().create();

    // The request-handling methods shown in the following sections belong inside this class.
}
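The Message payload used by the controller methods below is not shown in the original code. A minimal sketch of what it is assumed to look like: a plain POJO with id, msg and sendTime fields, matching the setters called in the examples (field types are assumptions):
import java.util.Date;

// Hypothetical payload class; the field types are inferred from how the
// setters are used in the controller methods.
public class Message {
    private Integer id;      // business id of the message
    private String msg;      // message body
    private Date sendTime;   // time the message was produced

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }

    public String getMsg() { return msg; }
    public void setMsg(String msg) { this.msg = msg; }

    public Date getSendTime() { return sendTime; }
    public void setSendTime(Date sendTime) { this.sendTime = sendTime; }
}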
Asynchronous send without a callback (fire-and-forget)
Fire-and-forget: the producer only sends the message and ignores the result. Mostly used for unimportant business data.
@RequestMapping("/testSendMsg1")
@ResponseBody
public String testSendMsg1(){ //发送既忘方式发送消息,在发送消息后并不关心消息是否正确到达。这种方法方式的性能最高,可靠性最差。
Message message = new Message();
message.setId(1);
message.setMsg("testSendMsg1");
message.setSendTime(new Date());
logger.info("发送消息(发送既忘) ----->>>>> message = {}", gson.toJson(message));
producertemplate.send("hello", gson.toJson(message));
return "testSendMsg1";
}
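KafkaTemplate also has send overloads that take a record key (and optionally a partition). A small sketch, using the message id as the key so that records with the same id always land in the same partition and keep their relative order; the key choice here is only an illustration:
// Fire-and-forget with an explicit key: records sharing a key are routed to the
// same partition, which preserves ordering per key.
producertemplate.send("hello", String.valueOf(message.getId()), gson.toJson(message));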
Asynchronous send with a callback
The callback reports the send result so that it can be handled afterwards.
@RequestMapping("/testSendMsg3")
@ResponseBody
public String testSendMsg3(){ //异步发送,异步发送一般是在sned()方法里指定一个CallBack的回调函数,Kafka在返回响应时调用该函数来实现异步的发送确认。
Message message = new Message();
message.setId(1);
message.setMsg("testSendMsg1");
message.setSendTime(new Date());
logger.info("发送消息(异步发送) ----->>>>> message = {}", gson.toJson(message));
producertemplate.send("hello", gson.toJson(message)).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
logger.error("发送发生错误:" + throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, String> stringStringSendResult) {
logger.info("消息发送成功");
//生产者发送的元数据信息
ProducerRecord<String, String> producerRecord = stringStringSendResult.getProducerRecord();
logger.info("生产者发送元信息 :" );
String topic = producerRecord.topic();
logger.info("主题 :" + topic );
String value = producerRecord.value();
logger.info("信息 :" + value );
Integer partition = producerRecord.partition();
logger.info("分区 :" + partition );
String key = producerRecord.key();
logger.info("键 :" + key );
Long timestamp = producerRecord.timestamp();
logger.info("时间戳 :" + timestamp );
//返回结果元信息
RecordMetadata recordMetadata = stringStringSendResult.getRecordMetadata();
logger.info("生产者返回结果元信息 :" );
String topic1 = recordMetadata.topic();
logger.info("主题 :" + topic1 );
int partition1 = recordMetadata.partition();
logger.info("分区 :" + partition1 );
long offset = recordMetadata.offset();
logger.info("位移 :" + offset );
boolean b = recordMetadata.hasOffset();
logger.info("是否返回位移信息 :" + b );
long timestamp1 = recordMetadata.timestamp();
logger.info("时间戳 :" + timestamp1 );
boolean b1 = recordMetadata.hasTimestamp();
logger.info("是否返回时间戳 :" + b1 );
int i = recordMetadata.serializedKeySize();
logger.info("序列化后键大小 :" + i );
int i1 = recordMetadata.serializedValueSize();
logger.info("序列化后值大小 :" + i1 );
}
});
return "testSendMsg3";
}
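Note that the addCallback/ListenableFutureCallback API above belongs to spring-kafka 2.x. On spring-kafka 3.x, send() returns a CompletableFuture instead, so the callback would be attached roughly like this (a sketch, not taken from the original post):
// spring-kafka 3.x style: attach the callback to the returned CompletableFuture
producertemplate.send("hello", gson.toJson(message)).whenComplete((result, ex) -> {
    if (ex != null) {
        logger.error("send failed: " + ex.getMessage());
    } else {
        logger.info("sent to partition " + result.getRecordMetadata().partition()
                + " at offset " + result.getRecordMetadata().offset());
    }
});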
Synchronous send
Blocking: the message is sent synchronously by waiting on the returned Future object.
@RequestMapping("/testSendMsg2")
@ResponseBody
public String testSendMsg2(){ //同步发送方式发送消息,利用返回的Fature对象实现。。
Message message = new Message();
message.setId(1);
message.setMsg("testSendMsg2");
message.setSendTime(new Date());
logger.info("发送消息(同步发送) ----->>>>> message = {}", gson.toJson(message));
SendResult<String, String> sendResult = null;
try {
sendResult = producertemplate.send("hello", gson.toJson(message)).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
//生产者发送的元数据信息
ProducerRecord<String, String> producerRecord = sendResult.getProducerRecord();
logger.info("生产者发送元信息 :" );
String topic = producerRecord.topic();
logger.info("主题 :" + topic );
String value = producerRecord.value();
logger.info("信息 :" + value );
Integer partition = producerRecord.partition();
logger.info("分区 :" + partition );
String key = producerRecord.key();
logger.info("键 :" + key );
Long timestamp = producerRecord.timestamp();
logger.info("时间戳 :" + timestamp );
//返回结果元信息
RecordMetadata recordMetadata = sendResult.getRecordMetadata();
logger.info("生产者返回结果元信息 :" );
String topic1 = recordMetadata.topic();
logger.info("主题 :" + topic1 );
int partition1 = recordMetadata.partition();
logger.info("分区 :" + partition1 );
long offset = recordMetadata.offset();
logger.info("位移 :" + offset );
boolean b = recordMetadata.hasOffset();
logger.info("是否返回位移信息 :" + b );
long timestamp1 = recordMetadata.timestamp();
logger.info("时间戳 :" + timestamp1 );
boolean b1 = recordMetadata.hasTimestamp();
logger.info("是否返回时间戳 :" + b1 );
int i = recordMetadata.serializedKeySize();
logger.info("序列化后键大小 :" + i );
int i1 = recordMetadata.serializedValueSize();
logger.info("序列化后值大小 :" + i1 );
return "testSendMsg2";
}
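Because get() blocks until the broker responds, a production endpoint would normally bound the wait with the timed variant of Future.get(). A sketch, with a 10-second timeout chosen purely for illustration (requires java.util.concurrent.TimeUnit and TimeoutException imports):
// Synchronous send with an upper bound on the wait; a TimeoutException is thrown
// if no result arrives within 10 seconds.
try {
    SendResult<String, String> result =
            producertemplate.send("hello", gson.toJson(message)).get(10, TimeUnit.SECONDS);
    logger.info("offset: " + result.getRecordMetadata().offset());
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    logger.error("synchronous send failed: " + e.getMessage());
}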
Transactional send
Mostly used when several messages must succeed or fail together; if an exception occurs, none of them are committed.
@RequestMapping("/testSendMsg4")
@ResponseBody
@Transactional
public String testSendMsg5(){ //事务发送
Message message = new Message();
message.setId(1);
message.setMsg("testSendMsg4");
message.setSendTime(new Date());
logger.info("发送消息(事务发送) ----->>>>> message = {}", gson.toJson(message));
transactionalTemplate.send("hello", gson.toJson(message));
int i = 1/0;
return "testSendMsg4";
}
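Besides the declarative @Transactional approach shown above, KafkaTemplate also supports programmatic transactions via executeInTransaction(); every send performed inside the callback is committed or aborted as one unit. A rough sketch:
// Programmatic Kafka transaction: all sends inside the callback share one
// transaction; an exception thrown here aborts it and nothing is committed.
transactionalTemplate.executeInTransaction(operations -> {
    operations.send("hello", gson.toJson(message));
    // further sends here would be part of the same transaction
    return true;
});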