博文干货|在 Kotlin 中使用 Apache Pulsar

Posted ApachePulsar

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了博文干货|在 Kotlin 中使用 Apache Pulsar相关的知识,希望对你有一定的参考价值。

关于 Apache Pulsar

Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性。
GitHub 地址:http://github.com/apache/pulsar/

本文翻译自:《Using Apache Pulsar With Kotlin》,作者 Gilles Barbier。
原文链接:https://gillesbarbier.medium.com/using-apache-pulsar-with-kotlin-3b0ab398cf52

译者简介

宋博,就职于北京百观科技有限公司,高级开发工程师,专注于微服务,云计算,大数据领域。

Apache Pulsar[1]通常被描述为下一代 Kafka,是开发人员工具集中一颗冉冉升起的新星。Pulsar 是用于 server-to-server 消息传递的多租户、高性能解决方案,通常用作可扩展应用程序的核心。

Pulsar 可以与 Kotlin[2]一起使用,因为它是用 Java 编写的。不过,它的 API 并没有考虑 Kotlin 带来的强大功能,例如数据类[3]协程[4]无反射序列化[5]

在这篇文章中,我将讨论如何通过 Kotlin 来使用 Pulsar。

为消息体使用原生序列化

在 Kotlin 中定义消息的一种默认方式是使用数据类[6],这些类的主要目的是保存数据。对于此类数据类,Kotlin 会自动提供 equals()、toString()、copy()等方法 ,从而缩短代码长度并降低出现错误的风险。

使用 Java 创建一个Pulsar 生产者[7]:

Producer<MyAvro> avroProducer = client
  .newProducer(Schema.AVRO(MyAvro.class))
  .topic(“some-avro-topic”)
  .create();

该 Schema.AVRO(MyAvro.class) 指令将内省 MyAvro Java 类并从中推断出一个 Schema。这需要校验新的生产者是否会产生与现有消费者实际兼容的消息。然而 Kotlin 数据类的 Java 实现不能很好地与 Pulsar 使用的默认序列化器配合使用。但幸运的是,从 2.7.0 版本开始,Pulsar 允许您对生产者和消费者使用自定义序列化程序。

首先,您需要安装官方 Kotlin 序列化插件[8]。使用它可以创建一个如下的消息类:

@Serializable
        data class RunTask(
             val taskName: TaskName,
             val taskId: TaskId,
        val taskInput: TaskInput,
        val taskOptions: TaskOptions,
        val taskMeta: TaskMeta
         )

注意 @Serializable 注解。
有了它,你就可以使用 RunTask.serialiser() 让序列化器在不内省的情况下工作,这将使效率大大提升!

目前,序列化插件仅支持 JSON(以及一些其他在 beta 内的格式 例如 protobuf)。所以我们还需要 avro4k[9] 库来扩展它并支持 Avro 格式。

使用这些工具,我们可以创建一个像下面这样的 Producer 任务:

import com.github.avrokotlin.avro4k.Avro
import com.github.avrokotlin.avro4k.io.AvroEncodeFormat
import io.infinitic.common.tasks.executors.messages.RunTask
import kotlinx.serialization.KSerializer
import org.apache.avro.file.SeekableByteArrayInput
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DecoderFactory
import org.apache.pulsar.client.api.Consumer
import org.apache.pulsar.client.api.Producer
import org.apache.pulsar.client.api.PulsarClient
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.client.api.schema.SchemaDefinition
import org.apache.pulsar.client.api.schema.SchemaReader
import org.apache.pulsar.client.api.schema.SchemaWriter
import java.io.ByteArrayOutputStream
import java.io.InputStream

// Convert T instance to Avro schemaless binary format
fun <T : Any> writeBinary(t: T, serializer: KSerializer<T>): ByteArray 
    val out = ByteArrayOutputStream()
    Avro.default.openOutputStream(serializer) 
        encodeFormat = AvroEncodeFormat.Binary
        schema = Avro.default.schema(serializer)
    .to(out).write(t).close()

    return out.toByteArray()


