06.Apache Pulsar的JAVA API相关使用操作,基于Pulsar实现Topic的构建操作,使用JAVA如何管理租户/namespace/Topic,基于Pulsar实现数据生产/消费

Posted 涂作权的博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了06.Apache Pulsar的JAVA API相关使用操作,基于Pulsar实现Topic的构建操作,使用JAVA如何管理租户/namespace/Topic,基于Pulsar实现数据生产/消费相关的知识,希望对你有一定的参考价值。

1.6.Apache Pulsar的JAVA API相关使用操作
1.6.1.基于Pulsar实现Topic的构建操作_准备工作
1.6.2.基于Pulsar实现Topic的构建操作
1-使用JAVA如何管理租户
2-使用Java如何管理namespace
3-使用JAVA如何管理Topic

1.6.3.基于Pulsar实现数据生产
1.6.4.基于Pulsar实现数据消费

1.6.Apache Pulsar的JAVA API相关使用操作

1.6.1.基于Pulsar实现Topic的构建操作_准备工作

首先,需要我们创建一个maven项目,并加入Pulsar相关的依赖

<!--代码库-->
<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
        </snapshots>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client-all</artifactId>
        <version>2.8.1</version>
    </dependency>

</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <target>1.8</target>
                <source>1.8</source>
            </configuration>
        </plugin>
    </plugins>
</build>

1.6.2.基于Pulsar实现Topic的构建操作

  • 1-使用JAVA如何管理租户
package com.toto.learn.admin;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.TenantInfo;

import java.util.HashSet;
import java.util.List;

/**
 * @author tuzuoquan
 * @version 1.0
 * @ClassName CreateTenants
 * @description TODO
 * @date 2022/8/22 23:28
 **/
public class CreateTenants 

    public static void main(String[] args) throws PulsarClientException, PulsarAdminException 
        //1.创建Pulsar的Admin管理对象
        String serviceHttpUrl = "http://node1:8080,node2:8080,node3:8080";
        PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();

        //2.基于Pulsar的Admin对象进行相关的操作

        //2.1:如何创建 租户操作
        HashSet<String> allowedClusters = new HashSet<>();
        allowedClusters.add("pulsar-cluster");
        TenantInfo config = TenantInfo.builder().allowedClusters(allowedClusters).build();
        pulsarAdmin.tenants().createTenant("toto_pulsar_t",config);

        //2.2:查看当前有哪些租户
        List<String> tenants = pulsarAdmin.tenants().getTenants();
        for(String tenant : tenants) 
            System.out.println("租户信息为:" + tenant);
        

        //2.3:删除租户操作
        pulsarAdmin.tenants().deleteTenant("toto_pulsar_t");

        System.out.println("=============================");
        List<String> tenantsTwo = pulsarAdmin.tenants().getTenants();
        for(String tenant : tenantsTwo) 
            System.out.println("租户信息为:" + tenant);
        

        pulsarAdmin.close();
    


输出结果:

"C:\\Program Files\\Java\\jdk1.8.0_111\\bin\\java.exe" -javaagent:D:\\installed\\ideaIU-2018.1.5.win-scala\\lib\\idea_rt.jar=11735:D:\\installed\\ideaIU-2018.1.5.win-scala\\bin -Dfile.encoding=UTF-8 -classpath "C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\charsets.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\deploy.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\access-bridge-64.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\cldrdata.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\dnsns.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\jaccess.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\jfxrt.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\localedata.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\nashorn.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\sunec.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\sunjce_provider.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\sunmscapi.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\sunpkcs11.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\zipfs.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\javaws.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\jce.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\jfr.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\jfxswt.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\jsse.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\management-agent.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\plugin.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\resources.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\rt.jar;E:\\workspace\\pulsarlearn\\pulsar-base\\target\\classes;E:\\maven_repo\\org\\apache\\pulsar\\pulsar-client-all\\2.8.1\\pulsar-client-all-2.8.1.jar;E:\\maven_repo\\org\\apache\\pulsar\\pulsar-client-api\\2.8.1\\pulsar-client-api-2.8.1.jar;E:\\maven_repo\\org\\apache\\pulsar\\bouncy-castle-bc\\2.8.1\\bouncy-castle-bc-2.8.1-pkg.jar;E:\\maven_repo\\org\\bouncycastle\\bcpkix-jdk15on\\1.69\\bcpkix-jdk15on-1.69.jar;E:\\maven_repo\\org\\bouncycastle\\bcprov-jdk15on\\1.69\\bcprov-jdk15on-1.69.jar;E:\\maven_repo\\org\\bouncycastle\\bcutil-jdk15on\\1.69\\bcutil-jdk15on-1.69.jar;E:\\maven_repo\\org\\bouncycastle\\bcprov-ext-jdk15on\\1.69\\bcprov-ext-jdk15on-1.69.jar;E:\\maven_repo\\org\\slf4j\\slf4j-api\\1.7.25\\slf4j-api-1.7.25.jar;E:\\maven_repo\\javax\\validation\\validation-api\\1.1.0.Final\\validation-api-1.1.0.Final.jar;E:\\maven_repo\\net\\jcip\\jcip-annotations\\1.0\\jcip-annotations-1.0.jar;E:\\maven_repo\\org\\apache\\pulsar\\pulsar-client-admin-api\\2.8.1\\pulsar-client-admin-api-2.8.1.jar;E:\\maven_repo\\jakarta\\ws\\rs\\jakarta.ws.rs-api\\2.1.6\\jakarta.ws.rs-api-2.1.6.jar;E:\\maven_repo\\jakarta\\xml\\bind\\jakarta.xml.bind-api\\2.3.3\\jakarta.xml.bind-api-2.3.3.jar;E:\\maven_repo\\jakarta\\activation\\jakarta.activation-api\\1.2.2\\jakarta.activation-api-1.2.2.jar;E:\\maven_repo\\javax\\xml\\bind\\jaxb-api\\2.3.1\\jaxb-api-2.3.1.jar;E:\\maven_repo\\com\\sun\\activation\\javax.activation\\1.2.0\\javax.activation-1.2.0.jar;E:\\maven_repo\\org\\slf4j\\jul-to-slf4j\\1.7.25\\jul-to-slf4j-1.7.25.jar;E:\\maven_repo\\org\\apache\\pulsar\\pulsar-package-core\\2.8.1\\pulsar-package-core-2.8.1.jar;E:\\maven_repo\\com\\google\\guava\\guava\\30.1-jre\\guava-30.1-jre.jar;E:\\maven_repo\\com\\google\\guava\\failureaccess\\1.0.1\\failureaccess-1.0.1.jar;E:\\maven_repo\\com\\google\\guava\\listenablefuture\\9999.0-empty-to-avoid-conflict-with-guava\\listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar;E:\\maven_repo\\com\\google\\code\\findbugs\\jsr305\\3.0.2\\jsr305-3.0.2.jar;E:\\maven_repo\\org\\checkerframework\\checker-qual\\3.5.0\\checker-qual-3.5.0.jar;E:\\maven_repo\\com\\google\\errorprone\\error_prone_annotations\\2.3.4\\error_prone_annotations-2.3.4.jar;E:\\maven_repo\\com\\google\\j2objc\\j2objc-annotations\\1.3\\j2objc-annotations-1.3.jar;E:\\maven_repo\\com\\google\\code\\gson\\gson\\2.8.6\\gson-2.8.6.jar;E:\\maven_repo\\org\\apache\\commons\\commons-lang3\\3.11\\commons-lang3-3.11.jar" com.toto.learn.admin.CreateTenants
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
租户信息为:my-tenant
租户信息为:my-tenant2
租户信息为:public
租户信息为:pulsar
租户信息为:test-tenant
租户信息为:toto_pulsar_t
=============================
租户信息为:my-tenant
租户信息为:my-tenant2
租户信息为:public
租户信息为:pulsar
租户信息为:test-tenant
  • 2-使用Java如何管理namespace
package com.toto.learn.admin;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;

import java.util.List;

/**
 * @author tuzuoquan
 * @version 1.0
 * @ClassName CreateNamespace
 * @description TODO
 * @date 2022/8/22 23:29
 **/
public class CreateNamespace 

    public static void main(String[] args) throws PulsarClientException, PulsarAdminException 
        //1.创建Pulsar的Admin管理对象
        String serviceHttpUrl = "http://node1:8080,node2:8080,node3:8080";
        PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();
        //2.执行相关的操作

        //2.1.如何创建名称空间
        pulsarAdmin.namespaces().createNamespace("toto_pulsar_t/toto_pulsar_n");

        //2.2.获取在某个租户下,一共有哪些名称空间
        List<String> namespaces = pulsarAdmin.namespaces().getNamespaces("toto_pulsar_t");

        for(String namespace : namespaces) 
            System.out.println(namespace);
        

        System.out.println("==========================");
        //2.3:删除名称空间
        /*pulsarAdmin.namespaces().deleteNamespace("toto_pulsar_t/toto_pulsar_n");

        for(String namespace : namespaces) 
            System.out.println(namespace);
        */

        pulsarAdmin.close();

    


