Camel 和rabbitmq 集成处理

Posted willwillie

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Camel 和rabbitmq 集成处理相关的知识,希望对你有一定的参考价值。

camel可以看作是java中的一个编程范式,通过DSL语言来编写路由规则。

camel 核心

camel是一个基于规则路由和处理的引擎,提供企业集成模式的Java对象的实现,通过应用程序接口或称为陈述式的Java领域特定语言(DSL)来配置路由和处理的规则。
其核心的思想就是从一个from源头得到数据,通过processor处理,再发到一个to目的的.

发散:像是网络中超强的路由器规则,是否可以看做是软件定义的交换机

这里的from和to可以是我们在项目集成中经常碰到的类型:一个FTP文件夹中的文件,一个MQ的queue,一个HTTP request/response,一个webservice等等。

现在的问题是 什么是java领域特定语言(DSL)?

Camel uses a Java Domain Specific Language or DSL for creating Enterprise Integration Patterns or Routes in a variety of domain-specific languages (DSL) as listed below.
Java DSL - A Java based DSL using the fluent builder style.
Spring XML - A XML based DSL in Spring XML files
Blueprint XML - A XML based DSL in OSGi Blueprint XML files
Rest DSL - A DSL to define REST services using a REST style in either Java or XML.
Groovy DSL - A Groovy based DSL using Groovy programming language
Scala DSL - A Scala based DSL using Scala programming language
Annotation DSL - Use annotations in Java beans.
Kotlin DSL - Work in progress - Currently developed outside ASF, but will we included later in Camel when Kotlin and the DSL is ready.
The main entry points for the DSL are
CamelContext for creating a Camel routing rulebase
RouteBuilder for creating a collection of routes using the routing DSL—http://camel.apache.org/dsl.html
也就是说camel可以使用多种方式配置,包括Java xml等。

这里主要关注java DSL,java DSL的语法主要是什么呢?
RouteBuilder类就是Java的smart DSL。

public abstract class RouteBuilder
extends BuilderSupport
implements RoutesBuilder
A Java DSL which is used to build DefaultRoute instances in a CamelContext for smart routing.

主要的方法有:

public abstract void configure()
throws Exception
Called on initialization to build the routes using the fluent builder syntax.
This is a central method for RouteBuilder implementations to implement the routes using the Java fluent builder syntax.

configure()就是主要的实现路由语法的函数。
主要的路由配置的方法我整理成了一张表格:

语法作用语法
from路由的起点from(String uri),返回the builder
to路由的终点参数String uri,返回the builder
choice(when,otherwise)选择某个内容以choice开头,when表示某个条件
fromF/toF使用String formatting创建URI和from/to类似
filter选择某个内容过滤想要的内容
process对内容进行一定的处理需要编写继承自Processor接口的类,实现process方法,完成对Exchanage(交换内容)处理的逻辑

Processer接口用于处理消息转换或者消息交换。

来自网络的一个简单例子:

这里实现了一个简单的路由,这里的起点是DefaultCamelConetxt,那么DefaultCamelConetxt这个类是怎样的?

public class DefaultCamelContext
extends ServiceSupport
implements ModelCamelContext, SuspendableService
Represents the context used to configure routes and the policies to use.

也就是说DefaultCamelContext类代表了配置camel路由的上下文,和一些必要的策略,DefaultCamelContext继承自Service,可以使用start,stop等基本的管理生命周期的方法。
让camel引擎开始工作使用start方法,对应的停止工作的方法是stop。使用addRoutes方法给camelContext添加对应的路由。
下图表示了一个继承自Processor类的基本的实现,

主要的逻辑是对Exchange进行操作,来达到想要的目的。上图的逻辑是将a文件所有的行连成一行,以空格分隔组成一个新的文件给to对应的终点uri。
前面讲了一个简单的这里,到这个时候,还需要补充一些基本的概念。

camel一些定义

补充的概念有Uris,EndPoint,Exchange,Message等

关于uris的官方解释: Camel makes extensive use of URIs to allow you to refer to endpoints which are lazily created by a Component if you refer to them within Routes.

意思是,uris的作用就是用来指向组件的endpoint的一种表示方法。

那么endpoint呢?

官方解释: public interface Endpoint extends IsSingleton, Service
An endpoint implements the Message Endpoint pattern and represents an endpoint that can send and receive message exchanges

endpoint就是一个可以收发消息的组件,组件component就是一个一个的camel组件的术语代表。

那么,message呢?

public interface Exchange An Exchange is the message container holdingthe information during the entire routing of a Message received by a Consumer.