// Convert Avro schemaless byte array to T instance
fun <T> readBinary(bytes: ByteArray, serializer: KSerializer<T>): T 
    val datumReader = GenericDatumReader<GenericRecord>(Avro.default.schema(serializer))
    val decoder = DecoderFactory.get().binaryDecoder(SeekableByteArrayInput(bytes), null)

    return Avro.default.fromRecord(serializer, datumReader.read(null, decoder))


// custom Pulsar SchemaReader
class RunTaskSchemaReader: SchemaReader<RunTask> 
    override fun read(bytes: ByteArray, offset: Int, length: Int) =
        read(bytes.inputStream(offset, length))

    override fun read(inputStream: InputStream) =
        readBinary(inputStream.readBytes(), RunTask.serializer())


// custom Pulsar SchemaWriter
class RunTaskSchemaWriter : SchemaWriter<RunTask> 
    override fun write(message: RunTask) = writeBinary(message, RunTask.serializer())


// custom Pulsar SchemaDefinition<RunTask>
fun runTaskSchemaDefinition(): SchemaDefinition<RunTask> =
    SchemaDefinition.builder<RunTask>()
        .withJsonDef(Avro.default.schema(RunTask.serializer()).toString())
        .withSchemaReader(RunTaskSchemaReader())
        .withSchemaWriter(RunTaskSchemaWriter())
        .withSupportSchemaVersioning(true)
        .build()

// Create an instance of Producer<RunTask>
fun runTaskProducer(client: PulsarClient): Producer<RunTask> = client
    .newProducer(Schema.AVRO(runTaskSchemaDefinition()))
    .topic("some-avro-topic")
    .create();

// Create an instance of Consumer<RunTask>
fun runTaskConsumer(client: PulsarClient): Consumer<RunTask> = client
    .newConsumer(Schema.AVRO(runTaskSchemaDefinition()))
    .topic("some-avro-topic")
    .subscribe();

密封类消息和每个 Topic 一个封装

Pulsar 每个 Topic 只允许一种类型的消息。在某些特殊情况下,这并不能满足全部需求。但这个问题可以通过使用封装模式来变通。

首先,使用密封类从一个 Topic 创建所有类型消息:

@Serializable
sealed class TaskEngineMessage() 
    abstract val taskId: TaskId


@Serializable
data class DispatchTask(
    override val taskId: TaskId,
    val taskName: TaskName,
    val methodName: MethodName,
    val methodParameterTypes: MethodParameterTypes?,
    val methodInput: MethodInput,
    val workflowId: WorkflowId?,
    val methodRunId: MethodRunId?,
    val taskMeta: TaskMeta,
    val taskOptions: TaskOptions = TaskOptions()
) : TaskEngineMessage()

@Serializable
data class CancelTask(
    override val taskId: TaskId,
    val taskOutput: MethodOutput
) : TaskEngineMessage()

@Serializable
data class TaskCanceled(
    override val taskId: TaskId,
    val taskOutput: MethodOutput,
    val taskMeta: TaskMeta
) : TaskEngineMessage()

@Serializable
data class TaskCompleted(
    override val taskId: TaskId,
    val taskName: TaskName,
    val taskOutput: MethodOutput,
    val taskMeta: TaskMeta
) : TaskEngineMessage()

然后,再为这些消息创建一个封装:

Note @Serializable
data class TaskEngineEnvelope(
    val taskId: TaskId,
    val type: TaskEngineMessageType,
    val dispatchTask: DispatchTask? = null,
    val cancelTask: CancelTask? = null,
    val taskCanceled: TaskCanceled? = null,
    val taskCompleted: TaskCompleted? = null,

    init 
        val noNull = listOfNotNull(
            dispatchTask,
            cancelTask,
            taskCanceled,
            taskCompleted
        )

        require(noNull.size == 1)
        require(noNull.first() == message())
        require(noNull.first().taskId == taskId)
    

    companion object 
        fun from(msg: TaskEngineMessage) = when (msg) 
            is DispatchTask -> TaskEngineEnvelope(
                msg.taskId,
                TaskEngineMessageType.DISPATCH_TASK,
                dispatchTask = msg
            )
            is CancelTask -> TaskEngineEnvelope(
                msg.taskId,
                TaskEngineMessageType.CANCEL_TASK,
                cancelTask = msg
            )
            is TaskCanceled -> TaskEngineEnvelope(
                msg.taskId,
                TaskEngineMessageType.TASK_CANCELED,
                taskCanceled = msg
            )
            is TaskCompleted -> TaskEngineEnvelope(
                msg.taskId,
                TaskEngineMessageType.TASK_COMPLETED,
                taskCompleted = msg
            )
        
    

    fun message(): TaskEngineMessage = when (type) 
        TaskEngineMessageType.DISPATCH_TASK -> dispatchTask!!
        TaskEngineMessageType.CANCEL_TASK -> cancelTask!!
        TaskEngineMessageType.TASK_CANCELED -> taskCanceled!!
        TaskEngineMessageType.TASK_COMPLETED -> taskCompleted!!
    


enum class TaskEngineMessageType 
    CANCEL_TASK,
    DISPATCH_TASK,
    TASK_CANCELED,
    TASK_COMPLETED

请注意 Kotlin 如何优雅地检查init! 可以借助 TaskEngineEnvelope.from(msg) 很容易创建一个封装,并通过envelope.message()返回原始消息。

为什么这里添加了一个显式 taskId 值,而非使用一个全局字段message:TaskEngineMessage,并且针对每种消息类型使用一个字段呢?是因为通过这种方式,我就可以借助 taskId 或 type,亦或者两者相结合的方式使用PulsarSQL[10] 来获取这个 Topic 的信息。

通过协程来构建 Worker

在普通 Java 中使用 Thread 很复杂且容易出错。好在 Koltin 提供了 coroutines[11]——一种更简单的异步处理抽象——和 channels[12]——一种在协程之间传输数据的便捷方式。

我可以通过以下方式创建一个 Worker:

单个("task-engine-message-puller")专用于从 Pulsar 拉取消息的协程N 个协程 ( "task-engine-$i") 并行处理消息单个("task-engine-message-acknoldeger")处理后确认 Pulsar 消息的协程

有很多个类似于这样的进程后我已经添加了一个 logChannel 用来采集日志。请注意,为了能够在与接收它的协程不同的协程中确认 Pulsar 消息,我需要将TaskEngineMessage封装到包含Pulsar messageIdMessageToProcess<TaskEngineMessage>中:

typealias TaskEngineMessageToProcess = MessageToProcess<TaskEngineMessage>

fun CoroutineScope.startPulsarTaskEngineWorker(
    taskEngineConsumer: Consumer<TaskEngineEnvelope>,
    taskEngine: TaskEngine,
    logChannel: SendChannel<TaskEngineMessageToProcess>?,
    enginesNumber: Int
) = launch(Dispatchers.IO) 

    val taskInputChannel = Channel<TaskEngineMessageToProcess>()
    val taskResultsChannel = Channel<TaskEngineMessageToProcess>()

    // coroutine dedicated to pulsar message pulling
    launch(CoroutineName("task-engine-message-puller")) 
        while (isActive) 
            val message: Message<TaskEngineEnvelope> = taskEngineConsumer.receiveAsync().await()

            try 
                val envelope = readBinary(message.data, TaskEngineEnvelope.serializer())
                taskInputChannel.send(MessageToProcess(envelope.message(), message.messageId))
             catch (e: Exception) 
                taskEngineConsumer.negativeAcknowledge(message.messageId)
                throw e
            
        
    

    // coroutines dedicated to Task Engine
    repeat(enginesNumber) 
        launch(CoroutineName("task-engine-$it")) 
            for (messageToProcess in taskInputChannel) 
                try 
                    messageToProcess.output = taskEngine.handle(messageToProcess.message)
                 catch (e: Exception) 
                    messageToProcess.exception = e
                
                taskResultsChannel.send(messageToProcess)
            
        
    

    // coroutine dedicated to pulsar message acknowledging
    launch(CoroutineName("task-engine-message-acknowledger")) 
        for (messageToProcess in taskResultsChannel) 
            if (messageToProcess.exception == null) 
                taskEngineConsumer.acknowledgeAsync(messageToProcess.messageId).await()
             else 
                taskEngineConsumer.negativeAcknowledge(messageToProcess.messageId)
            
            logChannel?.send(messageToProcess)
        
    


data class MessageToProcess<T> (
    val message: T,
    val messageId: MessageId,
    var exception: Exception? = null,
    var output: Any? = null
)

总结

在本文中,我们介绍了如何在 Kotlin 中实现的 Pulsar 使用方法:

代码消息(包括接收多种类型消息的 Pulsar Topic 的封装);创建 Pulsar 的生产者/消费者;构建一个能够并行处理许多消息的简单 Worker。

引用链接

[1] Apache Pulsar: https://pulsar.apache.org/zh-CN/
[2] Kotlin: https://kotlinlang.org/
[3] 数据类: https://kotlinlang.org/docs/reference/data-classes.html
[4] 协程: https://kotlinlang.org/docs/reference/coroutines/coroutines-guide.html
[5] 无反射序列化: https://kotlinlang.org/docs/reference/serialization.html#serialization
[6] 数据类: https://kotlinlang.org/docs/reference/data-classes.html
[7] Pulsar 生产者: https://pulsar.apache.org/docs/en/client-libraries-java/#schema-example
[8] Kotlin 序列化插件: https://github.com/Kotlin/kotlinx.serialization
[9] avro4k: https://github.com/avro-kotlin/avro4k
[10] PulsarSQL: https://pulsar.apache.org/docs/en/sql-overview/
[11] coroutines: https://kotlinlang.org/docs/reference/coroutines/coroutines-guide.html
[12] channels: https://kotlinlang.org/docs/reference/coroutines/channels.html


▼ 关注「Apache Pulsar」,获取更多技术干货 ▼

干货丨Kotlin在Spring Boot中的应用


随着Kotlin在移动端开发的普及,它也逐步走入后端开发者的视野。Kotlin是JVM体系的语言,和Java有着良好的互操作性,上手较容易,且可以使用Java强大的生态,其还具有函数式编程的优点。另外,Spring Initializr提供了对Java、Kotlin语言的支持。

Kotlin是JetBrains公司开发的,目前流行的IntelliJ IDEA软件也是该公司开发的。IDEA对Kotlin支持较好,可以将Java代码转换为Kotlin代码。IDEA还支持Java、Kotlin混合编程,历史代码使用Java编写,新的代码可以尝试使用Kotlin编写。

本文作者袁康研究使用Kotlin、Spring Boot做后端开发,取得了不错的效果。市面上介绍使用Kotlin进行后端开发的图书和文章也比较少,袁康在大量实践的基础上,萌生了写一本书的想法,希望和更多的Java开发人员分享Kotlin在后端开发中的实践经验。

《基于Kotlin的Spring Boot微服务实战》一书因此而生。

干货丨Kotlin在Spring Boot中的应用

本文选自书中“Kotlin在常用中间件中的应用”一章,这一章主要介绍Kotlin在常用中间件中的应用,通过示例程序,将展示Kotlin集成Spring Boot、Redis、JPA、QueryDSL、MongoDB、Spring Security、RocketMQ、Elasticsearch、Swagger的方法。读者可以掌握使用Kotlin操作常用中间件的技巧。

下面我们一起来学习了解下本章中“Kotlin集成Spring Boot”部分。


Kotlin集成Spring Boot

Spring Boot是由Pivotal团队开发的,设计的目的是简化Spring应用的初始搭建和开发过程。

Spring Boot介绍

从2014年4月发布1.0.0.RELEASE到现在的最新版本2.2.2.RELEASE,从最初的基于Spring 4到现在基于Spring 5,从同步阻塞编程到异步响应式编程,Spring Boot经历了数十个RELEASE版本,发展迅速,表现稳定,其各版本发行时间如下表。越来越多的企业在生产中使用Spring Boot进行企业级应用开发。

 Spring Boot、Spring版本的发行时间

时间

Spring Boot版本
Spring版本
2014年
1.0.x
4.0.x.RELEASE
2014—2015年
1.1.x
4.0.x.RELEASE
2015年
1.2.x
4.1.x.RELEASE
2015—2016年
1.3.x
4.2.x.RELEASE
2016—2017年
1.4.x
4.3.x.RELEASE
2017—2018年
1.5.x
4.3.x.RELEASE
2018—2019年
2.0.x
5.0.x.RELEASE
2018—2020年
2.1.x
5.1.x.RELEASE
2019—2020年
2.2.x
5.2.x.RELEASE

Spring Boot基于约定优于配置的思想,让开发人员不必在配置与逻辑业务之间进行思维的切换。Spring Boot简化了Spring应用的开发,不再需要XML配置文件,使用注解方式提高了开发效率。Spring Boot默认配置了很多框架的使用方式,提供starter包,简化配置,开箱即用。Spring Boot尽可能地根据项目依赖来自动配置Spring框架。Spring Boot提供了可以直接在生产环境中使用的功能,如性能指标、应用信息和应用健康检查。

Spring Boot内嵌Tomcat、Jetty、Undertow等容器,直接用Jar包的方式进行部署,而传统的Spring应用需要用war包方式进行部署。Spring Boot的部署方法非常简单,一行命令就可以部署一个Spring Boot应用;可以很方便地用Docker、Kubernetes进行部署,适用于云原生应用,使系统的扩容、运维更加方便。

Spring Boot广泛应用于企业级应用和微服务开发。Spring Cloud微服务框架就是在Spring Boot基础上开发的。此外,很多开源项目提供了Spring Boot的集成,如rocketmq- spring-boot-starter,方便用户使用。


用Kotlin开发一个Spring Boot项目

在Spring网站上创建一个基于Maven的Kotlin Spring Boot项目。填写Group、Artifact,选择依赖的包Spring Web,然后下载到本地,如图。

干货丨Kotlin在Spring Boot中的应用

Spring Initializr

解压文件,用IDEA打开这个工程,可以看到pom文件如下:该pom文件定义了父依赖,通过父依赖可以自动找到dependencies中依赖包的版本号;此外,还指定了Kotlin的版本是1.3.61,Spring Boot的版本是2.2.2.RELEASE。

 1  <?xml version="1.0" encoding="UTF-8"?>  
2  <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/ 2001/XMLSchema-instance"  
3      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

4      <modelVersion>4.0.0</modelVersion>  
5      <!— 父pom,定义包的依赖 -->
6      <parent>  
7          <groupId>org.springframework.boot</groupId>  
8          <artifactId>spring-boot-starter-parent</artifactId>  
9          <version>2.2.2.RELEASE</version>  
10          <relativePath/> <!-- lookup parent from repository -->  
11      </parent>  
12      <!— 子工程相关信息 -->
13      <groupId>io.kang.example</groupId>  
14      <artifactId>kolinspringboot</artifactId>  
15      <version>0.0.1-SNAPSHOT</version>  
16      <name>kolinspringboot</name>  
17      <description>Demo project for Spring Boot</description>  
18      <!— 定义属性 -->
19      <properties>  
20          <java.version>1.8</java.version>  
21          <kotlin.version>1.3.61</kotlin.version>  
22      </properties>  
23      <dependencies>  
24           <!— Spring Boot启动包 -->
25          <dependency>  
26              <groupId>org.springframework.boot</groupId>  
27              <artifactId>spring-boot-starter</artifactId>  
28          </dependency>  
29           <!— Kotlin相关依赖包 -->
30          <dependency>  
31              <groupId>org.jetbrains.kotlin</groupId>  
32              <artifactId>kotlin-reflect</artifactId>  
33          </dependency>  
34          <dependency>  
35              <groupId>org.jetbrains.kotlin</groupId>  
36              <artifactId>kotlin-stdlib-jdk8</artifactId>  
37          </dependency>  
38          <dependency>  
39              <groupId>org.springframework.boot</groupId>  
40              <artifactId>spring-boot-starter-test</artifactId>  
41              <scope>test</scope>  
42              <exclusions>  
43                  <exclusion>  
44                      <groupId>org.junit.vintage</groupId>  
45                      <artifactId>junit-vintage-engine</artifactId>  
46                  </exclusion>  
47              </exclusions>  
48          </dependency>  
49      </dependencies>  
50      <build>  
51           <!— Kotlin源码路径 -->
52          <sourceDirectory>${project.basedir}/src/main/kotlin</sourceDirectory>  
53          <testSourceDirectory>${project.basedir}/src/test/kotlin</testSourceDirectory>
54          <plugins>  
55               <!— Spring Boot Maven打包插件 -->
56              <plugin>  
57                  <groupId>org.springframework.boot</groupId>  
58                  <artifactId>spring-boot-maven-plugin</artifactId>  
59              </plugin> 
60               <!— Kotlin Maven插件 -->
61              <plugin>  
62                  <groupId>org.jetbrains.kotlin</groupId>  
63                 <artifactId>kotlin-maven-plugin</artifactId>  
64                  <configuration>  
65                      <args>  
66                          <arg>-Xjsr305=strict</arg>  
67                      </args>  
68                      <compilerPlugins>  
69                          <plugin>spring</plugin>  
70                      </compilerPlugins>  
71                  </configuration>  
72                  <dependencies>  
73                      <dependency>  
74                          <groupId>org.jetbrains.kotlin</groupId>  
75                          <artifactId>kotlin-maven-allopen</artifactId>  
76                          <version>${kotlin.version}</version>  
77                      </dependency>  
78                  </dependencies>  
79              </plugin>  
80          </plugins>  
81      </build>  
82  </project>  


下面用Kotlin编写一个简单的Spring Boot Web应用:定义一个Spring Boot启动类,加上@SpringBootApplication注解;定义一个接口,通过http://localhost:8080/index可以访问这个接口;相关的配置放在application.yml中。

和用Java开发Spring Boot项目类似,Kotlin在main函数中启动应用,用GetMapping定义一个get接口,使用@RestController后就不用为每个方法添加@ResponseBody注解了。Kotlin的语法更加简洁。

KotlinSpringbootApplication.kt的代码如下:

1  @SpringBootApplication  
2  class KotlinSpringbootApplication  
3  // 主函数,启动类
4  fun main(args: Array<String>) {  
5      runApplication<KotlinSpringbootApplication>(*args);  
6  }  


IndexController.kt的代码如下:

1  @RestController  
2  class IndexController {  
3      // 定义index接口
4      @GetMapping("/index")  
5      fun index(): String {  
6         return "Hello, Kotlin for Spring Boot!!"  
7      }  
8  } 


通过浏览器访问“index”接口,显示“Hello,Kotlin for Spring Boot!!”。仅通过短短几行代码就开发了一个简单的Kotlin Web应用,非常便捷。

干货丨Kotlin在Spring Boot中的应用


更多精彩内容欢迎大家阅读《基于Kotlin的Spring Boot微服务实战》一书。

干货丨Kotlin在Spring Boot中的应用

本书专注于Kotlin在Spring Boot微服务开发中的实践,介绍了函数式编程思想、Kotlin的语法、Kotlin在常用中间件中的应用,以及其在微服务注册中心、微服务配置中心、微服务网关、Spring Cloud Alibaba、服务监控和服务链路监控方面的应用。还给出了详细的实例代码和一个完整的博客示例,可以帮助读者使用Kotlin开发基于Spring Boot微服务的程序。

干货丨Kotlin在Spring Boot中的应用

扫码获取本书详情

干货丨Kotlin在Spring Boot中的应用

   
     
     
   
如果喜欢本文
欢迎 在看留言分享至朋友圈 三连

 热文推荐  








▼点击阅读原文,了解本书详情!

以上是关于博文干货|在 Kotlin 中使用 Apache Pulsar的主要内容,如果未能解决你的问题,请参考以下文章

干货丨Kotlin在Spring Boot中的应用

干货分享| Kotlin VS Java 赛跑,谁能赢得比赛?

#yyds干货盘点# Kotlin随查指南,妈妈再也不担心我不会Ctrl+F了

我收到错误:使用 Kotlin 时,Apache Beam 中 MapElements 转换的“重载分辨率歧义”

Kotlin 1.2 新特性

学习干货最新Android Kotlin入门教程指南,2023最新!