"C:\\Program Files\\Java\\jdk1.8.0_111\\bin\\java.exe" -javaagent:D:\\installed\\ideaIU-2018.1.5.win-scala\\lib\\idea_rt.jar=12252:D:\\installed\\ideaIU-2018.1.5.win-scala\\bin -Dfile.encoding=UTF-8 -classpath "C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\charsets.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\deploy.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\access-bridge-64.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\cldrdata.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\dnsns.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\jaccess.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\jfxrt.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\localedata.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\nashorn.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\sunec.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\sunjce_provider.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\sunmscapi.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\sunpkcs11.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\zipfs.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\javaws.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\jce.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\jfr.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\jfxswt.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\jsse.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\management-agent.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\plugin.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\resources.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\rt.jar;E:\\workspace\\pulsarlearn\\pulsar-base\\target\\classes;E:\\maven_repo\\org\\apache\\pulsar\\pulsar-client-all\\2.8.1\\pulsar-client-all-2.8.1.jar;E:\\maven_repo\\org\\apache\\pulsar\\pulsar-client-api\\2.8.1\\pulsar-client-api-2.8.1.jar;E:\\maven_repo\\org\\apache\\pulsar\\bouncy-castle-bc\\2.8.1\\bouncy-castle-bc-2.8.1-pkg.jar;E:\\maven_repo\\org\\bouncycastle\\bcpkix-jdk15on\\1.69\\bcpkix-jdk15on-1.69.jar;E:\\maven_repo\\org\\bouncycastle\\bcprov-jdk15on\\1.69\\bcprov-jdk15on-1.69.jar;E:\\maven_repo\\org\\bouncycastle\\bcutil-jdk15on\\1.69\\bcutil-jdk15on-1.69.jar;E:\\maven_repo\\org\\bouncycastle\\bcprov-ext-jdk15on\\1.69\\bcprov-ext-jdk15on-1.69.jar;E:\\maven_repo\\org\\slf4j\\slf4j-api\\1.7.25\\slf4j-api-1.7.25.jar;E:\\maven_repo\\javax\\validation\\validation-api\\1.1.0.Final\\validation-api-1.1.0.Final.jar;E:\\maven_repo\\net\\jcip\\jcip-annotations\\1.0\\jcip-annotations-1.0.jar;E:\\maven_repo\\org\\apache\\pulsar\\pulsar-client-admin-api\\2.8.1\\pulsar-client-admin-api-2.8.1.jar;E:\\maven_repo\\jakarta\\ws\\rs\\jakarta.ws.rs-api\\2.1.6\\jakarta.ws.rs-api-2.1.6.jar;E:\\maven_repo\\jakarta\\xml\\bind\\jakarta.xml.bind-api\\2.3.3\\jakarta.xml.bind-api-2.3.3.jar;E:\\maven_repo\\jakarta\\activation\\jakarta.activation-api\\1.2.2\\jakarta.activation-api-1.2.2.jar;E:\\maven_repo\\javax\\xml\\bind\\jaxb-api\\2.3.1\\jaxb-api-2.3.1.jar;E:\\maven_repo\\com\\sun\\activation\\javax.activation\\1.2.0\\javax.activation-1.2.0.jar;E:\\maven_repo\\org\\slf4j\\jul-to-slf4j\\1.7.25\\jul-to-slf4j-1.7.25.jar;E:\\maven_repo\\org\\apache\\pulsar\\pulsar-package-core\\2.8.1\\pulsar-package-core-2.8.1.jar;E:\\maven_repo\\com\\google\\guava\\guava\\30.1-jre\\guava-30.1-jre.jar;E:\\maven_repo\\com\\google\\guava\\failureaccess\\1.0.1\\failureaccess-1.0.1.jar;E:\\maven_repo\\com\\google\\guava\\listenablefuture\\9999.0-empty-to-avoid-conflict-with-guava\\listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar;E:\\maven_repo\\com\\google\\code\\findbugs\\jsr305\\3.0.2\\jsr305-3.0.2.jar;E:\\maven_repo\\org\\checkerframework\\checker-qual\\3.5.0\\checker-qual-3.5.0.jar;E:\\maven_repo\\com\\google\\errorprone\\error_prone_annotations\\2.3.4\\error_prone_annotations-2.3.4.jar;E:\\maven_repo\\com\\google\\j2objc\\j2objc-annotations\\1.3\\j2objc-annotations-1.3.jar;E:\\maven_repo\\com\\google\\code\\gson\\gson\\2.8.6\\gson-2.8.6.jar;E:\\maven_repo\\org\\apache\\commons\\commons-lang3\\3.11\\commons-lang3-3.11.jar" com.toto.learn.admin.CreateNamespace
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
toto_pulsar_t/toto_pulsar_n
==========================
  • 3-使用JAVA如何管理Topic
