Spring Boot整合Pulsar生产和消费消息 简单示例代码
Posted 抓手
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Boot整合Pulsar生产和消费消息 简单示例代码相关的知识,希望对你有一定的参考价值。
引入pulsar maven依赖
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.10.0</version>
</dependency>
生产者
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @author 向振华
* @date 2022/10/11 10:05
*/
@Slf4j
@Component
public class PulsarProducer
// Pulsar服务broker地址
private String url = "pulsar://192.168.18.3:6650";
// topic
private String topic = "xzhtest";
// 生产者
Producer<byte[]> producer = null;
@PostConstruct
public void initPulsarProducer() throws PulsarClientException
// 构造Pulsar client
PulsarClient client = PulsarClient.builder()
.serviceUrl(url)
.build();
// 创建生产者producer
producer = client.newProducer()
.topic(topic)
.create();
/**
* 发送消息
*
* @param message
*/
public void send(String message)
try
MessageId messageId = producer.send(message.getBytes());
log.info("发送消息 messageId " + messageId);
catch (Exception e)
log.info("发送消息异常 ", e);
消费者
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.*;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @author 向振华
* @date 2022/10/11 10:14
*/
@Slf4j
@Component
public class PulsarConsumer
// Pulsar服务broker地址
private String url = "pulsar://192.168.18.3:6650";
// topic
private String topic = "xzhtest";
// 订阅名
private String subscription = "xzhtestgroup";
// 消费者
private Consumer<?> consumer = null;
@PostConstruct
public void initPulsarConsumer() throws PulsarClientException
// 构造Pulsar client
PulsarClient client = PulsarClient.builder()
.serviceUrl(url)
.build();
// 创建消费者consumer
consumer = client.newConsumer()
.topic(topic)
.subscriptionName(subscription)
// 声明消费模式为Exclusive(独占)模式,只允许一个Consumer加入Subscription
.subscriptionType(SubscriptionType.Exclusive)
// 配置从最早开始消费,否则可能会消费不到历史消息
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
// 开始监听消息
new Thread(this::listener).start();
/**
* 监听消息
*/
private void listener()
try
while (true)
Message<?> message = consumer.receive();
byte[] bytes = message.getData();
String messageString = new String(bytes);
log.info("接收到消息 " + messageString);
// 消费成功
consumer.acknowledge(message);
catch (PulsarClientException e)
log.error("Pulsar消费异常:", e);
发送消息
@Resource
private PulsarProducer pulsarProducer;
public void test()
pulsarProducer.send("测试消息");
结果
2022-10-11 15:00:27.883 INFO 13316 --- [nio-8596-exec-9] c.b.bop.oms.controller.PulsarProducer : 发送消息 messageId 2156:0:0:0
2022-10-11 15:00:27.884 INFO 13316 --- [ Thread-20] c.b.bop.oms.controller.PulsarConsumer : 接收到消息 测试消息
注意:
生产者和消费者的创建都是采用的基本参数和默认值,更多参数的设置参考:
org.apache.pulsar.client.api.ProducerBuilder
org.apache.pulsar.client.api.ConsumerBuilder
以上是关于Spring Boot整合Pulsar生产和消费消息 简单示例代码的主要内容,如果未能解决你的问题,请参考以下文章
spring-boot-route(十三)整合RabbitMQ