07_Pulsar高级组件基本使用(Connector,Functions,事务)Function(轻量级计算流程)概念与使用Connector 连接器概念与使用,其它Connector

Posted 涂作权的博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了07_Pulsar高级组件基本使用(Connector,Functions,事务)Function(轻量级计算流程)概念与使用Connector 连接器概念与使用,其它Connector相关的知识,希望对你有一定的参考价值。

2.Apache Pulsar高级
2.1.Pulsar高级组件基本使用(Connector,Functions,事务)
2.1.1.Function(轻量级计算流程)概念与使用
2.1.1.1.Pulsar Function轻量级计算框架
2.1.2.Connector 连接器概念与使用
2.1.2.1.Pulsar Connector 连接器
2.1.2.2.Pulsar Connector连接器 —>Pulsar Flink Connector
2.1.2.3.如何在flink的流式环境中使用Pulsar: sink端
2.1.2.4.schema flink source sink
2.1.2.5.Pulsar Flume Connector
2.1.3.2.Pulsar Transactions(事务支持)

2.Apache Pulsar高级

2.1.Pulsar高级组件基本使用(Connector,Functions,事务)

2.1.1.Function(轻量级计算流程)概念与使用

2.1.1.1.Pulsar Function轻量级计算框架

  • Function背景介绍

当我们进行流式处理的时候,很多情况下,我们的需求可能只是下面这些简单的操作:简单的 ETL 操作\\聚合计算操作等相关服务。

但为了实现这些功能,我们不得不去部署一整套 SPE 服务。部署成功后才发现需要的仅是 SPE(流处理引擎) 服务中的一小部分功能,部署 SPE 的成本可能比用户开发这个功能本身更困难。由于SPE 本身 API 的复杂性,我们需要了解这些算子的使用场景,明白不同算子之间有哪些区别,什么情况下,应该使用什么算子来处理相应的逻辑。

基于以上原因,我们设计并实现了 Pulsar Functions,在 Pulsar Functions 中,用户只需关心计算逻辑本身,而不需要去了解或者部署 SPE 的相关服务,当然你也可以将 pulsar-function 与现有的 SPE 服务一起使用。 也就是说,在 Pulsar Functions 中,无需部署 SPE 的整套服务,就可以达到与 SPE 服务同样的优势 。

  • 什么是Functions

Pulsar Functions 是一个轻量级的计算框架,像 AWS 的 lambda、Google Cloud 的 Functions一样,Pulsar Functions 可以给用户提供一个部署简单、运维简单、API 简单的 FASS(Function as a service)平台。

Pulsar Functions 的设计灵感来自于 Heron 这样的流处理引擎,Pulsar Functions 将会拓展 Pulsar 和整个消息领域的未来。使用 Pulsar Functions,用户可以轻松地部署和管理 function,通过 function 从 Pulsar topic 读取数据或者生产新数据到 Pulsar topic。

引入 Pulsar Functions 后,Pulsar 成为统一的消息投递/计算/存储平台。只需部署一套 Pulsar 集群,便可以实现一个计算引擎,页面简单,操作便捷。

  • 什么是Functions

Input topic 是数据的来源,在 Pulsar Functions 中,所有的数据均来自 input topic。当数据进入input topic 中,Pulsar Functions 充当消费者的角色,去 input topic 中消费消息;当从 input topic 中拿到需要处理的消息时,Pulsar Functions 充当生产者的角色往 output topic 或者 log topic 中生产消息。

Output topic 和 log topic 都可以看作是 Pulsar Functions 的输出。从是否会有 output 这个点来看,我们可以将 Pulsar Functions 分为两类,当有输出的时候 Pulsar Functions 会将相应的 output 输出到 output topic中。log topic 主要存储用户的日志信息,当 Pulsar Functions 出现问题时,方便用户定位错误并调试。

综上所述:我们不难看出 Pulsar Functions 充当了一个消息处理和转运的角色。

在使用Pulsar Functions, 可以使用不同的语言来编写,比如Python, Java,Go等。编写方式主要两种:

  • 本地模式:集群外部,进行本地运行
  • 集群模式:集群内部运行(支持独立模式和集成模式)

如何使用呢?
首先,需要修改Pulsar中相关的配置:

[root@node1 bin]# cd /export/server/pulsar-2.8.1/conf/
[root@node1 conf]# vim broker.conf

内容如下:
# Enable Functions Worker Service in Broker
functionsWorkerEnabled=false   更改为:true

