为Go语言开发者介绍NATS

Posted 容器时代

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了为Go语言开发者介绍NATS相关的知识,希望对你有一定的参考价值。


2018上海KubeCon

Kubernetes的全球盛会KubeCon将于11月13日~11月15日在中国上海隆重举行,此论坛汇集了众多在开源和云原生领域有卓越贡献的应用人员和技术专家。大会吸引了超过5000名行业精英前来参会,大家齐聚一堂相互分享经验,聚焦创新,并讨论云原生计算的未来。KubeCon + CloudNativeCon中国论坛将召开100多个分组会议,包括技术会议、深度学习、案例研究等。现在通过容器时代专属报名通道报名可以享受超大折扣哦,详情请戳此处链接:




写在前面

本文将为准备构建分布式系统和微服务的Go语言开发者介绍NATS消息系统。当你构建分布式应用时,消息系统对应用间通信而言非常关键,尤其是事件驱动架构的异步通信方式。为了构建现代分布式系统而诞生了很多分布式队列和消息系统。像Kafka, NATS, NSQ, RabbitMQ, ActiveMQ这类的开源技术, 以及像Google Cloud Pub/Sub, Amazon SQS, Amazon SNS Topic, Azure Service Bus PaaS云平台,上面这些都为消息中间件和分布式系统提供了不同的能力和模式。前面提及的技术中的NATSNSQ均由Go语言编写,借助于像微服务这样的现代方式,大大简化了构建分布式系统的过程。因为构建分布式系统本身就很复杂,如果使用复杂的消息系统则会使你的应用更加复杂。现代的消息系统应该在各种环境规模如内部服务器、云平台、容器上都能够轻松运行和扩展。


 NATS介绍


为Go语言开发者介绍NATS

NATS是一个开源、轻量级、高性能的云原生消息系统。它是实现了具有更高级别扩展性的发布-订阅消息系统。即使NATS是基于发布-订阅分发模型,你同样可以通过订阅服务器队列组实现分布式系统。NATS创建于2010年,原是服务于Cloud Foundary平台的消息系统。NATS最开始是由Ruby语言实现的,但随后NATS团队使用Go语言进行了重写。


NATS在两个互相操作的模块中使用:核心NATS平台-NATS服务器(其可执行文件的名字为gnatsd)简称为NATS;NATS流(其可执行文件的名字为nats-streaming-server)是一个事件流服务,用于NATS添加事件流、发布保障及再现历史数据。NATS服务器是面向高性能现代分布式系统架构而设计的,并不能进行消息持久化。因此,如果你的系统是离线状态,将不会接收到消息。如果你想要实现持续消息传递和发布保障,可以使用NATS流代替核心NATS平台,NATS流建立在核心NATS平台基础之上。本文我将专注于基础的NATS服务器的介绍,至于NATS流我将在后续的文章中介绍。


NATS服务器(gnatsd)是最高性能的分布式消息系统,可以达到每秒钟发送1.5千万-1.8千万条消息。NATS平台易于使用和扩展,NATS的简洁性和高性能性质使得它对于构建现代云原生分布式系统及微服务而言是个不错的选择。我过去使用过许多消息系统,因为NATS的性能和简洁性,所以我强烈推荐它。


为Go语言开发者介绍NATS(来源: bravenewgeek.com/dissecting-message-queues)



消息模式

当NATS作为发布-订阅引擎时,它提供了三种消息传递模式:

  • 发布-订阅

  • 队列

  • 请求-响应



消息架构组件

NATS消息基础结构的主要构成有:

  • 消息:消息是数据交换单元,用于应用间交换数据的有效载荷。

  • 主体:主体明确消息的目的。

  • 生产者:生产者向NATS服务器发送消息。

  • 消费者:消费者从NATS服务器中接收消息。

  • 消息服务器:NATS服务器从生产者到消费者间分配消息。



安装服务器和客户端

以下是下载 NATS 服务器的各种发行版:

http://nats.io/download/nats-io/gnatsd/


同样可以使用Go语言工具安装NATS服务器:

go get github.com/nats-io/gnatsd

通过运行可执行文件gnatsd来启动NATS服务器:

gnatsd

你也可以使用下面的Go语言工具安装NATS客户端:

go get github.com/nats-io/go-nats



在Go语言中使用NATS

让我们通过Go语言编写的一个分布式应用的例子探索下NATS。在这个例子中,我们使用请求-应答和发布-订阅消息传递模式。在发布-订阅模式中,将使用订阅者队列组进行排队。这个例子NATS将使用Protocol Buffers来发送和接收消息。


清单1:缓冲协议中的消息类型

message ServiceDiscovery {    
   string order_service_uri = 1; } message EventStore {  
   string aggregate_id = 1;  
   string aggregate_type = 2;  
   string event_id = 3;  
   string event_type = 4;  
   string event_data = 5; }

消息类型ServiceDiscovery使用请求-应答通信,类型EventStore使用的是发布-订阅模式。



NATS的请求应答示例

