/**
 * Builds a new Kafka producer from the supplied configuration.
 *
 * <p>This is intended to be called once per application. In a Spring
 * project the result may be exposed with {@code @Bean}; in a Java EE
 * application it may be exposed with {@code @Produces} together with
 * {@code @ApplicationScoped}.
 *
 * @param kafkaConfiguration configuration whose {@code asMap()} output is
 *                           handed to the producer constructor
 * @return a newly constructed {@code KafkaProducer}
 */
public KafkaProducer getProducer(KafkaConfiguration kafkaConfiguration) {
    final Map<String, ? extends Serializable> producerSettings = kafkaConfiguration.asMap();
    return new KafkaProducer(producerSettings);
}
/**
 * Kafka producer configuration holder.
 *
 * <p>The fields are bound from external configuration under the
 * {@code custom.kafka} prefix (for example from an
 * {@code application.properties} file), but they may be populated in
 * other ways as well. Getters and setters are provided because JavaBean
 * style property binding writes values through setters; without them the
 * fields would keep their defaults ({@code null} / {@code 0}).
 */
@Configuration
@ConfigurationProperties("custom.kafka")
public static class KafkaConfiguration {
    private String bootstrapServers;
    private String keySerializerClass;
    private String valueSerializerClass;
    private int retries;

    public String getBootstrapServers() {
        return bootstrapServers;
    }

    public void setBootstrapServers(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    public String getKeySerializerClass() {
        return keySerializerClass;
    }

    public void setKeySerializerClass(String keySerializerClass) {
        this.keySerializerClass = keySerializerClass;
    }

    public String getValueSerializerClass() {
        return valueSerializerClass;
    }

    public void setValueSerializerClass(String valueSerializerClass) {
        this.valueSerializerClass = valueSerializerClass;
    }

    public int getRetries() {
        return retries;
    }

    public void setRetries(int retries) {
        this.retries = retries;
    }

    /**
     * Exposes the configured values as a {@link java.util.Map} keyed by the
     * corresponding {@code ProducerConfig} constants.
     *
     * @return a Java map holding the bootstrap servers, key serializer
     *         class, value serializer class and retries settings
     */
    Map<String, ? extends Serializable> asMap() {
        return HashMap.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass,
                ProducerConfig.RETRIES_CONFIG, retries
        ).toJavaMap();
    }
}
/**
 * Shows how to use the Kafka producer. In this example the text
 * message "text message" is sent to the topic "testTopic". The
 * producer sends the message in a fire-and-forget manner: the value
 * returned by {@code send} is not inspected, so "Message sent" only
 * means that the {@code send} call itself did not throw.
 */
@Autowired
private KafkaProducer kafkaProducer;
...
Try.of(() -> kafkaProducer.send(new ProducerRecord("testTopic", "text message")))
    .onSuccess(future -> log.info("Message sent"))
    // Report failures through the same logger used on success rather than stderr.
    .onFailure(error -> log.error("Failed to send message", error));