注意:三台节点都需要调整

接着,重启Broker即可:

cd /export/server/pulsar-2.8.1/bin
./pulsar-daemon stop broker
./pulsar-daemon start broker

最后通过:./pulsar-admin brokers list pulsar-cluster 查看broker列表

注意:三台节点都需要执行,依次都停止,然后依次启动

如何使用呢?
最后,测试是否可用

bin/pulsar-admin functions create \\
--jar examples/api-examples.jar \\
--classname org.apache.pulsar.functions.api.examples.ExclamationFunction \\
--inputs persistent://public/default/exclamation-input \\
--output persistent://public/default/exclamation-output \\
--tenant public \\
--namespace default \\
--name exclamation

输出结果:
"Created successfully"

检查是否按照预期触发函数运行:

[root@node1 pulsar-2.8.1]# bin/pulsar-admin functions trigger --name exclamation --trigger-value "hello world"
hello world!

如何使用呢?

[root@node1 pulsar-2.8.1]# bin/pulsar-admin functions
Usage: pulsar-admin functions [command] [command options]
  Commands:
    localrun      Run a Pulsar Function locally, rather than deploy to a 
            Pulsar cluster)
      Usage: localrun [options]
        Options:
          --auto-ack
            Whether or not the framework acknowledges messages automatically
          --batch-builder
            BatcherBuilder provides two types of batch construction methods, 
            DEFAULT and KEY_BASED. The default value is: DEFAULT
          --broker-service-url
            The URL for Pulsar broker
          --classname
            The class name of a Pulsar Function
          --client-auth-params
            Client authentication param
          --client-auth-plugin
            Client authentication plugin using which function-process can 
            connect to broker
          --cpu
            The cpu in cores that need to be allocated per function 
            instance(applicable only to docker runtime)
          --custom-runtime-options
            A string that encodes options to customize the runtime, see docs 
            for configured runtime for details
          --custom-schema-inputs
            The map of input topics to Schema properties (as a JSON string)
          --custom-schema-outputs
            The map of input topics to Schema properties (as a JSON string)
          --custom-serde-inputs
            The map of input topics to SerDe class names (as a JSON string)

            The namespace of a Pulsar Function
        * -s, --state
            The FunctionState that needs to be put
          --tenant
            The tenant of a Pulsar Function

    trigger      Trigger the specified Pulsar Function with a supplied value
      Usage: trigger [options]
        Options:
          --fqfn
            The Fully Qualified Function Name (FQFN) for the function
          --name
            The name of a Pulsar Function
          --namespace
            The namespace of a Pulsar Function
          --tenant
            The tenant of a Pulsar Function
          --topic
            The specific topic name that the function consumes from that you 
            want to inject the data to
          --trigger-file
            The path to the file that contains the data with which you want to 
            trigger the function
          --trigger-value
            The value with which you want to trigger the function
bin/pulsar-admin functions 
属性说明: 
functions:可选值: 
    localrun: 创建本地function进行运行 
    create: 在集群模式下创建 
    delete: 删除在集群中运行的function 
    get: 获取function的相关信息 
    restart: 重启 
    stop : 停止运行 
    start: 启动 
    status: 检查状态 
    stats: 查看状态 
    list: 查看特定租户和名称空间下的所有的function 
--classname: 设置function执行类 
--jar 设置function对应的jar包 
--inputs : 输入的topic 
--output : 输出的topic 
--tenant : 设置function运行在那个租户中 
--namespace: 设置function运行在那个名称空间中 
--name : 定义function的名称 

接下来, 我们尝试编写一个function的操作, 基于Pulsar Function完成流式计算操作:

案例需求:
使用Pulsar Function读取某一个Topic中旧期(格式为:yyyy/MM/dd HH/mm/ss)数据,读取后,对数据进行日期转换(格式为:yyyy-MM-dd HH:mm:ss)

首先加入依赖:

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-functions-api</artifactId>
    <version>2.8.1</version>
</dependency>

接着编写程序:

package com.toto.learn.functions;

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @author tuzuoquan
 * @version 1.0
 * @ClassName WordCountFunction
 * @description TODO
 * @date 2022/8/24 23:55
 **/
public class WordCountFunction implements Function<String,String> 

    private SimpleDateFormat format1 = new SimpleDateFormat("yyyy/MM/dd HH/mm/ss");
    private SimpleDateFormat format2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Override
    public String process(String input, Context context) throws Exception 
        Date oldDate = format1.parse(input);

        return format2.format(oldDate);
    


