Spring Boot整合Pulsar生产和消费消息 简单示例代码

Posted 抓手

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Boot整合Pulsar生产和消费消息 简单示例代码相关的知识,希望对你有一定的参考价值。

引入pulsar maven依赖

        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client</artifactId>
            <version>2.10.0</version>
        </dependency>

生产者

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * @author 向振华
 * @date 2022/10/11 10:05
 */
@Slf4j
@Component
public class PulsarProducer 

    // Pulsar服务broker地址
    private String url = "pulsar://192.168.18.3:6650";

    // topic
    private String topic = "xzhtest";

    // 生产者
    Producer<byte[]> producer = null;

    @PostConstruct
    public void initPulsarProducer() throws PulsarClientException 
        // 构造Pulsar client
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(url)
                .build();

        // 创建生产者producer
        producer = client.newProducer()
                .topic(topic)
                .create();
    

    /**
     * 发送消息
     *
     * @param message
     */
    public void send(String message) 
        try 
            MessageId messageId = producer.send(message.getBytes());
            log.info("发送消息 messageId " + messageId);
         catch (Exception e) 
            log.info("发送消息异常 ", e);
        
    

消费者

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.*;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * @author 向振华
 * @date 2022/10/11 10:14
 */
@Slf4j
@Component
public class PulsarConsumer 

    // Pulsar服务broker地址
    private String url = "pulsar://192.168.18.3:6650";

    // topic
    private String topic = "xzhtest";

    // 订阅名
    private String subscription = "xzhtestgroup";

    // 消费者
    private Consumer<?> consumer = null;

    @PostConstruct
    public void initPulsarConsumer() throws PulsarClientException 
        // 构造Pulsar client
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(url)
                .build();

        // 创建消费者consumer
        consumer = client.newConsumer()
                .topic(topic)
                .subscriptionName(subscription)
                // 声明消费模式为Exclusive(独占)模式,只允许一个Consumer加入Subscription
                .subscriptionType(SubscriptionType.Exclusive)
                // 配置从最早开始消费,否则可能会消费不到历史消息
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        // 开始监听消息
        new Thread(this::listener).start();
    

    /**
     * 监听消息
     */
    private void listener() 
        try 
            while (true) 
                Message<?> message = consumer.receive();
                byte[] bytes = message.getData();
                String messageString = new String(bytes);

                log.info("接收到消息 " + messageString);

                // 消费成功
                consumer.acknowledge(message);
            
         catch (PulsarClientException e) 
            log.error("Pulsar消费异常:", e);
        
    

发送消息

    @Resource
    private PulsarProducer pulsarProducer;

    public void test() 
        pulsarProducer.send("测试消息");
    

结果

2022-10-11 15:00:27.883  INFO 13316 --- [nio-8596-exec-9] c.b.bop.oms.controller.PulsarProducer    : 发送消息 messageId 2156:0:0:0
2022-10-11 15:00:27.884  INFO 13316 --- [      Thread-20] c.b.bop.oms.controller.PulsarConsumer    : 接收到消息 测试消息

注意:

生产者和消费者的创建都是采用的基本参数和默认值,更多参数的设置参考:

org.apache.pulsar.client.api.ProducerBuilder

org.apache.pulsar.client.api.ConsumerBuilder

以上是关于Spring Boot整合Pulsar生产和消费消息 简单示例代码的主要内容,如果未能解决你的问题,请参考以下文章

spring-boot-route(十三)整合RabbitMQ

Spring Boot (十三): Spring Boot 整合 RabbitMQ

Spring Boot之RabbitMQ

详解Spring Boot中的RabbitMQ

Spring Boot 整合 RabbitMQ

Spring Boot整合RabbitMQ