package com.toto.learn.admin;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;

import java.util.List;

/**
 * @author tuzuoquan
 * @version 1.0
 * @ClassName CreateTopic
 * @description TODO
 * @date 2022/8/22 23:29
 **/
public class CreateTopic 

    public static void main(String[] args) throws PulsarClientException, PulsarAdminException 
        //1.创建Pulsar的Admin管理对象
        String serviceHttpUrl = "http://node1:8080,node2:8080,node3:8080";
        PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();

        //2.执行相关的操作
        //2.1:创建 Topic相关的操作: 有分区和没有分区, 以及持久化和非持久化(以下创建topic的过程需要单独一个个创建)
//        pulsarAdmin.topics().createNonPartitionedTopic("persistent://toto_pulsar_t/toto_pulsar_n/t_topic1");
//        pulsarAdmin.topics().createNonPartitionedTopic("non-persistent://toto_pulsar_t/toto_pulsar_n/t_topic2");
//
        pulsarAdmin.topics().createPartitionedTopic("persistent://toto_pulsar_t/toto_pulsar_n/t_topic3",5);
//        pulsarAdmin.topics().createPartitionedTopic("non-persistent://toto_pulsar_t/toto_pulsar_n/t_topic4",5);

        //2.2:查看当前有哪些topic:
        List<String> topicList = pulsarAdmin.topics().getList("toto_pulsar_t/toto_pulsar_n");
        for(String topic : topicList) 
            System.out.println(topic);
        

        //2.3 修改Topic分片的数量
        pulsarAdmin.topics().updatePartitionedTopic("persistent://toto_pulsar_t/toto_pulsar_n/t_topic3", 7);

        //2.4 一共有多少个分片呢
        int partitons = pulsarAdmin.topics().getPartitionedTopicMetadata("persistent://toto_pulsar_t/toto_pulsar_n/t_topic3").partitions;
        System.out.println(partitons);

        //2.5: 删除Topic
        //pulsarAdmin.topics().deletePartitionedTopic("persistent://toto_pulsar_t/toto_pulsar_n/t_topic3");

        //关闭admin对象
        pulsarAdmin.close();
    


1.6.3.基于Pulsar实现数据生产

  • 1-使用JAVA如何生产数据_同步方式
package com.toto.learn.producer;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

/**
 * @author tuzuoquan
 * @version 1.0
 * @ClassName PulsarProducerSyncTest
 * @description 演示Pulsar 生产者 同步发送
 * @date 2022/8/23 0:44
 **/
public class PulsarProducerSyncTest 

    public static void main(String[] args) throws PulsarClientException 
        //1.创建Pulsar的客户端对象
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl("pulsar://node1:6650,node2:6650,node3:6650").build();

        //2.通过客户端构建生产者的对象
        //可以通过命令创建:./pulsar-admin topics create persistent://toto_pulsar_t/toto_pulsar_n/t_topic1
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic("persistent://toto_pulsar_t/toto_pulsar_n/t_topic1")
                .create();

        //3.使用生产者发送数据
        producer.send("hello java API pulsar ...");

        System.out.println("数据生产完成...");

        //4.释放资源
        producer.close();
        pulsarClient.close();
    


  • 2-使用java如何生产数据_异步方式
package com.toto.learn.producer;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

/**
 * @author tuzuoquan
 * @version 1.0
 * @ClassName PulsarProducerAsyncTest
 * @description 演示Pulsar 生产者 异步发送
 * @date 2022/8/23 0:34
 **/
public class PulsarProducerAsyncTest 

    public static void main(String[] args) throws PulsarClientException, InterruptedException 
        //1.创建Pulsar

以上是关于06.Apache Pulsar的JAVA API相关使用操作,基于Pulsar实现Topic的构建操作,使用JAVA如何管理租户/namespace/Topic,基于Pulsar实现数据生产/消费的主要内容,如果未能解决你的问题,请参考以下文章

Pulsar Reader 例子

使用管理 API 将 Protobuf 消息发布到 Pulsar 模式注册表时出现 500 错误

有没有办法在 Pulsar 中确认特定消息?

Pulsar-Producer实现简介

Kafka与Pulsar的区别在哪?为什么会成为下一代的消息中间件之王?

Kafka与Pulsar的区别在哪?为什么会成为下一代的消息中间件之王?