对项目程序进行打包部署:

将上面的jar包放到服务器
构建function

[root@node1 pulsar-2.8.1]# mkdir -p /export/server/pulsar-2.8.1/functions    (3个节点)
cd /export/server/pulsar-2.8.1/functions
将pulsar-base-1.0-SNAPSHOT.jar上传到该目录下

[root@node1 functions]# scp -r pulsar-base-1.0-SNAPSHOT.jar root@node2:$PWD
pulsar-base-1.0-SNAPSHOT.jar                               100%   15KB 692.7KB/s   00:00    
[root@node1 functions]# scp -r pulsar-base-1.0-SNAPSHOT.jar root@node3:$PWD
pulsar-base-1.0-SNAPSHOT.jar                               100%   15KB  11.8MB/s   00:00    
[root@node1 functions]#

bin/pulsar-admin functions create \\
--jar functions/pulsar-base-1.0-SNAPSHOT.jar \\
--classname com.toto.learn.functions.WordCountFunction \\
--inputs persistent://public/default/wd_input \\
--output persistent://public/default/wd_output \\
--tenant public \\
--namespace default \\
--name wordcount

"Created successfully"

启动function

trigger触发启动,并向函数发送数据测试
[root@node1 pulsar-2.8.1]# bin/pulsar-admin functions trigger --name wordcount --trigger-value "2021/10/10 15/30/30"
2021-10-10 15:30:30

从上可以看出日期格式出现了变化

此外, 也可以通过代码向input对应的Topic发送消息, 并消费output对应的Topic中数据, 也是可以看到function可以正常处理的。

2.1.2.Connector 连接器概念与使用

2.1.2.1.Pulsar Connector 连接器

虽然可以使用 Pulsar 消费者和生产者 API 编写代码(例如,从数据库同步数据时,先查询数据,再使用
Pulsar 的 API 将数据发布至 Pulsar),但这种方法耗时费力。因此,Pulsar 提出了 Connector (也称为
Pulsar IO),用于解决 Pulsar 与周边系统的集成问题,帮助用户高效完成工作。

这张图非常直观地描述了 Pulsar IO 的组成。
Pulsar IO分为输入(Input)和输出(Output)两个模块。

输入:代表数据从哪里来,通过Source实现数据输入。数据的来源可以是数据库(例如mysql、Oracle、MongoDB)、文件、日志或自定义系统等。
输出:代表数据往哪里去,通过Sink实现数据输出。数据的输出可以是数据仓库、数据库等。

而目前Pulsar支持非常多的Connector,可以参考以下几个网站:
http://pulsar.apache.org/docs/zh-CN/io-connectors/#source-connector
http://pulsar.apache.org/docs/zh-CN/io-connectors/#sink-connector
目前我们主要介绍 Pulsar flink ConnectorPulsar Flume Connector
其他的连接器的使用方式, 基本是类似的

2.1.2.2.Pulsar Connector连接器 —>Pulsar Flink Connector

Pulsar Flink Connector是Apache Pulsar和Apache Flink(数据处理引擎)的集成,它允许Flink从Pulsar读取数据
,并向Pulsar写入数据,并提供精确一次的源语义和至少一次的汇聚语义。

如何使用pulsar Flink Connector, 首先在pom中加入相关的依赖环境:(注意: 还需要添加 pulsar客户端包)

<repositories><!--代码库--> 
    <repository> 
        <id>aliyun</id> 
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url> 
        <releases><enabled>true</enabled></releases> 
        <snapshots> 
            <enabled>false</enabled> 
            <updatePolicy>never</updatePolicy> 
        </snapshots> 
     </repository> 
</repositories>

<dependencies>
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client-all</artifactId>
        <version>2.8.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-functions-api</artifactId>
        <version>2.8.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.13.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.13.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.13.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>1.13.1</version>
    </dependency>

    <dependency>
        <groupId>io.streamnative.connectors</groupId>
        <artifactId>pulsar-flink-connector_2.11</artifactId>
        <version>1.13.1.5-rc1</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.pulsar</groupId>
                <artifactId>pulsar-client-all</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <target>1.8</target>
                <source>1.8</source>
            </configuration>
        </plugin>
    </plugins>
</build>

2.1.2.3.如何在flink的流式环境中使用Pulsar: sink端

  • 前置准备:装nc工具