public interface Message Implements the Message pattern and represents an inbound or outbound message as part of an Exchange.

Exchange是message容器,message有属性inbound和outbound,是Exchange的一部分。比如上面的处理器使用exchange的getIn()方法可以得到message本身。

目前我写的一系列文章都是讲的clojure,比如语法和数据处理,同样我本文的目的依旧是讲camel在clojure中的应用,讲的更具体一点的话,其实是谈论camel在clojure 微服务中的使用方法。

camel在clojure 微服务中的使用

首先什么是微服务,这里有一篇很好的介绍微服务的文章,http://chuansong.me/n/350466751870(属于Chris Richardson 微服务系列7 篇中的第二篇)。

比如说一个微服务架构中,使用jsonrpc作为网关,rabbitmq作为amqp队列,一系列微服务通过amqp和网关(给外部提供web服务)通信。那么在这个架构中,camel的定义是怎样的?

(defrecord Camel [options configuration]
  component/Lifecycle
  (start [this]
    (let [registry      (SimpleRegistry.)
          camel-context (DefaultCamelContext. registry)
          error-handler (doto (DefaultErrorHandlerBuilder.) (.logStackTrace true) (.logExhaustedMessageHistory false))]
      ...
      (assoc this :context camel-context
                  :registry registry)))
  (stop [this] this))

这里使用上了clojure类型来定义Camel,还使用了SimpleRegistry。为何要使用SimpleRegistry类?
Registry接口:

Registry也就是一个注册服务接口,simpleRegistry就是简单的实现了这种接口的一个类,可以方便camel使用某些查找类的方法。
然后这个camelcontext初始化了一些参数,并将app的配置文件的地址设置为camel context的“properties”组件(可能没有实际意义,因为并不作为路由使用)。在start完成后,Camel便是一个有四个初始化了的field(options,configuration,camel,registry)的记录类型。
然后定义另外一个组件CamelStarter

(defrecord CamelStarter [camel]
  ;; 启动或停止camel. 定义此component的目的:在start之前,其他component有机会初始化和添加routes)
  component/Lifecycle
  (start [this]
    (log/info "Starting CamelStarter")
    (.start (:context camel))
    this)
  (stop [this]
    (.stop (:context camel))
    this))

在这里刚开始的时候只是创建了一个camel的引擎,在每一个微服务正常启动后,camel引擎根据路由和处理规则会自动开始工作。

现在来看看在微服务中是怎么添加路由的?
路由也就是uri,像上文定义的那样,uri只是一个Component的EndPoint的表示。
上面还只是简单的定义了一个camel,看起来没有任何component,那么怎么给camel定义componet呢?下面给出几组基本的组件:

组件作用描述uri
directSynchronous call to another endpoint from same CamelContext.在同一个camel-context下有很多路由,当然根据一个名字就可以找到对应的组件了direct:someName[?options]
timerUsed to generate message exchanges when a timer fires You can only consume events from this endpoint.算是比较特殊的用法了,用于产生Exchange? 定时产生的events可以产生消息timer:timerName[?options]
amqpFor Messaging with AMQP protocol对于使用了amqp协议的消息传递amqp:[queue:

那么,看看在这里camelcontext怎么将rabbitmq添加为一个路由组件,并且创建路由的。

(defrecord RabbitMQ [options configuration camel]
  component/Lifecycle
 ...
  (start [this]
    (let [cfg (c/cfg-fn configuration)
          cf (doto (ConnectionFactory.) (.setAutomaticRecoveryEnabled true) (.setHost (cfg "rabbitmq.host")) (.setPort (cfg "rabbitmq.port" :int 5672)) (.setVirtualHost (cfg "amqp.vhost")) (.setRequestedHeartbeat 1000))]
      (doto camel
        (-> (:registry) (.put "connectionFactory" cf))
        (-> (:context) (.addComponent "rabbitmq" (MyRabbitMQComponent.)))
        (r/add-routes (from [:timer "app-report" "2s"] (process this get-appinfo) (to [:amqp "/appinfo/report"])))))
    this)
  IRabbitMQ
  (get-appinfo [this ex]
    :appId    (:app-id options)
     :appName  (:app options)
     :hostAddr (:host options)))

使用ConnectionFactory启动一个到rabbitmq host的连接cf。给camel的SimpleRegistry添加了一个connectionFactory cf的键值对。
给CamelTontext添加了rabbitmq组件(MyRabbitMqComponent对象的创建,使用createEndpoint创建的Endpoint)。
然后使用add-routes添加规则和处理函数,使用timer组件作为定时器,

[:timer "app-report" "2s"]

这里的语法经过一定的处理,含义其实和之前的是相似的,定义一个名字为”app-report”,这里的options代表period,period是2s,也就是每2s触发一个event。这里触发的事件是用process调用的get-appinfo,逻辑很简单,只是返回一个App信息的map,也就是产生的Exchange 的message的内容。然后将这个信息路由到(to [:amqp "/appinfo/report"])amqp组件,也就是刚刚创建的rabbitmq组件,队列的名字是”/appinfo/report”,把产生的消息放入这个队列可能没有什么意义,但是作为测试,这还是比较有趣的。

那么一般的微服务是怎么使用camel来达到目的的呢?比如说使用netty web服务器。

The netty4-http component is an extension to Netty4 component to facilitiate HTTP transport with Netty4. This camel component supports both producer and consumer endpoints.
这是之前没有介绍到的一个camel组件 netty4-http:
The URI scheme for a netty component is as follows
netty4-http:http://localhost:8080[?options]

比如可以这样使用:

from("netty4-http:http://0.0.0.0:8123matchOnUriPrefix=true").to("mock:foo");

现在来看看,clojure版本的jsonrpc是怎样使用netty-http组件的吧:

(defrecord JsonrpcApp [camel configuration producer discovery-map metrics-map appinfo-map]
 ...
  (start [this]
    (let [...
     (doto camel (add-routes (entry (netty4-http "http.basejsonrpc.base" :matchOnUriPrefix true :compression true :httpMethodRestrict "GET,POST,OPTIONS" :headerFilterStrategy "headerFilterStrategy") (.streamCaching) (on-exception Exception (to "direct:output_jsonrpc")) ;(log "jsonrpc: $in.headers") (process this extract-client-ip) (choice (when "$in.header.CamelHttpMethod == 'OPTIONS'" (process this options-report)) (otherwise ...)) (to "direct:output_jsonrpc"))) (add-routes (entry "direct:output_jsonrpc" (process this set-cors http/marshal-jsonrpc) (log "response from amqp: $in.headers $in.body"))) (add-routes ... (entry [:amqp-in "/jsonrpc/discovery" "/jsonrpc/discovery-"] (process this discovery discovery2)) (entry [:amqp-in "/metrics/report" "/metrics/report-"] (process this metrics)) (entry [:timer "clear-appinfo" "1s"] (process this clear-expired-appinfo clear-expired-discovery)))) this))

这里在Jsonrpc组件的start函数里给已经初始化好的camel组件添加路由。当然,在clojure里面的语法会稍微有点不同,routes相当于from。然后创建一个netty4-http组件,设置收到的消息是可以缓存的。如果netty4-http的过程发生了任何异常,就把消息路由到"direct:output_jsonrpc"。正常情况下的处理 步骤是
1.获取客户端IP extract-client-ip
2.判断message的内容,如果$in.header.CamelHttpMethod == ‘OPTIONS’,则执行options-report,否则

(otherwise
                                   (process this
                                            should-be-post
                                            http/unmarshal-jsonrpc
                                            check-is-permitted
                                            check-availability)
                                   (set-header c/ROUTING-KEY "/jsonrpc$header.CamelHttpPath")
                                   (log "request to amqp: $in.headers $in.body")
                                   (to :amqp))

should-be-post:如果http方法不是post方法,则抛出异常
http/unmarshal-jsonrpc:使用getIn和getBody取得Exchange的内容,然后设置Exchange头部RPC-ID和RPC-Method,返回内容的params部分给下一步。
check-is-permitted:调用权限服务,检查client-ip和api-key是否有权限,否则抛出异常
check-availability:检查服务是否可用,是否在jsonrpc发现的服务列表里面,否则抛出异常。

3.设置message头部的rabbitmq路由。

netty4-http组件(当然,首先作为web服务器),生成的Message头部字段包括:

在process处理后,使用(set-header c/ROUTING-KEY "/jsonrpc$header.CamelHttpPath")设置对应的rabbitmq的路由后
4.将消息路由到amqp(根据第三步设置的路由,也就是对应的rabbitmq的队列),当然这个语法也是处理过了的。
5.最后,(也就是经过第4步处理过的Ex)将回到"direct:output_jsonrpc"

写到此处,还有两个问题需要解决。
1.网关jsonrpc这边的处理输完了,那么微服务比如资源规划怎么和jonrpc交互?
2.jsonrpc怎么完成服务发现的工作?

这两个问题将混合在一起解决。
首先看看怎么定义一个rpc微服务?

(defrecord RPC [camel options uri method-map service]
  component/Lifecycle
  (stop [this] this)
  (start [this]
    (add-routes camel
                (from [:amqp uri uri]
                      (on-exception Exception (process this rpc-error))
                      (process this rpc-process))

                (from [:timer "report-rpc" "2s"]
                      (process this rpc-desc)
                      (to [:amqp "/jsonrpc/discovery"])))
    this)

定义RPC服务的过程,其实就是给camel添加路由的过程,将取得uri(比如说对于资源规划微服务,uri就是”/jsonrpc/res-plan-clj”)对应的rabbitmq中的队列,取得来自这个队列中的Exchange,然后一般情况下通过rpc-process函数对Exchange进行处理。
rpc-process 是一个很关键的函数,它将取得对应的方法并进行参数检查和方法调用。
rpc处理函数的过程,只要amqp队列中有uri:就会对这些uri进行处理。

这里再调强一下rabbitmq,每个微服务start一个amqp的时候,rabbitmq都会在自己的服务器上为他创建一个队列。同时也会为jsonrpc创建discovery队列。

其实在这里会有一些疑问:
1.rabbitmq在什么时候创建的队列呢?难道是在建立连接的时候?
这里需要rabbitmq的内在逻辑

2.rabbitmq在不依靠camel的情况下也可以正常工作,就像kafka一样,和spark streaming一起工作也是可以的?

(updating)

并且。每个rpc微服务每隔2s就会向网关报告自己的app信息和rpc方法列表等。
在网关jsonrpc那边,

(entry [:amqp-in "/jsonrpc/discovery" "/jsonrpc/discovery-"]
                         (process this discovery discovery2))

在收到来自amqp:/jsonrpc/discovery的消息后,会使用discovery discovery2处理。
discovery:(swap! discovery-map assoc uri report) 将收集uri的report消息放入自己service的discovery-map中。
而discovery2:

 appinfo     (assoc appinfo :appid appid
                                   :appName app-name
                                   :hostAddr host-addr
                                   :lastReportTime (System/currentTimeMillis))
        appinfo     (assoc-in appinfo [:services uri] report)]
    (swap! appinfo-map assoc appid appinfo)

用于收集appinfo-map,包括lastReportTime,收集lastReportTime是有目的的,那么目的是什么呢?每隔1s执行这两个函数,

(entry [:timer "clear-appinfo" "1s"]
                         (process this
                                  clear-expired-appinfo
                                  clear-expired-discovery))

clear-expired-appinfo观察appinfo-map中的app,如果谁的lastReportTime和现在比已经超过4s,则认为这个app已经过期,将它从appinfo-map中去除。clear-expired-discovery同理,将它从discovery-map中除掉。
在网关的log中报告:

App resplan-clj-app-bf92feb169fd@10.6.208.10 expired at 2016-08-07 17:19:42
 URI /jsonrpc/res-plan-clj expired at 2016-08-07 17:19:42

到此,整个流程都理顺了。
只是还有些小问题,比如
如果微服务和微服务之间要直接通信呢?这当然是绕开网关比较好,看看这里是怎么实现的:
call-app会直接构造一个带路由的Exchange,调用了这个函数之后,也就意味着要异步的调用另外一个方法(直接使用了invoke方法),,,比如说权限服务。。。(这样就会进入权限服务的rpc处理了,这是跳过了jsonrpc网关的,直接通信。。。)

(call-app [this app-key :keys [method params headers]]
    (invoke
      this "direct:call-app"
      [:headers headers]
      [:header c/ROUTING-KEY app-key]
      [:header c/RPC-METHOD method]
      [:header c/RPC-ID (u/uuid-short)]
      [:body (if (map? params) params (vec params))]
      [:parse #(u/replace-keys % keyword)]))

然后添加对应的路由发送:

(add-routes camel (from "direct:call-app" (to :amqp)))

invoke函数还会对输入和输出进行一定的处理。

展望

还有许多内容需要深入理解,比如camel的更多组件,rabbitmq的更加深入的用法,各种参数设置等等。

以上是关于Camel 和rabbitmq 集成处理的主要内容,如果未能解决你的问题,请参考以下文章

springboot 整合 apache camel实现企业级数据集成和处理

RabbitMQ - Apache Camel 读取消息如何处理失败的消息

Apache Kafka 和 Camel 之间的区别(代理与集成)

Apache Camel 与 Spring Boot 集成,通过FTP定时采集处理文件

SpringBoot集成rabbitmq

RabbitMQ Java客户端使用