请求-应答消息模式的工作方式类似于正常的请求-响应通信,其中发布请求操作发布带有回复主题的消息,同时等等对该回复主题的响应。这里我们使用这个格式用于找出服务端点的简单发现。值得注意的是我此处使用NATS的请求-应答进行消息传递仅仅是为了举个例子。NATS的请求-应答模式与gRPC的简单RPC非常类似,因此我一贯的使用gRPC的API接口代替NATS请求-应答消息传递。但对于以事件驱动的Go语言微服务结构,我通常使用NATS用于pub/sub通信。


这是一段代码块,在项目“Discovery.OrderService”中发送一个请求以得到服务端点。


清单2:作用于NATS请求-应答消息传递的请求:

func main() {	
   // Create NATS server connection    natsConnection, _ := nats.Connect(nats.DefaultURL)    log.Println("Connected to " + nats.DefaultURL)    msg, err := natsConnection.Request("Discovery.OrderService", nil, 1000*time.Millisecond)
   if err == nil && msg != nil {        orderServiceDiscovery := pb.ServiceDiscovery{} err := proto.Unmarshal(msg.Data, &orderServiceDiscovery) if err != nil {    log.Fatalf("Error on unmarshal: %v", err) } address := orderServiceDiscovery.OrderServiceUri log.Println("OrderService endpoint found at:", address)
       //Set up a connection to the gRPC server. conn, err := grpc.Dial(address, grpc.WithInsecure())    } }

Go语言NATS客户端的库被导入的项目中:

import "github.com/nats-io/go-nats"

函数nats.Connect尝试连接NATS系统,默认的NATS服务器运在“nats://localhost:4222”。这里我们使用默认的URL连接服务器。

natsConnection, _ := nats.Connect(nats.DefaultURL)

NATS通常在项目中发送一个名为“Discovery.OrderService”的请求以得到应答。

msg, err := natsConnection.Request("Discovery.OrderService", nil, 1000*time.Millisecond)

当对某个项目发送请求时,你可以传递请求数据和超时。这里我们不提供任何数据,向这个项目发送请求只是为了接收应答。我们采用Protocol Buffers进行发送和接收消息,响应数据被编码在Go语言结构值中。

orderServiceDiscovery := pb.ServiceDiscovery{}
err := proto.Unmarshal(msg.Data, &orderServiceDiscovery
)if err != nil {       log.Fatalf("Error on unmarshal: %v", err) } address := orderServiceDiscovery.OrderServiceUri log.Println("OrderService endpoint found at:", address)

这是另一个应用的代码块,订阅项目 “Discovery.OrderService” 提供了对应请求的应答。


清单3:NATS的请求-应答消息传递的响应

var orderServiceUri string
orderServiceUri = viper.GetString("discovery.orderservice")

func main() {
   // Create server connection    natsConnection, _ := nats.Connect(nats.DefaultURL)    log.Println("Connected to " + nats.DefaultURL)    natsConnection.Subscribe("Discovery.OrderService", func(m *nats.Msg) { orderServiceDiscovery := pb.ServiceDiscovery{OrderServiceUri: orderServiceUri} data, err := proto.Marshal(&orderServiceDiscovery)
       if err == nil {    natsConnection.Publish(m.Reply, data) }
    })
    // Keep the connection alive    runtime.Goexit() }

项目 “Discovery.OrderService”参与发送响应,这里通过编码到Protocol Buers以发送响应数据。



NATS发布订阅示例

我极力推荐NATS用于发布订阅引擎的pub/sub消息模型,构建企业级消息队列及Go语言搭建的分布式系统。NATS发布订阅是一对多通信系统,项目中一个发布者发送一个消息,项目中所有活跃的订阅者接收这个消息。此通信模型是典型的异步方式,发布的消息被分发到订阅消息的处理者。如果没有处理者,订阅以异步模型工作,客户端有可能被阻塞直到消息被处理。大多数真实情况下,你可能不需要通信的异步方式用于pub/sub通信。


为Go语言开发者介绍NATS


当创建订阅者时,你可以同时为它注册一个队列名称。所有具有相同的队列名称的订阅者构成一个队列组。随着消息在被注册的项目中发布,队列组中被随机选择的一个订阅者用于接收消息。虽然队列组中有多个订阅者,但每个消息只能被一个订阅者接收,且只能接收一次。当创建订阅者时,你可以选择是否注册队列名。在队列组中的订阅者们,其中一个订阅者接收消息,而那些没有队列组的订阅者们,所有订阅者共同接受这条消息。有意思的是NATS本身提供队列甚至它本身也是基于消息发布-订阅模式。


在本文的消息发布-订阅模式,我们创建一个无队列的订阅者及一个名“Order.OrdersCreatedQueue”含有多个订阅者的订阅组。因此队列组中的一个订阅者及其他的订阅者(无队列组的)可以接收消息。发布者的客户端是一个gRPC服务器,当项目“Order.OrderCreated”中创建一个命令时,发布者将发布一个消息,详见下面的代码块:


清单4:用于NATS发布订阅消息传递的发布者客户端

const (
	aggregate = "Order"
	event     = "OrderCreated")
       // publishOrderCreated publish an event via NATS server
       
func publishOrderCreated(order *pb.Order) {
       // Connect to NATS server natsConnection, _ := nats.Connect(nats.DefaultURL) log.Println("Connected to " + nats.DefaultURL)
       defer natsConnection.Close() eventData, _ := json.Marshal(order)
       event := pb.EventStore{    AggregateId:   order.OrderId,    AggregateType: aggregate,    EventId:       uuid.NewV4().String(),    EventType:     event,    EventData:     string(eventData), } subject := "Order.OrderCreated" data, _ := proto.Marshal(&event)
       // Publish message on subject natsConnection.Publish(subject, data) log.Println("Published message on subject " + subject) }


消息模式

NATS客户端的Publish功能,向给定的项目中发布一个消息。这里消息被整理到Protocol Buffers中,当信息从发布者客户端发布到项目中时,我们创建一个订阅者接收消息。


这段代码块来自于订阅者客户端用于订阅消息。


清单5:用于NATS发布订阅消息传递的订阅者客户端

const subject = "Order.>"

func main() {
   // Create server connection
   natsConnection, _ := nats.Connect(nats.DefaultURL)    log.Println("Connected to " + nats.DefaultURL)
   // Subscribe to subject    natsConnection.Subscribe(subject, func(msg *nats.Msg) { eventStore := pb.EventStore{} err := proto.Unmarshal(msg.Data, &eventStore)
       if err == nil {
           // Handle the message    log.Printf("Received message in EventStore service: %+v ", eventStore)    store := store.EventStore{}    store.CreateEvent(&eventStore)    log.Println("Inserted event into Event Store") }    })
   // Keep the connection alive    runtime.Goexit() }

使用通配符项目“Order.>”订阅消息,NATS支持在项目订阅中使用通配符,支持星号字符(*)和大于号(>),也被认为所有的通配符被用于通配项目订阅。通配符Order.> will 被匹配为Order.Created,Order.Shipped, Order.Delivered, Order.Delivered.Returned等等。通配符Order.* 将被匹配为 Order.Created, Order.Shipped, Order.Delivered等等,而不是 Order.Delivered.Returned。



NATS客户端的Subscribe功能,当消息被给定的项目发布时,订阅消息处理异步接收消息。由于消息由Protocol Buffers编码所发布,接收消息则是通过proto.Unmarshal解码到Go语言结构体值中。


让我们添加订阅者到一个队列组中,这是订阅者客户端从项目中订阅消息的代码块:


清单6:订阅者客户端队列组的NATS发布-订阅消息传递

const (
    queue   = "Order.OrdersCreatedQueue"
    subject = "Order.OrderCreated"
)
func
main()
{
   // Create server connection    natsConnection, _ := nats.Connect(nats.DefaultURL)    log.Println("Connected to " + nats.DefaultURL)
   // Subscribe to subject    natsConnection.QueueSubscribe(subject, queue, func(msg *nats.Msg) {     eventStore := pb.EventStore{} err := proto.Unmarshal(msg.Data, &eventStore)
       if err == nil {
       // Handle the message    log.Printf("Subscribed message in Worker 1: %+v ", eventStore) }    })
   // Keep the connection alive    runtime.Goexit() }

主题“Order.OrderCreated”的消息使用名为“Order.OrdersCreatedQueue"的队列通过QueueSubscribe功能被订阅。当我们使用同一个队列名创建多个订阅者,它们被创建在一个队列组中,随机选择一个订阅者用来接收消息。如果只是想通过NATS仅仅用于排队,可以只通过一个队列组创建订阅者。


本文我仅展示了核心NATS平台的基础能力,后续我将写另一篇文章讲述NATS流服务器。

本文中的源代码可见:https://github.com/shijuvar/gokit/tree/master/examples/grpc-nats


你也可以在twitter (@shijucv) 上关注我



原文链接

原文链接:https://medium.com/@shijuvar/introducing-nats-to-go-developers-3cfcb98c21d0


容器时代志愿者招募

为Go语言开发者介绍NATS

如果你对技术懵懵懂懂,想要入门却不知从何下手;

如果你求知若渴,想要学习更多技术、思想;

如果你对于技术有着一种狂热的喜爱并且热爱开源,以其为信仰。


为Go语言开发者介绍NATS
快来加入我们吧
为Go语言开发者介绍NATS



志愿者计划JOIN US

容器时代志愿编辑

志愿内容

  1. 翻译 —— 容器生态圈相关教程、文章、资讯等的翻译;



点击阅读原文即可加入,加入之后还有神秘福利等着你呦~





编辑:立尧


以上是关于为Go语言开发者介绍NATS的主要内容,如果未能解决你的问题,请参考以下文章

windows通过Visual Studio Code中配置GO开发环境(转)

在Visual Studio Code中配置GO开发环境

NATS_13:NATS Streaming案例讲解

NATS_05:服务器部署

Go语言基础之包

Go语言基础之包