yum -y install nc
  • 发送数据:
nc -lk 44444

例如:

编写FlinkFromPulsarSink,内容如下:

package com.toto.learn.connector.flink;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink;
import org.apache.flink.streaming.util.serialization.PulsarPrimitiveSchema;

import java.util.Optional;
import java.util.Properties;

/**
 * @author tuzuoquan
 * @version 1.0
 * @ClassName FlinkFromPulsarSink
 * @description TODO
 * @date 2022/8/25 23:44
 **/
public class FlinkFromPulsarSink 

    public static void main(String[] args) throws Exception 
        //1.创建flink的流式处理的核心类对象
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.添加source组件:监控某一个端口号,从端口号读取数据操作
        DataStreamSource<String> streamSource = env.socketTextStream("node1", 44444);

        //3.添加转换的组件
        //4.添加sink的组件: 如何将处理后的数据输出到Pulsar中
        FlinkPulsarSink<String> pulsarSink = new FlinkPulsarSink<String>(
                "pulsar://node1:6650,node2:6650,node3:6650",
                "http://node1:8080,node2:8080,node3:8080",
                Optional.of("persistent://toto_pulsar_t/toto_pulsar_n/t_topic1"),
                new Properties(),
                new PulsarPrimitiveSchema<>(String.class)
        );

        streamSource.addSink(pulsarSink);

        //5.启动flink程序
        env.execute("flinkPulsarSink");
    


观察消费者消费数据情况:

FlinkFromPulsarSink的内容如下:

package com.toto.learn.connector.flink;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink;
import org.apache.flink.streaming.util.serialization.PulsarPrimitiveSchema;

import java.util.Optional;
import java.util.Properties;

/**
 * @author tuzuoquan
 * @version 1.0
 * @ClassName FlinkFromPulsarSink
 * @description TODO
 * @date 2022/8/25 23:44
 **/
public class FlinkFromPulsarSink 

    public static void main(String[] args) throws Exception 
        //1.创建flink的流式处理的核心类对象
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.添加source组件:监控某一个端口号,从端口号读取数据操作
        DataStreamSource<String> streamSource = env.socketTextStream("node1", 44444);

        //3.添加转换的组件
        //4.添加sink的组件: 如何将处理后的数据输出到Pulsar中
        FlinkPulsarSink<String> pulsarSink = new FlinkPulsarSink<String>(
                "pulsar://node1:6650,node2:6650,node3:6650",
                "http://node1:8080,node2:8080,node3:8080",
                Optional.of("persistent://toto_pulsar_t/toto_pulsar_n/t_topic1"),
                new Properties(),
                new PulsarPrimitiveSchema<>(String.class)
        );

        streamSource.addSink(pulsarSink);

        //5.启动flink程序
        env.execute("flinkPulsarSink");
    


启动nc工具:

借助:启动FlinkFromPulsarSink
nc工具发送数据的时候,可以看到:

2.1.2.4.schema flink source sink

package com.toto.learn.connector.flink;

import com.toto.learn.pojo.User;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.flink.streaming.connectors.pulsar.config.RecordSchemaType;
import org.apache.flink.streaming.connectors.pulsar.internal.AvroDeser;
import org.apache.flink.streaming.connectors.pulsar.internal.AvroSer;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarSerializer;
import org.apache.flink.streaming.util.serialization.PulsarSerializationSchemaWrapper;

import java.util.Optional;
import java.util.Properties;

/**
 * @author tuzuoquan
 * @version 1.0
 * @ClassName FlinkFromPulsarSchema
 * @description 需求: 基于Flink实现读取一个POJO类型的数据, 将将数据写入到Pulsar中
 * @date 2022/8/26 0:49
 **/
public class FlinkFromPulsarSchema 

    public static 02_Pulsar的集群架构架构基本介绍Pulsar提供的组件介绍Brokers介绍Zookeeper的元数据存储基于bookKeeper持久化存储Pulsar代理

01.pulsar基本介绍多租户模式云原生架构Segmented Streams支持跨地域复制pulsar组件介绍Pulsar IO (Connector)Pulsar与kafka的对比

03_Apache Pulsar的Local与分布式集群构建Pulsar的分布式集群模式Pulsar的分布式集群模式构建启动测试

聊聊 Pulsar: Pulsar 分布式集群搭建

聊聊 Pulsar: Pulsar 分布式集群搭建

聊聊 Pulsar: Pulsar 分布式集群搭建