云原生时代顶流消息中间件Apache Pulsar部署实操之Pulsar IO与Pulsar SQL
Posted IT小神
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了云原生时代顶流消息中间件Apache Pulsar部署实操之Pulsar IO与Pulsar SQL相关的知识,希望对你有一定的参考价值。
文章目录
Pulsar IO (Connector连接器)
基础定义
Pulsar IO连接器能够轻松地创建、部署和管理与外部系统(如Apache Cassandra、Aerospike等)交互的连接器。IO连接器有两种类型:源连接器和接收器连接器。
可以通过Connector Admin CLI使用源和接收器子命令管理Pulsar连接器(例如,在连接器上创建、更新、启动、停止、重新启动、重新加载、删除和执行其他操作)。有关最新和完整的信息,请参阅Pulsar管理文档。
安装Pulsar和内置连接器
在将Pulsar连接到数据库之前,需要先安装Pulsar和所需的内置连接器。要启用Pulsar连接器,您需要在下载页面上下载连接器的tarball版本。
# 下载最新版本2.11.0的pulsar-io-cassandra和pulsar-io-jdbc-postgres,需要什么连接器可以从官方查看是否支持并下载,这里举例就下载两个
https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-2.11.0/connectors/pulsar-io-cassandra-2.11.0.nar
https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-2.11.0/connectors/pulsar-io-jdbc-postgres-2.11.0.nar
# 在pulsar根目录下创建目录
mkdir connectors
# 将压缩文件移动connectors目录
mv pulsar-io-jdbc-postgres-2.11.0.nar pulsar-io-jdbc-postgres-2.11.0.nar connectors
# 重启pulsar
# 查看可用连接器列表
curl -w '\\n' -s http://localhost:8080/admin/v2/functions/connectors
连接Pulsar到Cassandra
安装cassandra集群
# 下载镜像并启动cassandra测试容器
docker run -d --rm --name=cassandra -p 9042:9042 cassandra
# 查看进程
docker ps
# 查看运行日志
docker logs cassandra
# 等待一小段时间后查看Cassandra集群状态
docker exec cassandra nodetool status
# 使用cqlsh连接到Cassandra集群
# 使用cqlsh连接到Cassandra集群
docker exec -ti cassandra cqlsh localhost
# 创建一个密钥空间pulsar_itxs_keyspace
CREATE KEYSPACE pulsar_itxs_keyspace WITH replication = 'class':'SimpleStrategy', 'replication_factor':1;
# 创建一个表pulsar_itxs_table
USE pulsar_itxs_keyspace;
CREATE TABLE pulsar_itxs_table (key text PRIMARY KEY, col text);
配置Cassandra接收器
现在已经有一个Cassandra集群在本地运行;要运行Cassandra接收器连接器,需要准备一个配置文件,其中包括Pulsar连接器运行时需要知道的信息,例如Pulsar连接器如何找到Cassandra集群,Pulsar连接器用于写入Pulsar消息的键空间和表是什么等等;可以使用Json或者Yaml这两种格式创建配置文件。
vim examples/cassandra-sink.json
"roots": "192.168.3.100:9042",
"keyspace": "pulsar_itxs_keyspace",
"columnFamily": "pulsar_itxs_table",
"keyname": "key",
"columnName": "col"
vim examples/cassandra-sink.yml
configs:
roots: "192.168.3.100:9042"
keyspace: "pulsar_itxs_keyspace"
columnFamily: "pulsar_itxs_table"
keyname: "key"
columnName: "col"
创建Cassandra Sink
可以使用Connector Admin CLI创建sink连接器和操作。运行下面命令来创建一个Cassandra接收器连接器,接收器类型为Cassandra,配置文件为上一步创建的examples/cassandra-sink.yml。
bin/pulsar-admin sinks create \\
--tenant my-test \\
--namespace my-namespace \\
--name cassandra-itxs-sink \\
--sink-type cassandra \\
--sink-config-file examples/cassandra-sink.yml \\
--inputs persistent://my-test/my-namespace/itxs_cassandra
命令执行后,Pulsar创建接收器连接器cassandra-itxs-sink。这个接收器连接器作为Pulsar函数运行,并将主题itxs_cassandra中产生的消息写入Cassandra表pulsar_itxs_table;
可以使用Connector Admin CLI对连接器进行监控和其他操作。
- 获取连接器的信息
bin/pulsar-admin sinks get \\
--tenant my-test \\
--namespace my-namespace \\
--name cassandra-itxs-sink
- 检查连接器的状态
bin/pulsar-admin sinks status \\
--tenant my-test \\
--namespace my-namespace \\
--name cassandra-itxs-sink
验证Cassandra Sink结果
生成一些消息到Cassandra接收器itxs_cassandra的输入主题
for i in 0..9; do bin/pulsar-client produce -m "itxskey-$i" -n 1 persistent://my-test/my-namespace/itxs_cassandra; done
再次查看连接器的状态,可以有10条记录处理统计信息
查看Cassandra的pulsar_itxs_table
USE pulsar_itxs_keyspace;
select * from pulsar_itxs_table;
删除Cassandra Sink
bin/pulsar-admin sinks delete \\
--tenant my-test \\
--namespace my-namespace \\
--name cassandra-itxs-sink
连接Pulsar到PostgreSQL
安装PostgreSQL集群
这里使用PostgreSQL 12 docker镜像在docker中启动一个单节点PostgreSQL集群。
# 从Docker中拉取PostgreSQL 12映像
docker pull postgres:12
# 启动postgres容器
docker run -d -it --rm \\
--name pulsar-postgres \\
-p 5432:5432 \\
-e POSTGRES_PASSWORD=password \\
-e POSTGRES_USER=postgres \\
postgres:12
# 查看运行日志
docker logs -f pulsar-postgres
# 进入容器
docker exec -it pulsar-postgres /bin/bash
# 使用默认用户名和密码登录PostgreSQL
psql -U postgres postgres
# 使用以下命令创建pulsar_postgres_jdbc_sink表:
create table if not exists pulsar_postgres_jdbc_sink
(
id serial PRIMARY KEY,
name VARCHAR(255) NOT NULL
);
配置JDBC接收器
现在有一个本地运行的PostgreSQ,接下来需要配置JDBC接收器连接器。
- 创建配置文件vim connectors/pulsar-postgres-jdbc-sink.yaml
configs:
userName: "postgres"
password: "password"
jdbcUrl: "jdbc:postgresql://192.169.3.100:5432/postgres"
tableName: "pulsar_postgres_jdbc_sink"
创建JDBC Sink
执行下面命令后,Pulsar将创建接收器连接器pulse -postgres-jdbc-sink。这个sink连接器作为Pulsar函数运行,并将Topic为pulsar-postgres-jdbc-sink-topic中产生的消息写入PostgreSQL表pulsar_postgres_jdbc_sink。
bin/pulsar-admin sinks create \\
--tenant my-test \\
--namespace my-namespace \\
--archive ./connectors/pulsar-io-jdbc-postgres-2.11.0.nar \\
--inputs persistent://my-test/my-namespace/pulsar-postgres-jdbc-sink-topic \\
--name pulsar-postgres-my-jdbc-sink \\
--sink-config-file ./connectors/pulsar-postgres-jdbc-sink.yaml \\
--parallelism 1
列出所有的sink
bin/pulsar-admin sinks list \\
--tenant my-test \\
--namespace my-namespace
验证JDBC Sink结果
通过JavaAPI生成一些消息到Cassandra接收器pulsar-postgres-jdbc-sink-topic这个主题,在Java项目添加maven依赖
<properties>
<pulsar.version>2.11.0</pulsar.version>
</properties>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>$pulsar.version</version>
</dependency>
这里演示实体类成员变量简单就直接使用public声明了
package sn.itxs.pulsar.io;
public class User
public int id;
public String name;
新增ClientDemo.java
package sn.itxs.pulsar.io;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.schema.AvroSchema;
public class ClientDemo
public static void main(String[] args) throws Exception
PulsarClient client = null;
Producer<User> producer = null;
try
client = PulsarClient.builder()
.serviceUrl("pulsar://192.168.5.52:6650")
.build();
producer = client.newProducer(AvroSchema.of(User.class))
.topic("persistent://my-test/my-namespace/pulsar-postgres-jdbc-sink-topic")
.create();
User user = new User();
int index = 10;
while (index++ < 20)
try
user.id = index;
user.name = "this is a test " + index;
producer.newMessage().value(user).send();
catch (Exception e)
e.printStackTrace();
System.out.println("send finish");
catch (Exception e)
e.printStackTrace();
finally
if (producer!=null)
producer.close();
if (client!=null)
client.close();
运行程序后查看PostgreSQL表pulsar_postgres_jdbc_sink,已经有刚才
上面由于在Java中创建了Schema,因此不需要手工创建,可以查看当前persistent://my-test/my-namespace/pulsar-postgres-jdbc-sink-topic主体已生成Schema信息如下:
如果要从pulsar-admin命令行创建schema可以这样操作
- 创建schema,创建一个avro-schema文件,将以下内容复制到该文件中,并将该文件放在pulsar/connectors文件夹中。vim connectors/avro-schema
"type": "AVRO",
"schema": "\\"type\\":\\"record\\",\\"name\\":\\"Test\\",\\"fields\\":[\\"name\\":\\"id\\",\\"type\\":[\\"null\\",\\"int\\"],\\"name\\":\\"name\\",\\"type\\":[\\"null\\",\\"string\\"]]",
"properties":
- 上传schema到topic,将avro-schema模式上传到pulsar-postgres-jdbc-sink-topic主题
bin/pulsar-admin schemas upload persistent://my-test/my-namespace/pulsar-postgres-jdbc-sink-topic -f ./connectors/avro-schema
- 检查模式是否上传成功。
bin/pulsar-admin schemas get persistent://my-test/my-namespace/pulsar-postgres-jdbc-sink-topic1
如需stop停止、restart重启指定的sinks可以如下操作,当然也可以更新指定sinks,详细命令可以查阅官网
bin/pulsar-admin sinks stop \\
--tenant my-test \\
--namespace my-namespace \\
--name pulsar-postgres-my-jdbc-sink \\
Pulsar SQL
定义
Apache Pulsar用于存储事件数据流,事件数据由预定义的字段构成。通过模式注册表的实现,可以在Pulsar中存储结构化数据,并使用Trino(以前是Presto SQL)查询数据。作为Pulsar SQL的核心,Pulsar Trino插件使Trino集群中的Trino worker能够查询来自Pulsar的数据.
由于Pulsar采用了基于两级段的架构,因此查询性能高效且可扩展性强。Pulsar中的主题在Apache BookKeeper中存储为段。每个主题段被复制到一些BookKeeper节点上,从而支持并发读和高读吞吐量。在Pulsar Trino连接器中,数据直接从BookKeeper中读取,因此Trino worker可以同时从水平可扩展数量的BookKeeper节点中读取
简单使用
在Pulsar中查询数据前,需要安装Pulsar和内置连接器。
# 这里演示就直接启动独立集群
PULSAR_STANDALONE_USE_ZOOKEEPER=1 ./bin/pulsar standalone
# 启动一个Pulsar SQL worker
./bin/pulsar sql-worker run
# 初始化Pulsar独立集群和SQL worker后,执行SQL CLI:
./bin/pulsar sql
show catalogs;
show schemas in pulsar;
show tables in pulsar."public/default";
通过前面的Java示例,我们改为Json格式写入Pulsar的user-topic
package sn.itxs.pulsar.io;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
public class ClientSqlDemo
public static void main(String[] args) throws Exception
PulsarClient client = null;
Producer<User> producer = null;
try
client = PulsarClient.builder()
.serviceUrl("pulsar://192.168.5.52:6650")
.build();
producer = client.newProducer(Schema.JSON(User.class))
.topic("user-topic")
.create();
User user = new User();
int index = 10;
while (index++ < 20)
try
user.id = index;
user.name = "this is a test " + index;
producer.newMessage().value(user).send();
catch (Exception e)
e.printStackTrace();
System.out.println("send finish");
catch (Exception e)
e.printStackTrace();
finally
if (producer!=null)
producer.close();
if (client!=null)
client.close();
运行程序后再来查询就有刚才发送的消息数据,_开头的字段为Pulsar 自带的。
select * from pulsar."public/default"."user-topic";
- 本人博客网站IT小神 www.itxiaoshen.com
腾讯宣布开源 RoP:Apache Pulsar 支持原生 RocketMQ 协议
作者 | 冉小龙,刘昱
RocketMQ 用户可以无缝迁移到 Apache Pulsar 了。自此,Apache Pulsar 补齐了兼容主流消息队列协议的能力。
我们很高兴地宣布腾讯云中间件开源 RoP!RoP 将 RocketMQ 协议处理插件引入 Pulsar broker,这样 Pulsar 就能支持原生 RocketMQ 协议了。
什么是 RoP?
与 KoP、MoP 和 AoP 相似,RoP 是一种可插拔的协议处理插件。
将 RoP 协议处理插件添加到现有 Pulsar 集群后,用户无需修改代码,便能将现有的 RocketMQ 应用程序和服务迁移到 Pulsar,同时还能使用 Pulsar 的强大功能,例如:
计算与存储分离
多租户
跨地域复制
分层分片
轻量化计算框架 -- Pulsar Functions
...
为什么开发 RoP?
Apache Pulsar 是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体。自 2016 年开源以来,Pulsar 已被广泛采用,并于 2018 年被指定为 Apache 顶级项目。
RocketMQ 是一款强大的开源分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。
Pulsar 和 RocketMQ 拥有广泛的用户群体和强劲的开发支持,全球许多头部公司都在使用这两种消息服务。同时,我们也收到了用户的需求,希望能在 Pulsar 与 RocketMQ 之间传输数据,并充分利用这两种消息系统的优势。
Apache Pulsar 通过对 Consumer 层的抽象,提供了队列和流两种消费模型的统一抽象。在 Client 与 Broker 的交互中,Pulsar 基于 Protobuf 的二进制协议,提供更高的性能和更低的延迟。除此之外,通过 Protobuf 协议,Pulsar 可以更容易地支持并实现多语言的客户端,比如:Java、CPP、Python 和 Go 语言等客户端。
但是,对于使用其他消息传输协议编写的应用程序(例如,RocketMQ),由于使用的消息处理协议和 Pulsar 不同,如果 Pulsar 想要兼容 RocketMQ 协议,为了将 RocketMQ 的协议适配到 Pulsar 的消息协议层中,用户需要重写整个协议层,这给用户的迁移和切换带来了很大的成本。
为了解决这个问题,最直观的处理方式是使用类似 Pulsar Connector 的形式,将用户在 RocketMQ 中的现存数据通过 RocketMQ Wrapper 的方式导入到 Pulsar 集群,但是这需要业务端更改自己的业务代码逻辑,同时需要确保两边的数据能够保证一致,这给使用 RocketMQ 的用户带来了很大的技术挑战。所以,能否给用户提供一个开箱即用的迁移策略和方案并且用户无需做任何代码修改呢?这便是 RoP 诞生的最初目的。
怎样开发 RoP?
Apache Pulsar 在 PIP-41(https://github.com/apache/pulsar/wiki/PIP-41%3A-Pluggable-Protocol-Handler) 中介绍了一种全新的接入方式。通过在 Broker 端暴露 Protocol Handler 插件,将 Netty 的 channel 和 Pulsar 的 Broker Service(https://github.com/apache/pulsar/blob/907fcb5ba8/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java) 对象暴露给用户。这允许用户直接操作和调用 Pulsar 中比较低阶的 API(例如:PersistentTopic 和 ManagerLedger)。基于这个协议,用户无需更改代码,只需将服务请求转发到 RoP 中,RoP 利用 Protocol Handler 的插件将用户的请求转发到 Pulsar 中即可。
RoP 架构
通过对比 Pulsar 和 RocketMQ 之间的协议可以发现,二者在消息处理的思路上有不少相似之处,比如这两种协议都包含如下操作:
Topic Lookup: 所有 Clients 与任意 Broker 建立连接之前,会先去查找当前 Topic 的 Owner Broker。获取到对应的 metadata 之后,Clients 会与 Owner Broker 之间建立 TCP 连接进行数据的交互。
Produce: Clients 与 Topic 所在的所有 Owner Broker 之间进行通信并将消息 append 到对应的分布式日志中。
Consume: Clients 与 Topic 所在的所有 Owner Broker 之间进行通信并从分布式日志中读取指定的消息。
Offset: Producer 生产到 topic 中的消息会分配一个唯一的 offset,Pulsar 中使用 MessageID 来标识 offset。消费者可以通过 offset 去日志中获取指定位置的消息。
Apache Pulsar 的存储层使用了 Apache BookKeeper,Pulsar 相当于 BookKeeper 的 Client,通过调用 ManagerLedger 对象能够很容易的达到为分布式日志操作的目的。基于此,RoP 可以很好的将 RocketMQ 中对 commitLog 和 queueLog 的操作映射到 BookKeeper 中来。
RoP 概念
Offset 和 MessageID
在 RocketMQ 中,使用 offset 来标识消息的位置,当消息被生产到指定的 Topic 之后,会为每一个消息分配一个唯一的 offset;在 Pulsar 中,使用 MessageID 来唯一标识每条消息,每一个 MessageID 由三部分组成,ledgerID
、entryID
和 partitionID
。我们通过合理的划分将 messageID 和 offset 进行映射,来唯一标识 Topic 中的每一条消息。
Message
对于一条消息,RocketMQ 和 Pulsar 都包含消息的 headers 和 payload 等字段,通过对消息协议的解析,我们可以轻松的将 RocketMQ message 转换为 Pulsar 的 message 格式。为了更好的兼容 Tag 消息的功能,在消息协议的处理方面增加了 8 字节的特殊字段,用来区分该消息是否属于 Tag 消息。
Topic Lookup
在 Pulsar 中,client 与 broker 建立连接之前,会根据当前传入的 Topic 执行 Lookup 操作,在 Broker 集群中寻找当前 Topic 所在的 Owner Broker,然后将该 Owner Broker 的地址返回并与 client 建立 TCP 连接,再进行数据交互。在 RocketMQ 中,client 与 broker 建立连接之前,会先处理 GET_ROUTEINTO_BY_TOPIC
命令,获取 topic 所在的路由信息后,建立对应的 TCP 连接,再进行数据交互。
如何使用 RoP?
目前,RoP 发布了 0.1.0 版本,你可以用过以下任一方式参与该项目:
想上手试试?
可在以下网址下载 RoP 和查阅用户指南:https://github.com/streamnative/rop/blob/master/README.md。无论是快速启动 standalone RoP 或在现有 Pulsar 集群中部署 RoP,都可轻松实现。
另外,为了方便快速使用并验证 RoP,我们提供了 RocketMQ 的常见使用场景和用例,你可以直接使用这些代码示例验证服务:https://github.com/streamnative/rop/tree/master/examples/src/main/java/org/streamnative/rocketmq/example
想解决问题?
如有任何问题,可以在 RoP GitHub repo 中 创建 issue 或加入 RoP 微信群进行讨论。无论哪种方式,RoP 资深专家都随时在线:https://github.com/streamnative/rop/issues/new
想参与贡献?
RoP 源码开放并托管在 GitHub 上:https://github.com/streamnative/rop%EF%BC%89%E3%80%82。如需改进功能或修复 bug,欢迎提交 PR。
有道无术,术可成;有术无道,止于术
欢迎大家关注Java之道公众号
好文章,我在看❤️
以上是关于云原生时代顶流消息中间件Apache Pulsar部署实操之Pulsar IO与Pulsar SQL的主要内容,如果未能解决你的问题,请参考以下文章
下一代云原生消息流平台 Apache Pulsar 消息保留和过期策略设计
腾讯宣布开源 RoP:Apache Pulsar 支持原生 RocketMQ 协议