06.Apache Pulsar的JAVA API相关使用操作,基于Pulsar实现Topic的构建操作,使用JAVA如何管理租户/namespace/Topic,基于Pulsar实现数据生产/消费
Posted 涂作权的博客
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了06.Apache Pulsar的JAVA API相关使用操作,基于Pulsar实现Topic的构建操作,使用JAVA如何管理租户/namespace/Topic,基于Pulsar实现数据生产/消费相关的知识,希望对你有一定的参考价值。
1.6.Apache Pulsar的JAVA API相关使用操作
1.6.1.基于Pulsar实现Topic的构建操作_准备工作
1.6.2.基于Pulsar实现Topic的构建操作
1-使用JAVA如何管理租户
2-使用Java如何管理namespace
3-使用JAVA如何管理Topic
1.6.3.基于Pulsar实现数据生产
1.6.4.基于Pulsar实现数据消费
1.6.Apache Pulsar的JAVA API相关使用操作
1.6.1.基于Pulsar实现Topic的构建操作_准备工作
首先,需要我们创建一个maven项目,并加入Pulsar相关的依赖
<!--代码库-->
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
<updatePolicy>never</updatePolicy>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-all</artifactId>
<version>2.8.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<target>1.8</target>
<source>1.8</source>
</configuration>
</plugin>
</plugins>
</build>
1.6.2.基于Pulsar实现Topic的构建操作
- 1-使用JAVA如何管理租户
package com.toto.learn.admin;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.TenantInfo;
import java.util.HashSet;
import java.util.List;
/**
* @author tuzuoquan
* @version 1.0
* @ClassName CreateTenants
* @description TODO
* @date 2022/8/22 23:28
**/
public class CreateTenants
public static void main(String[] args) throws PulsarClientException, PulsarAdminException
//1.创建Pulsar的Admin管理对象
String serviceHttpUrl = "http://node1:8080,node2:8080,node3:8080";
PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();
//2.基于Pulsar的Admin对象进行相关的操作
//2.1:如何创建 租户操作
HashSet<String> allowedClusters = new HashSet<>();
allowedClusters.add("pulsar-cluster");
TenantInfo config = TenantInfo.builder().allowedClusters(allowedClusters).build();
pulsarAdmin.tenants().createTenant("toto_pulsar_t",config);
//2.2:查看当前有哪些租户
List<String> tenants = pulsarAdmin.tenants().getTenants();
for(String tenant : tenants)
System.out.println("租户信息为:" + tenant);
//2.3:删除租户操作
pulsarAdmin.tenants().deleteTenant("toto_pulsar_t");
System.out.println("=============================");
List<String> tenantsTwo = pulsarAdmin.tenants().getTenants();
for(String tenant : tenantsTwo)
System.out.println("租户信息为:" + tenant);
pulsarAdmin.close();
输出结果:
"C:\\Program Files\\Java\\jdk1.8.0_111\\bin\\java.exe" -javaagent:D:\\installed\\ideaIU-2018.1.5.win-scala\\lib\\idea_rt.jar=11735:D:\\installed\\ideaIU-2018.1.5.win-scala\\bin -Dfile.encoding=UTF-8 -classpath "C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\charsets.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\deploy.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\access-bridge-64.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\cldrdata.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\dnsns.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\jaccess.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\jfxrt.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\localedata.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\nashorn.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\sunec.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\sunjce_provider.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\sunmscapi.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\sunpkcs11.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\zipfs.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\javaws.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\jce.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\jfr.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\jfxswt.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\jsse.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\management-agent.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\plugin.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\resources.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\rt.jar;E:\\workspace\\pulsarlearn\\pulsar-base\\target\\classes;E:\\maven_repo\\org\\apache\\pulsar\\pulsar-client-all\\2.8.1\\pulsar-client-all-2.8.1.jar;E:\\maven_repo\\org\\apache\\pulsar\\pulsar-client-api\\2.8.1\\pulsar-client-api-2.8.1.jar;E:\\maven_repo\\org\\apache\\pulsar\\bouncy-castle-bc\\2.8.1\\bouncy-castle-bc-2.8.1-pkg.jar;E:\\maven_repo\\org\\bouncycastle\\bcpkix-jdk15on\\1.69\\bcpkix-jdk15on-1.69.jar;E:\\maven_repo\\org\\bouncycastle\\bcprov-jdk15on\\1.69\\bcprov-jdk15on-1.69.jar;E:\\maven_repo\\org\\bouncycastle\\bcutil-jdk15on\\1.69\\bcutil-jdk15on-1.69.jar;E:\\maven_repo\\org\\bouncycastle\\bcprov-ext-jdk15on\\1.69\\bcprov-ext-jdk15on-1.69.jar;E:\\maven_repo\\org\\slf4j\\slf4j-api\\1.7.25\\slf4j-api-1.7.25.jar;E:\\maven_repo\\javax\\validation\\validation-api\\1.1.0.Final\\validation-api-1.1.0.Final.jar;E:\\maven_repo\\net\\jcip\\jcip-annotations\\1.0\\jcip-annotations-1.0.jar;E:\\maven_repo\\org\\apache\\pulsar\\pulsar-client-admin-api\\2.8.1\\pulsar-client-admin-api-2.8.1.jar;E:\\maven_repo\\jakarta\\ws\\rs\\jakarta.ws.rs-api\\2.1.6\\jakarta.ws.rs-api-2.1.6.jar;E:\\maven_repo\\jakarta\\xml\\bind\\jakarta.xml.bind-api\\2.3.3\\jakarta.xml.bind-api-2.3.3.jar;E:\\maven_repo\\jakarta\\activation\\jakarta.activation-api\\1.2.2\\jakarta.activation-api-1.2.2.jar;E:\\maven_repo\\javax\\xml\\bind\\jaxb-api\\2.3.1\\jaxb-api-2.3.1.jar;E:\\maven_repo\\com\\sun\\activation\\javax.activation\\1.2.0\\javax.activation-1.2.0.jar;E:\\maven_repo\\org\\slf4j\\jul-to-slf4j\\1.7.25\\jul-to-slf4j-1.7.25.jar;E:\\maven_repo\\org\\apache\\pulsar\\pulsar-package-core\\2.8.1\\pulsar-package-core-2.8.1.jar;E:\\maven_repo\\com\\google\\guava\\guava\\30.1-jre\\guava-30.1-jre.jar;E:\\maven_repo\\com\\google\\guava\\failureaccess\\1.0.1\\failureaccess-1.0.1.jar;E:\\maven_repo\\com\\google\\guava\\listenablefuture\\9999.0-empty-to-avoid-conflict-with-guava\\listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar;E:\\maven_repo\\com\\google\\code\\findbugs\\jsr305\\3.0.2\\jsr305-3.0.2.jar;E:\\maven_repo\\org\\checkerframework\\checker-qual\\3.5.0\\checker-qual-3.5.0.jar;E:\\maven_repo\\com\\google\\errorprone\\error_prone_annotations\\2.3.4\\error_prone_annotations-2.3.4.jar;E:\\maven_repo\\com\\google\\j2objc\\j2objc-annotations\\1.3\\j2objc-annotations-1.3.jar;E:\\maven_repo\\com\\google\\code\\gson\\gson\\2.8.6\\gson-2.8.6.jar;E:\\maven_repo\\org\\apache\\commons\\commons-lang3\\3.11\\commons-lang3-3.11.jar" com.toto.learn.admin.CreateTenants
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
租户信息为:my-tenant
租户信息为:my-tenant2
租户信息为:public
租户信息为:pulsar
租户信息为:test-tenant
租户信息为:toto_pulsar_t
=============================
租户信息为:my-tenant
租户信息为:my-tenant2
租户信息为:public
租户信息为:pulsar
租户信息为:test-tenant
- 2-使用Java如何管理namespace
package com.toto.learn.admin;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import java.util.List;
/**
* @author tuzuoquan
* @version 1.0
* @ClassName CreateNamespace
* @description TODO
* @date 2022/8/22 23:29
**/
public class CreateNamespace
public static void main(String[] args) throws PulsarClientException, PulsarAdminException
//1.创建Pulsar的Admin管理对象
String serviceHttpUrl = "http://node1:8080,node2:8080,node3:8080";
PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();
//2.执行相关的操作
//2.1.如何创建名称空间
pulsarAdmin.namespaces().createNamespace("toto_pulsar_t/toto_pulsar_n");
//2.2.获取在某个租户下,一共有哪些名称空间
List<String> namespaces = pulsarAdmin.namespaces().getNamespaces("toto_pulsar_t");
for(String namespace : namespaces)
System.out.println(namespace);
System.out.println("==========================");
//2.3:删除名称空间
/*pulsarAdmin.namespaces().deleteNamespace("toto_pulsar_t/toto_pulsar_n");
for(String namespace : namespaces)
System.out.println(namespace);
*/
pulsarAdmin.close();
"C:\\Program Files\\Java\\jdk1.8.0_111\\bin\\java.exe" -javaagent:D:\\installed\\ideaIU-2018.1.5.win-scala\\lib\\idea_rt.jar=12252:D:\\installed\\ideaIU-2018.1.5.win-scala\\bin -Dfile.encoding=UTF-8 -classpath "C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\charsets.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\deploy.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\access-bridge-64.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\cldrdata.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\dnsns.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\jaccess.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\jfxrt.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\localedata.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\nashorn.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\sunec.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\sunjce_provider.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\sunmscapi.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\sunpkcs11.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\ext\\zipfs.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\javaws.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\jce.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\jfr.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\jfxswt.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\jsse.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\management-agent.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\plugin.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\resources.jar;C:\\Program Files\\Java\\jdk1.8.0_111\\jre\\lib\\rt.jar;E:\\workspace\\pulsarlearn\\pulsar-base\\target\\classes;E:\\maven_repo\\org\\apache\\pulsar\\pulsar-client-all\\2.8.1\\pulsar-client-all-2.8.1.jar;E:\\maven_repo\\org\\apache\\pulsar\\pulsar-client-api\\2.8.1\\pulsar-client-api-2.8.1.jar;E:\\maven_repo\\org\\apache\\pulsar\\bouncy-castle-bc\\2.8.1\\bouncy-castle-bc-2.8.1-pkg.jar;E:\\maven_repo\\org\\bouncycastle\\bcpkix-jdk15on\\1.69\\bcpkix-jdk15on-1.69.jar;E:\\maven_repo\\org\\bouncycastle\\bcprov-jdk15on\\1.69\\bcprov-jdk15on-1.69.jar;E:\\maven_repo\\org\\bouncycastle\\bcutil-jdk15on\\1.69\\bcutil-jdk15on-1.69.jar;E:\\maven_repo\\org\\bouncycastle\\bcprov-ext-jdk15on\\1.69\\bcprov-ext-jdk15on-1.69.jar;E:\\maven_repo\\org\\slf4j\\slf4j-api\\1.7.25\\slf4j-api-1.7.25.jar;E:\\maven_repo\\javax\\validation\\validation-api\\1.1.0.Final\\validation-api-1.1.0.Final.jar;E:\\maven_repo\\net\\jcip\\jcip-annotations\\1.0\\jcip-annotations-1.0.jar;E:\\maven_repo\\org\\apache\\pulsar\\pulsar-client-admin-api\\2.8.1\\pulsar-client-admin-api-2.8.1.jar;E:\\maven_repo\\jakarta\\ws\\rs\\jakarta.ws.rs-api\\2.1.6\\jakarta.ws.rs-api-2.1.6.jar;E:\\maven_repo\\jakarta\\xml\\bind\\jakarta.xml.bind-api\\2.3.3\\jakarta.xml.bind-api-2.3.3.jar;E:\\maven_repo\\jakarta\\activation\\jakarta.activation-api\\1.2.2\\jakarta.activation-api-1.2.2.jar;E:\\maven_repo\\javax\\xml\\bind\\jaxb-api\\2.3.1\\jaxb-api-2.3.1.jar;E:\\maven_repo\\com\\sun\\activation\\javax.activation\\1.2.0\\javax.activation-1.2.0.jar;E:\\maven_repo\\org\\slf4j\\jul-to-slf4j\\1.7.25\\jul-to-slf4j-1.7.25.jar;E:\\maven_repo\\org\\apache\\pulsar\\pulsar-package-core\\2.8.1\\pulsar-package-core-2.8.1.jar;E:\\maven_repo\\com\\google\\guava\\guava\\30.1-jre\\guava-30.1-jre.jar;E:\\maven_repo\\com\\google\\guava\\failureaccess\\1.0.1\\failureaccess-1.0.1.jar;E:\\maven_repo\\com\\google\\guava\\listenablefuture\\9999.0-empty-to-avoid-conflict-with-guava\\listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar;E:\\maven_repo\\com\\google\\code\\findbugs\\jsr305\\3.0.2\\jsr305-3.0.2.jar;E:\\maven_repo\\org\\checkerframework\\checker-qual\\3.5.0\\checker-qual-3.5.0.jar;E:\\maven_repo\\com\\google\\errorprone\\error_prone_annotations\\2.3.4\\error_prone_annotations-2.3.4.jar;E:\\maven_repo\\com\\google\\j2objc\\j2objc-annotations\\1.3\\j2objc-annotations-1.3.jar;E:\\maven_repo\\com\\google\\code\\gson\\gson\\2.8.6\\gson-2.8.6.jar;E:\\maven_repo\\org\\apache\\commons\\commons-lang3\\3.11\\commons-lang3-3.11.jar" com.toto.learn.admin.CreateNamespace
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
toto_pulsar_t/toto_pulsar_n
==========================
- 3-使用JAVA如何管理Topic
package com.toto.learn.admin;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import java.util.List;
/**
* @author tuzuoquan
* @version 1.0
* @ClassName CreateTopic
* @description TODO
* @date 2022/8/22 23:29
**/
public class CreateTopic
public static void main(String[] args) throws PulsarClientException, PulsarAdminException
//1.创建Pulsar的Admin管理对象
String serviceHttpUrl = "http://node1:8080,node2:8080,node3:8080";
PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();
//2.执行相关的操作
//2.1:创建 Topic相关的操作: 有分区和没有分区, 以及持久化和非持久化(以下创建topic的过程需要单独一个个创建)
// pulsarAdmin.topics().createNonPartitionedTopic("persistent://toto_pulsar_t/toto_pulsar_n/t_topic1");
// pulsarAdmin.topics().createNonPartitionedTopic("non-persistent://toto_pulsar_t/toto_pulsar_n/t_topic2");
//
pulsarAdmin.topics().createPartitionedTopic("persistent://toto_pulsar_t/toto_pulsar_n/t_topic3",5);
// pulsarAdmin.topics().createPartitionedTopic("non-persistent://toto_pulsar_t/toto_pulsar_n/t_topic4",5);
//2.2:查看当前有哪些topic:
List<String> topicList = pulsarAdmin.topics().getList("toto_pulsar_t/toto_pulsar_n");
for(String topic : topicList)
System.out.println(topic);
//2.3 修改Topic分片的数量
pulsarAdmin.topics().updatePartitionedTopic("persistent://toto_pulsar_t/toto_pulsar_n/t_topic3", 7);
//2.4 一共有多少个分片呢
int partitons = pulsarAdmin.topics().getPartitionedTopicMetadata("persistent://toto_pulsar_t/toto_pulsar_n/t_topic3").partitions;
System.out.println(partitons);
//2.5: 删除Topic
//pulsarAdmin.topics().deletePartitionedTopic("persistent://toto_pulsar_t/toto_pulsar_n/t_topic3");
//关闭admin对象
pulsarAdmin.close();
1.6.3.基于Pulsar实现数据生产
- 1-使用JAVA如何生产数据_同步方式
package com.toto.learn.producer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
/**
* @author tuzuoquan
* @version 1.0
* @ClassName PulsarProducerSyncTest
* @description 演示Pulsar 生产者 同步发送
* @date 2022/8/23 0:44
**/
public class PulsarProducerSyncTest
public static void main(String[] args) throws PulsarClientException
//1.创建Pulsar的客户端对象
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl("pulsar://node1:6650,node2:6650,node3:6650").build();
//2.通过客户端构建生产者的对象
//可以通过命令创建:./pulsar-admin topics create persistent://toto_pulsar_t/toto_pulsar_n/t_topic1
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic("persistent://toto_pulsar_t/toto_pulsar_n/t_topic1")
.create();
//3.使用生产者发送数据
producer.send("hello java API pulsar ...");
System.out.println("数据生产完成...");
//4.释放资源
producer.close();
pulsarClient.close();
- 2-使用java如何生产数据_异步方式
package com.toto.learn.producer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
/**
* @author tuzuoquan
* @version 1.0
* @ClassName PulsarProducerAsyncTest
* @description 演示Pulsar 生产者 异步发送
* @date 2022/8/23 0:34
**/
public class PulsarProducerAsyncTest
public static void main(String[] args) throws PulsarClientException, InterruptedException
//1.创建Pulsar以上是关于06.Apache Pulsar的JAVA API相关使用操作,基于Pulsar实现Topic的构建操作,使用JAVA如何管理租户/namespace/Topic,基于Pulsar实现数据生产/消费的主要内容,如果未能解决你的问题,请参考以下文章
使用管理 API 将 Protobuf 消息发布到 Pulsar 模式注册表时出现 500 错误