07_Pulsar高级组件基本使用(Connector,Functions,事务)Function(轻量级计算流程)概念与使用Connector 连接器概念与使用,其它Connector
Posted 涂作权的博客
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了07_Pulsar高级组件基本使用(Connector,Functions,事务)Function(轻量级计算流程)概念与使用Connector 连接器概念与使用,其它Connector相关的知识,希望对你有一定的参考价值。
2.Apache Pulsar高级
2.1.Pulsar高级组件基本使用(Connector,Functions,事务)
2.1.1.Function(轻量级计算流程)概念与使用
2.1.1.1.Pulsar Function轻量级计算框架
2.1.2.Connector 连接器概念与使用
2.1.2.1.Pulsar Connector 连接器
2.1.2.2.Pulsar Connector连接器 —>Pulsar Flink Connector
2.1.2.3.如何在flink的流式环境中使用Pulsar: sink端
2.1.2.4.schema flink source sink
2.1.2.5.Pulsar Flume Connector
2.1.3.2.Pulsar Transactions(事务支持)
2.Apache Pulsar高级
2.1.Pulsar高级组件基本使用(Connector,Functions,事务)
2.1.1.Function(轻量级计算流程)概念与使用
2.1.1.1.Pulsar Function轻量级计算框架
- Function背景介绍
当我们进行流式处理的时候,很多情况下,我们的需求可能只是下面这些简单的操作:简单的 ETL 操作\\聚合计算操作等相关服务。
但为了实现这些功能,我们不得不去部署一整套 SPE 服务。部署成功后才发现需要的仅是 SPE(流处理引擎) 服务中的一小部分功能,部署 SPE 的成本可能比用户开发这个功能本身更困难。由于SPE 本身 API 的复杂性,我们需要了解这些算子的使用场景,明白不同算子之间有哪些区别,什么情况下,应该使用什么算子来处理相应的逻辑。
基于以上原因,我们设计并实现了 Pulsar Functions,在 Pulsar Functions 中,用户只需关心计算逻辑本身,而不需要去了解或者部署 SPE 的相关服务,当然你也可以将 pulsar-function 与现有的 SPE 服务一起使用。 也就是说,在 Pulsar Functions 中,无需部署 SPE 的整套服务,就可以达到与 SPE 服务同样的优势 。
- 什么是Functions
Pulsar Functions 是一个轻量级的计算框架,像 AWS 的 lambda、Google Cloud 的 Functions一样,Pulsar Functions 可以给用户提供一个部署简单、运维简单、API 简单的 FASS(Function as a service)平台。
Pulsar Functions 的设计灵感来自于 Heron 这样的流处理引擎,Pulsar Functions 将会拓展 Pulsar 和整个消息领域的未来。使用 Pulsar Functions,用户可以轻松地部署和管理 function,通过 function 从 Pulsar topic 读取数据或者生产新数据到 Pulsar topic。
引入 Pulsar Functions 后,Pulsar 成为统一的消息投递/计算/存储平台。只需部署一套 Pulsar 集群,便可以实现一个计算引擎,页面简单,操作便捷。
- 什么是Functions
Input topic 是数据的来源,在 Pulsar Functions 中,所有的数据均来自 input topic。当数据进入input topic 中,Pulsar Functions 充当消费者的角色,去 input topic 中消费消息;当从 input topic 中拿到需要处理的消息时,Pulsar Functions 充当生产者的角色往 output topic 或者 log topic 中生产消息。
Output topic 和 log topic 都可以看作是 Pulsar Functions 的输出。从是否会有 output 这个点来看,我们可以将 Pulsar Functions 分为两类,当有输出的时候 Pulsar Functions 会将相应的 output 输出到 output topic中。log topic 主要存储用户的日志信息,当 Pulsar Functions 出现问题时,方便用户定位错误并调试。
综上所述:我们不难看出 Pulsar Functions 充当了一个消息处理和转运的角色。
在使用Pulsar Functions, 可以使用不同的语言来编写,比如Python, Java,Go等。编写方式主要两种:
- 本地模式:集群外部,进行本地运行
- 集群模式:集群内部运行(支持独立模式和集成模式)
如何使用呢?
首先,需要修改Pulsar中相关的配置:
[root@node1 bin]# cd /export/server/pulsar-2.8.1/conf/
[root@node1 conf]# vim broker.conf
内容如下:
# Enable Functions Worker Service in Broker
functionsWorkerEnabled=false 更改为:true
注意:三台节点都需要调整
接着,重启Broker即可:
cd /export/server/pulsar-2.8.1/bin
./pulsar-daemon stop broker
./pulsar-daemon start broker
最后通过:./pulsar-admin brokers list pulsar-cluster 查看broker列表
注意:三台节点都需要执行,依次都停止,然后依次启动
如何使用呢?
最后,测试是否可用
bin/pulsar-admin functions create \\
--jar examples/api-examples.jar \\
--classname org.apache.pulsar.functions.api.examples.ExclamationFunction \\
--inputs persistent://public/default/exclamation-input \\
--output persistent://public/default/exclamation-output \\
--tenant public \\
--namespace default \\
--name exclamation
输出结果:
"Created successfully"
检查是否按照预期触发函数运行:
[root@node1 pulsar-2.8.1]# bin/pulsar-admin functions trigger --name exclamation --trigger-value "hello world"
hello world!
如何使用呢?
[root@node1 pulsar-2.8.1]# bin/pulsar-admin functions
Usage: pulsar-admin functions [command] [command options]
Commands:
localrun Run a Pulsar Function locally, rather than deploy to a
Pulsar cluster)
Usage: localrun [options]
Options:
--auto-ack
Whether or not the framework acknowledges messages automatically
--batch-builder
BatcherBuilder provides two types of batch construction methods,
DEFAULT and KEY_BASED. The default value is: DEFAULT
--broker-service-url
The URL for Pulsar broker
--classname
The class name of a Pulsar Function
--client-auth-params
Client authentication param
--client-auth-plugin
Client authentication plugin using which function-process can
connect to broker
--cpu
The cpu in cores that need to be allocated per function
instance(applicable only to docker runtime)
--custom-runtime-options
A string that encodes options to customize the runtime, see docs
for configured runtime for details
--custom-schema-inputs
The map of input topics to Schema properties (as a JSON string)
--custom-schema-outputs
The map of input topics to Schema properties (as a JSON string)
--custom-serde-inputs
The map of input topics to SerDe class names (as a JSON string)
The namespace of a Pulsar Function
* -s, --state
The FunctionState that needs to be put
--tenant
The tenant of a Pulsar Function
trigger Trigger the specified Pulsar Function with a supplied value
Usage: trigger [options]
Options:
--fqfn
The Fully Qualified Function Name (FQFN) for the function
--name
The name of a Pulsar Function
--namespace
The namespace of a Pulsar Function
--tenant
The tenant of a Pulsar Function
--topic
The specific topic name that the function consumes from that you
want to inject the data to
--trigger-file
The path to the file that contains the data with which you want to
trigger the function
--trigger-value
The value with which you want to trigger the function
bin/pulsar-admin functions
属性说明:
functions:可选值:
localrun: 创建本地function进行运行
create: 在集群模式下创建
delete: 删除在集群中运行的function
get: 获取function的相关信息
restart: 重启
stop : 停止运行
start: 启动
status: 检查状态
stats: 查看状态
list: 查看特定租户和名称空间下的所有的function
--classname: 设置function执行类
--jar 设置function对应的jar包
--inputs : 输入的topic
--output : 输出的topic
--tenant : 设置function运行在那个租户中
--namespace: 设置function运行在那个名称空间中
--name : 定义function的名称
接下来, 我们尝试编写一个function的操作, 基于Pulsar Function完成流式计算操作:
案例需求:
使用Pulsar Function读取某一个Topic中旧期(格式为:yyyy/MM/dd HH/mm/ss)数据,读取后,对数据进行日期转换(格式为:yyyy-MM-dd HH:mm:ss)
首先加入依赖:
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-functions-api</artifactId>
<version>2.8.1</version>
</dependency>
接着编写程序:
package com.toto.learn.functions;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @author tuzuoquan
* @version 1.0
* @ClassName WordCountFunction
* @description TODO
* @date 2022/8/24 23:55
**/
public class WordCountFunction implements Function<String,String>
private SimpleDateFormat format1 = new SimpleDateFormat("yyyy/MM/dd HH/mm/ss");
private SimpleDateFormat format2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override
public String process(String input, Context context) throws Exception
Date oldDate = format1.parse(input);
return format2.format(oldDate);
对项目程序进行打包部署:
将上面的jar包放到服务器
构建function
[root@node1 pulsar-2.8.1]# mkdir -p /export/server/pulsar-2.8.1/functions (3个节点)
cd /export/server/pulsar-2.8.1/functions
将pulsar-base-1.0-SNAPSHOT.jar上传到该目录下
[root@node1 functions]# scp -r pulsar-base-1.0-SNAPSHOT.jar root@node2:$PWD
pulsar-base-1.0-SNAPSHOT.jar 100% 15KB 692.7KB/s 00:00
[root@node1 functions]# scp -r pulsar-base-1.0-SNAPSHOT.jar root@node3:$PWD
pulsar-base-1.0-SNAPSHOT.jar 100% 15KB 11.8MB/s 00:00
[root@node1 functions]#
bin/pulsar-admin functions create \\
--jar functions/pulsar-base-1.0-SNAPSHOT.jar \\
--classname com.toto.learn.functions.WordCountFunction \\
--inputs persistent://public/default/wd_input \\
--output persistent://public/default/wd_output \\
--tenant public \\
--namespace default \\
--name wordcount
"Created successfully"
启动function
trigger触发启动,并向函数发送数据测试
[root@node1 pulsar-2.8.1]# bin/pulsar-admin functions trigger --name wordcount --trigger-value "2021/10/10 15/30/30"
2021-10-10 15:30:30
从上可以看出日期格式出现了变化
此外, 也可以通过代码向input对应的Topic发送消息, 并消费output对应的Topic中数据, 也是可以看到function可以正常处理的。
2.1.2.Connector 连接器概念与使用
2.1.2.1.Pulsar Connector 连接器
虽然可以使用 Pulsar 消费者和生产者 API 编写代码(例如,从数据库同步数据时,先查询数据,再使用
Pulsar 的 API 将数据发布至 Pulsar),但这种方法耗时费力。因此,Pulsar 提出了 Connector (也称为
Pulsar IO),用于解决 Pulsar 与周边系统的集成问题,帮助用户高效完成工作。
这张图非常直观地描述了 Pulsar IO 的组成。
Pulsar IO分为输入(Input)和输出(Output)两个模块。
输入:代表数据从哪里来,通过Source实现数据输入。数据的来源可以是数据库(例如mysql、Oracle、MongoDB)、文件、日志或自定义系统等。
输出:代表数据往哪里去,通过Sink实现数据输出。数据的输出可以是数据仓库、数据库等。
而目前Pulsar支持非常多的Connector,可以参考以下几个网站:
http://pulsar.apache.org/docs/zh-CN/io-connectors/#source-connector
http://pulsar.apache.org/docs/zh-CN/io-connectors/#sink-connector
目前我们主要介绍 Pulsar flink Connector 和 Pulsar Flume Connector
其他的连接器的使用方式, 基本是类似的
2.1.2.2.Pulsar Connector连接器 —>Pulsar Flink Connector
Pulsar Flink Connector是Apache Pulsar和Apache Flink(数据处理引擎)的集成,它允许Flink从Pulsar读取数据
,并向Pulsar写入数据,并提供精确一次的源语义和至少一次的汇聚语义。
如何使用pulsar Flink Connector, 首先在pom中加入相关的依赖环境:(注意: 还需要添加 pulsar客户端包)
<repositories><!--代码库-->
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases><enabled>true</enabled></releases>
<snapshots>
<enabled>false</enabled>
<updatePolicy>never</updatePolicy>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-all</artifactId>
<version>2.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-functions-api</artifactId>
<version>2.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>io.streamnative.connectors</groupId>
<artifactId>pulsar-flink-connector_2.11</artifactId>
<version>1.13.1.5-rc1</version>
<exclusions>
<exclusion>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-all</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<target>1.8</target>
<source>1.8</source>
</configuration>
</plugin>
</plugins>
</build>
2.1.2.3.如何在flink的流式环境中使用Pulsar: sink端
- 前置准备:装nc工具
yum -y install nc
- 发送数据:
nc -lk 44444
例如:
编写FlinkFromPulsarSink,内容如下:
package com.toto.learn.connector.flink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink;
import org.apache.flink.streaming.util.serialization.PulsarPrimitiveSchema;
import java.util.Optional;
import java.util.Properties;
/**
* @author tuzuoquan
* @version 1.0
* @ClassName FlinkFromPulsarSink
* @description TODO
* @date 2022/8/25 23:44
**/
public class FlinkFromPulsarSink
public static void main(String[] args) throws Exception
//1.创建flink的流式处理的核心类对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.添加source组件:监控某一个端口号,从端口号读取数据操作
DataStreamSource<String> streamSource = env.socketTextStream("node1", 44444);
//3.添加转换的组件
//4.添加sink的组件: 如何将处理后的数据输出到Pulsar中
FlinkPulsarSink<String> pulsarSink = new FlinkPulsarSink<String>(
"pulsar://node1:6650,node2:6650,node3:6650",
"http://node1:8080,node2:8080,node3:8080",
Optional.of("persistent://toto_pulsar_t/toto_pulsar_n/t_topic1"),
new Properties(),
new PulsarPrimitiveSchema<>(String.class)
);
streamSource.addSink(pulsarSink);
//5.启动flink程序
env.execute("flinkPulsarSink");
观察消费者消费数据情况:
FlinkFromPulsarSink的内容如下:
package com.toto.learn.connector.flink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink;
import org.apache.flink.streaming.util.serialization.PulsarPrimitiveSchema;
import java.util.Optional;
import java.util.Properties;
/**
* @author tuzuoquan
* @version 1.0
* @ClassName FlinkFromPulsarSink
* @description TODO
* @date 2022/8/25 23:44
**/
public class FlinkFromPulsarSink
public static void main(String[] args) throws Exception
//1.创建flink的流式处理的核心类对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.添加source组件:监控某一个端口号,从端口号读取数据操作
DataStreamSource<String> streamSource = env.socketTextStream("node1", 44444);
//3.添加转换的组件
//4.添加sink的组件: 如何将处理后的数据输出到Pulsar中
FlinkPulsarSink<String> pulsarSink = new FlinkPulsarSink<String>(
"pulsar://node1:6650,node2:6650,node3:6650",
"http://node1:8080,node2:8080,node3:8080",
Optional.of("persistent://toto_pulsar_t/toto_pulsar_n/t_topic1"),
new Properties(),
new PulsarPrimitiveSchema<>(String.class)
);
streamSource.addSink(pulsarSink);
//5.启动flink程序
env.execute("flinkPulsarSink");
启动nc工具:
借助:启动FlinkFromPulsarSink
nc工具发送数据的时候,可以看到:
2.1.2.4.schema flink source sink
package com.toto.learn.connector.flink;
import com.toto.learn.pojo.User;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.flink.streaming.connectors.pulsar.config.RecordSchemaType;
import org.apache.flink.streaming.connectors.pulsar.internal.AvroDeser;
import org.apache.flink.streaming.connectors.pulsar.internal.AvroSer;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarSerializer;
import org.apache.flink.streaming.util.serialization.PulsarSerializationSchemaWrapper;
import java.util.Optional;
import java.util.Properties;
/**
* @author tuzuoquan
* @version 1.0
* @ClassName FlinkFromPulsarSchema
* @description 需求: 基于Flink实现读取一个POJO类型的数据, 将将数据写入到Pulsar中
* @date 2022/8/26 0:49
**/
public class FlinkFromPulsarSchema
public static 02_Pulsar的集群架构架构基本介绍Pulsar提供的组件介绍Brokers介绍Zookeeper的元数据存储基于bookKeeper持久化存储Pulsar代理
01.pulsar基本介绍多租户模式云原生架构Segmented Streams支持跨地域复制pulsar组件介绍Pulsar IO (Connector)Pulsar与kafka的对比
03_Apache Pulsar的Local与分布式集群构建Pulsar的分布式集群模式Pulsar的分布式集群模式构建启动测试