博文干货|在 Kotlin 中使用 Apache Pulsar
Posted ApachePulsar
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了博文干货|在 Kotlin 中使用 Apache Pulsar相关的知识,希望对你有一定的参考价值。
关于 Apache Pulsar
Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性。
GitHub 地址:http://github.com/apache/pulsar/
本文翻译自:《Using Apache Pulsar With Kotlin》,作者 Gilles Barbier。
原文链接:https://gillesbarbier.medium.com/using-apache-pulsar-with-kotlin-3b0ab398cf52
译者简介
宋博,就职于北京百观科技有限公司,高级开发工程师,专注于微服务,云计算,大数据领域。
Apache Pulsar[1]通常被描述为下一代 Kafka,是开发人员工具集中一颗冉冉升起的新星。Pulsar 是用于 server-to-server 消息传递的多租户、高性能解决方案,通常用作可扩展应用程序的核心。
Pulsar 可以与 Kotlin[2]一起使用,因为它是用 Java 编写的。不过,它的 API 并没有考虑 Kotlin 带来的强大功能,例如数据类[3]、协程[4]或无反射序列化[5]。
在这篇文章中,我将讨论如何通过 Kotlin 来使用 Pulsar。
为消息体使用原生序列化
在 Kotlin 中定义消息的一种默认方式是使用数据类[6],这些类的主要目的是保存数据。对于此类数据类,Kotlin 会自动提供 equals()、toString()、copy()等方法 ,从而缩短代码长度并降低出现错误的风险。
使用 Java 创建一个Pulsar 生产者[7]:
Producer<MyAvro> avroProducer = client
.newProducer(Schema.AVRO(MyAvro.class))
.topic(“some-avro-topic”)
.create();
该 Schema.AVRO(MyAvro.class) 指令将内省 MyAvro Java 类并从中推断出一个 Schema。这需要校验新的生产者是否会产生与现有消费者实际兼容的消息。然而 Kotlin 数据类的 Java 实现不能很好地与 Pulsar 使用的默认序列化器配合使用。但幸运的是,从 2.7.0 版本开始,Pulsar 允许您对生产者和消费者使用自定义序列化程序。
首先,您需要安装官方 Kotlin 序列化插件[8]。使用它可以创建一个如下的消息类:
@Serializable
data class RunTask(
val taskName: TaskName,
val taskId: TaskId,
val taskInput: TaskInput,
val taskOptions: TaskOptions,
val taskMeta: TaskMeta
)
注意 @Serializable 注解。
有了它,你就可以使用 RunTask.serialiser() 让序列化器在不内省的情况下工作,这将使效率大大提升!
目前,序列化插件仅支持 JSON(以及一些其他在 beta 内的格式 例如 protobuf)。所以我们还需要 avro4k[9] 库来扩展它并支持 Avro 格式。
使用这些工具,我们可以创建一个像下面这样的 Producer
import com.github.avrokotlin.avro4k.Avro
import com.github.avrokotlin.avro4k.io.AvroEncodeFormat
import io.infinitic.common.tasks.executors.messages.RunTask
import kotlinx.serialization.KSerializer
import org.apache.avro.file.SeekableByteArrayInput
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DecoderFactory
import org.apache.pulsar.client.api.Consumer
import org.apache.pulsar.client.api.Producer
import org.apache.pulsar.client.api.PulsarClient
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.client.api.schema.SchemaDefinition
import org.apache.pulsar.client.api.schema.SchemaReader
import org.apache.pulsar.client.api.schema.SchemaWriter
import java.io.ByteArrayOutputStream
import java.io.InputStream
// Convert T instance to Avro schemaless binary format
fun <T : Any> writeBinary(t: T, serializer: KSerializer<T>): ByteArray
val out = ByteArrayOutputStream()
Avro.default.openOutputStream(serializer)
encodeFormat = AvroEncodeFormat.Binary
schema = Avro.default.schema(serializer)
.to(out).write(t).close()
return out.toByteArray()
// Convert Avro schemaless byte array to T instance
fun <T> readBinary(bytes: ByteArray, serializer: KSerializer<T>): T
val datumReader = GenericDatumReader<GenericRecord>(Avro.default.schema(serializer))
val decoder = DecoderFactory.get().binaryDecoder(SeekableByteArrayInput(bytes), null)
return Avro.default.fromRecord(serializer, datumReader.read(null, decoder))
// custom Pulsar SchemaReader
class RunTaskSchemaReader: SchemaReader<RunTask>
override fun read(bytes: ByteArray, offset: Int, length: Int) =
read(bytes.inputStream(offset, length))
override fun read(inputStream: InputStream) =
readBinary(inputStream.readBytes(), RunTask.serializer())
// custom Pulsar SchemaWriter
class RunTaskSchemaWriter : SchemaWriter<RunTask>
override fun write(message: RunTask) = writeBinary(message, RunTask.serializer())
// custom Pulsar SchemaDefinition<RunTask>
fun runTaskSchemaDefinition(): SchemaDefinition<RunTask> =
SchemaDefinition.builder<RunTask>()
.withJsonDef(Avro.default.schema(RunTask.serializer()).toString())
.withSchemaReader(RunTaskSchemaReader())
.withSchemaWriter(RunTaskSchemaWriter())
.withSupportSchemaVersioning(true)
.build()
// Create an instance of Producer<RunTask>
fun runTaskProducer(client: PulsarClient): Producer<RunTask> = client
.newProducer(Schema.AVRO(runTaskSchemaDefinition()))
.topic("some-avro-topic")
.create();
// Create an instance of Consumer<RunTask>
fun runTaskConsumer(client: PulsarClient): Consumer<RunTask> = client
.newConsumer(Schema.AVRO(runTaskSchemaDefinition()))
.topic("some-avro-topic")
.subscribe();
密封类消息和每个 Topic 一个封装
Pulsar 每个 Topic 只允许一种类型的消息。在某些特殊情况下,这并不能满足全部需求。但这个问题可以通过使用封装模式来变通。
首先,使用密封类从一个 Topic 创建所有类型消息:
@Serializable
sealed class TaskEngineMessage()
abstract val taskId: TaskId
@Serializable
data class DispatchTask(
override val taskId: TaskId,
val taskName: TaskName,
val methodName: MethodName,
val methodParameterTypes: MethodParameterTypes?,
val methodInput: MethodInput,
val workflowId: WorkflowId?,
val methodRunId: MethodRunId?,
val taskMeta: TaskMeta,
val taskOptions: TaskOptions = TaskOptions()
) : TaskEngineMessage()
@Serializable
data class CancelTask(
override val taskId: TaskId,
val taskOutput: MethodOutput
) : TaskEngineMessage()
@Serializable
data class TaskCanceled(
override val taskId: TaskId,
val taskOutput: MethodOutput,
val taskMeta: TaskMeta
) : TaskEngineMessage()
@Serializable
data class TaskCompleted(
override val taskId: TaskId,
val taskName: TaskName,
val taskOutput: MethodOutput,
val taskMeta: TaskMeta
) : TaskEngineMessage()
然后,再为这些消息创建一个封装:
Note @Serializable
data class TaskEngineEnvelope(
val taskId: TaskId,
val type: TaskEngineMessageType,
val dispatchTask: DispatchTask? = null,
val cancelTask: CancelTask? = null,
val taskCanceled: TaskCanceled? = null,
val taskCompleted: TaskCompleted? = null,
)
init
val noNull = listOfNotNull(
dispatchTask,
cancelTask,
taskCanceled,
taskCompleted
)
require(noNull.size == 1)
require(noNull.first() == message())
require(noNull.first().taskId == taskId)
companion object
fun from(msg: TaskEngineMessage) = when (msg)
is DispatchTask -> TaskEngineEnvelope(
msg.taskId,
TaskEngineMessageType.DISPATCH_TASK,
dispatchTask = msg
)
is CancelTask -> TaskEngineEnvelope(
msg.taskId,
TaskEngineMessageType.CANCEL_TASK,
cancelTask = msg
)
is TaskCanceled -> TaskEngineEnvelope(
msg.taskId,
TaskEngineMessageType.TASK_CANCELED,
taskCanceled = msg
)
is TaskCompleted -> TaskEngineEnvelope(
msg.taskId,
TaskEngineMessageType.TASK_COMPLETED,
taskCompleted = msg
)
fun message(): TaskEngineMessage = when (type)
TaskEngineMessageType.DISPATCH_TASK -> dispatchTask!!
TaskEngineMessageType.CANCEL_TASK -> cancelTask!!
TaskEngineMessageType.TASK_CANCELED -> taskCanceled!!
TaskEngineMessageType.TASK_COMPLETED -> taskCompleted!!
enum class TaskEngineMessageType
CANCEL_TASK,
DISPATCH_TASK,
TASK_CANCELED,
TASK_COMPLETED
请注意 Kotlin 如何优雅地检查init
! 可以借助 TaskEngineEnvelope.from(msg)
很容易创建一个封装,并通过envelope.message()
返回原始消息。
为什么这里添加了一个显式 taskId 值,而非使用一个全局字段message:TaskEngineMessage
,并且针对每种消息类型使用一个字段呢?是因为通过这种方式,我就可以借助 taskId 或 type,亦或者两者相结合的方式使用PulsarSQL[10] 来获取这个 Topic 的信息。
通过协程来构建 Worker
在普通 Java 中使用 Thread 很复杂且容易出错。好在 Koltin 提供了 coroutines[11]——一种更简单的异步处理抽象——和 channels[12]——一种在协程之间传输数据的便捷方式。
我可以通过以下方式创建一个 Worker:
•单个("task-engine-message-puller")专用于从 Pulsar 拉取消息的协程•N 个协程 ( "task-engine-$i") 并行处理消息•单个("task-engine-message-acknoldeger")处理后确认 Pulsar 消息的协程
有很多个类似于这样的进程后我已经添加了一个 logChannel 用来采集日志。请注意,为了能够在与接收它的协程不同的协程中确认 Pulsar 消息,我需要将TaskEngineMessage
封装到包含Pulsar messageId
的MessageToProcess<TaskEngineMessage>
中:
typealias TaskEngineMessageToProcess = MessageToProcess<TaskEngineMessage>
fun CoroutineScope.startPulsarTaskEngineWorker(
taskEngineConsumer: Consumer<TaskEngineEnvelope>,
taskEngine: TaskEngine,
logChannel: SendChannel<TaskEngineMessageToProcess>?,
enginesNumber: Int
) = launch(Dispatchers.IO)
val taskInputChannel = Channel<TaskEngineMessageToProcess>()
val taskResultsChannel = Channel<TaskEngineMessageToProcess>()
// coroutine dedicated to pulsar message pulling
launch(CoroutineName("task-engine-message-puller"))
while (isActive)
val message: Message<TaskEngineEnvelope> = taskEngineConsumer.receiveAsync().await()
try
val envelope = readBinary(message.data, TaskEngineEnvelope.serializer())
taskInputChannel.send(MessageToProcess(envelope.message(), message.messageId))
catch (e: Exception)
taskEngineConsumer.negativeAcknowledge(message.messageId)
throw e
// coroutines dedicated to Task Engine
repeat(enginesNumber)
launch(CoroutineName("task-engine-$it"))
for (messageToProcess in taskInputChannel)
try
messageToProcess.output = taskEngine.handle(messageToProcess.message)
catch (e: Exception)
messageToProcess.exception = e
taskResultsChannel.send(messageToProcess)
// coroutine dedicated to pulsar message acknowledging
launch(CoroutineName("task-engine-message-acknowledger"))
for (messageToProcess in taskResultsChannel)
if (messageToProcess.exception == null)
taskEngineConsumer.acknowledgeAsync(messageToProcess.messageId).await()
else
taskEngineConsumer.negativeAcknowledge(messageToProcess.messageId)
logChannel?.send(messageToProcess)
data class MessageToProcess<T> (
val message: T,
val messageId: MessageId,
var exception: Exception? = null,
var output: Any? = null
)
总结
在本文中,我们介绍了如何在 Kotlin 中实现的 Pulsar 使用方法:
•代码消息(包括接收多种类型消息的 Pulsar Topic 的封装);•创建 Pulsar 的生产者/消费者;•构建一个能够并行处理许多消息的简单 Worker。
引用链接
[1]
Apache Pulsar: https://pulsar.apache.org/zh-CN/[2]
Kotlin: https://kotlinlang.org/[3]
数据类: https://kotlinlang.org/docs/reference/data-classes.html[4]
协程: https://kotlinlang.org/docs/reference/coroutines/coroutines-guide.html[5]
无反射序列化: https://kotlinlang.org/docs/reference/serialization.html#serialization[6]
数据类: https://kotlinlang.org/docs/reference/data-classes.html[7]
Pulsar 生产者: https://pulsar.apache.org/docs/en/client-libraries-java/#schema-example[8]
Kotlin 序列化插件: https://github.com/Kotlin/kotlinx.serialization[9]
avro4k: https://github.com/avro-kotlin/avro4k[10]
PulsarSQL: https://pulsar.apache.org/docs/en/sql-overview/[11]
coroutines: https://kotlinlang.org/docs/reference/coroutines/coroutines-guide.html[12]
channels: https://kotlinlang.org/docs/reference/coroutines/channels.html
▼ 关注「Apache Pulsar」,获取更多技术干货 ▼
干货丨Kotlin在Spring Boot中的应用
随着Kotlin在移动端开发的普及,它也逐步走入后端开发者的视野。Kotlin是JVM体系的语言,和Java有着良好的互操作性,上手较容易,且可以使用Java强大的生态,其还具有函数式编程的优点。另外,Spring Initializr提供了对Java、Kotlin语言的支持。
Kotlin是JetBrains公司开发的,目前流行的IntelliJ IDEA软件也是该公司开发的。IDEA对Kotlin支持较好,可以将Java代码转换为Kotlin代码。IDEA还支持Java、Kotlin混合编程,历史代码使用Java编写,新的代码可以尝试使用Kotlin编写。
本文作者袁康研究使用Kotlin、Spring Boot做后端开发,取得了不错的效果。市面上介绍使用Kotlin进行后端开发的图书和文章也比较少,袁康在大量实践的基础上,萌生了写一本书的想法,希望和更多的Java开发人员分享Kotlin在后端开发中的实践经验。
《基于Kotlin的Spring Boot微服务实战》一书因此而生。
本文选自书中“Kotlin在常用中间件中的应用”一章,这一章主要介绍Kotlin在常用中间件中的应用,通过示例程序,将展示Kotlin集成Spring Boot、Redis、JPA、QueryDSL、MongoDB、Spring Security、RocketMQ、Elasticsearch、Swagger的方法。读者可以掌握使用Kotlin操作常用中间件的技巧。
Kotlin集成Spring Boot
Spring Boot是由Pivotal团队开发的,设计的目的是简化Spring应用的初始搭建和开发过程。
Spring Boot介绍
从2014年4月发布1.0.0.RELEASE到现在的最新版本2.2.2.RELEASE,从最初的基于Spring 4到现在基于Spring 5,从同步阻塞编程到异步响应式编程,Spring Boot经历了数十个RELEASE版本,发展迅速,表现稳定,其各版本发行时间如下表。越来越多的企业在生产中使用Spring Boot进行企业级应用开发。
Spring Boot、Spring版本的发行时间
时间 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Spring Boot基于约定优于配置的思想,让开发人员不必在配置与逻辑业务之间进行思维的切换。Spring Boot简化了Spring应用的开发,不再需要XML配置文件,使用注解方式提高了开发效率。Spring Boot默认配置了很多框架的使用方式,提供starter包,简化配置,开箱即用。Spring Boot尽可能地根据项目依赖来自动配置Spring框架。Spring Boot提供了可以直接在生产环境中使用的功能,如性能指标、应用信息和应用健康检查。
Spring Boot内嵌Tomcat、Jetty、Undertow等容器,直接用Jar包的方式进行部署,而传统的Spring应用需要用war包方式进行部署。Spring Boot的部署方法非常简单,一行命令就可以部署一个Spring Boot应用;可以很方便地用Docker、Kubernetes进行部署,适用于云原生应用,使系统的扩容、运维更加方便。
用Kotlin开发一个Spring Boot项目
在Spring网站上创建一个基于Maven的Kotlin Spring Boot项目。填写Group、Artifact,选择依赖的包Spring Web,然后下载到本地,如图。
Spring Initializr
解压文件,用IDEA打开这个工程,可以看到pom文件如下:该pom文件定义了父依赖,通过父依赖可以自动找到dependencies中依赖包的版本号;此外,还指定了Kotlin的版本是1.3.61,Spring Boot的版本是2.2.2.RELEASE。
1 <?xml version="1.0" encoding="UTF-8"?>
2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/ 2001/XMLSchema-instance"
3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4 <modelVersion>4.0.0</modelVersion>
5 <!— 父pom,定义包的依赖 -->
6 <parent>
7 <groupId>org.springframework.boot</groupId>
8 <artifactId>spring-boot-starter-parent</artifactId>
9 <version>2.2.2.RELEASE</version>
10 <relativePath/> <!-- lookup parent from repository -->
11 </parent>
12 <!— 子工程相关信息 -->
13 <groupId>io.kang.example</groupId>
14 <artifactId>kolinspringboot</artifactId>
15 <version>0.0.1-SNAPSHOT</version>
16 <name>kolinspringboot</name>
17 <description>Demo project for Spring Boot</description>
18 <!— 定义属性 -->
19 <properties>
20 <java.version>1.8</java.version>
21 <kotlin.version>1.3.61</kotlin.version>
22 </properties>
23 <dependencies>
24 <!— Spring Boot启动包 -->
25 <dependency>
26 <groupId>org.springframework.boot</groupId>
27 <artifactId>spring-boot-starter</artifactId>
28 </dependency>
29 <!— Kotlin相关依赖包 -->
30 <dependency>
31 <groupId>org.jetbrains.kotlin</groupId>
32 <artifactId>kotlin-reflect</artifactId>
33 </dependency>
34 <dependency>
35 <groupId>org.jetbrains.kotlin</groupId>
36 <artifactId>kotlin-stdlib-jdk8</artifactId>
37 </dependency>
38 <dependency>
39 <groupId>org.springframework.boot</groupId>
40 <artifactId>spring-boot-starter-test</artifactId>
41 <scope>test</scope>
42 <exclusions>
43 <exclusion>
44 <groupId>org.junit.vintage</groupId>
45 <artifactId>junit-vintage-engine</artifactId>
46 </exclusion>
47 </exclusions>
48 </dependency>
49 </dependencies>
50 <build>
51 <!— Kotlin源码路径 -->
52 <sourceDirectory>${project.basedir}/src/main/kotlin</sourceDirectory>
53 <testSourceDirectory>${project.basedir}/src/test/kotlin</testSourceDirectory>
54 <plugins>
55 <!— Spring Boot Maven打包插件 -->
56 <plugin>
57 <groupId>org.springframework.boot</groupId>
58 <artifactId>spring-boot-maven-plugin</artifactId>
59 </plugin>
60 <!— Kotlin Maven插件 -->
61 <plugin>
62 <groupId>org.jetbrains.kotlin</groupId>
63 <artifactId>kotlin-maven-plugin</artifactId>
64 <configuration>
65 <args>
66 <arg>-Xjsr305=strict</arg>
67 </args>
68 <compilerPlugins>
69 <plugin>spring</plugin>
70 </compilerPlugins>
71 </configuration>
72 <dependencies>
73 <dependency>
74 <groupId>org.jetbrains.kotlin</groupId>
75 <artifactId>kotlin-maven-allopen</artifactId>
76 <version>${kotlin.version}</version>
77 </dependency>
78 </dependencies>
79 </plugin>
80 </plugins>
81 </build>
82 </project>
下面用Kotlin编写一个简单的Spring Boot Web应用:定义一个Spring Boot启动类,加上@SpringBootApplication注解;定义一个接口,通过http://localhost:8080/index可以访问这个接口;相关的配置放在application.yml中。
和用Java开发Spring Boot项目类似,Kotlin在main函数中启动应用,用GetMapping定义一个get接口,使用@RestController后就不用为每个方法添加@ResponseBody注解了。Kotlin的语法更加简洁。
KotlinSpringbootApplication.kt的代码如下:
1 @SpringBootApplication
2 class KotlinSpringbootApplication
3 // 主函数,启动类
4 fun main(args: Array<String>) {
5 runApplication<KotlinSpringbootApplication>(*args);
6 }
IndexController.kt的代码如下:
1 @RestController
2 class IndexController {
3 // 定义index接口
4 @GetMapping("/index")
5 fun index(): String {
6 return "Hello, Kotlin for Spring Boot!!"
7 }
8 }
通过浏览器访问“index”接口,显示“Hello,Kotlin for Spring Boot!!”。仅通过短短几行代码就开发了一个简单的Kotlin Web应用,非常便捷。
更多精彩内容欢迎大家阅读《基于Kotlin的Spring Boot微服务实战》一书。
本书专注于Kotlin在Spring Boot微服务开发中的实践,介绍了函数式编程思想、Kotlin的语法、Kotlin在常用中间件中的应用,以及其在微服务注册中心、微服务配置中心、微服务网关、Spring Cloud Alibaba、服务监控和服务链路监控方面的应用。还给出了详细的实例代码和一个完整的博客示例,可以帮助读者使用Kotlin开发基于Spring Boot微服务的程序。
(扫码获取本书详情)
▼
如果喜欢本文 欢迎 在看丨留言丨分享至朋友圈 三连
热文推荐
▼点击阅读原文,了解本书详情!
以上是关于博文干货|在 Kotlin 中使用 Apache Pulsar的主要内容,如果未能解决你的问题,请参考以下文章
干货分享| Kotlin VS Java 赛跑,谁能赢得比赛?
#yyds干货盘点# Kotlin随查指南,妈妈再也不担心我不会Ctrl+F了