Camel 和rabbitmq 集成处理
Posted willwillie
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Camel 和rabbitmq 集成处理相关的知识,希望对你有一定的参考价值。
camel可以看作是java中的一个编程范式,通过DSL语言来编写路由规则。
camel 核心
camel是一个基于规则路由和处理的引擎,提供企业集成模式的Java对象的实现,通过应用程序接口或称为陈述式的Java领域特定语言(DSL)来配置路由和处理的规则。
其核心的思想就是从一个from源头得到数据,通过processor处理,再发到一个to目的的.发散:像是网络中超强的路由器规则,是否可以看做是软件定义的交换机?
这里的from和to可以是我们在项目集成中经常碰到的类型:一个FTP文件夹中的文件,一个MQ的queue,一个HTTP request/response,一个webservice等等。
现在的问题是 什么是java领域特定语言(DSL)?
Camel uses a Java Domain Specific Language or DSL for creating Enterprise Integration Patterns or Routes in a variety of domain-specific languages (DSL) as listed below.
Java DSL - A Java based DSL using the fluent builder style.
Spring XML - A XML based DSL in Spring XML files
Blueprint XML - A XML based DSL in OSGi Blueprint XML files
Rest DSL - A DSL to define REST services using a REST style in either Java or XML.
Groovy DSL - A Groovy based DSL using Groovy programming language
Scala DSL - A Scala based DSL using Scala programming language
Annotation DSL - Use annotations in Java beans.
Kotlin DSL - Work in progress - Currently developed outside ASF, but will we included later in Camel when Kotlin and the DSL is ready.
The main entry points for the DSL are
CamelContext for creating a Camel routing rulebase
RouteBuilder for creating a collection of routes using the routing DSL—http://camel.apache.org/dsl.html
也就是说camel可以使用多种方式配置,包括Java xml等。
这里主要关注java DSL,java DSL的语法主要是什么呢?
RouteBuilder类就是Java的smart DSL。
public abstract class RouteBuilder
extends BuilderSupport
implements RoutesBuilder
A Java DSL which is used to build DefaultRoute instances in a CamelContext for smart routing.
主要的方法有:
public abstract void configure()
throws Exception
Called on initialization to build the routes using the fluent builder syntax.
This is a central method for RouteBuilder implementations to implement the routes using the Java fluent builder syntax.
configure()就是主要的实现路由语法的函数。
主要的路由配置的方法我整理成了一张表格:
语法 | 作用 | 语法 |
---|---|---|
from | 路由的起点 | from(String uri),返回the builder |
to | 路由的终点 | 参数String uri,返回the builder |
choice(when,otherwise) | 选择某个内容 | 以choice开头,when表示某个条件 |
fromF/toF | 使用String formatting创建URI | 和from/to类似 |
filter | 选择某个内容 | 过滤想要的内容 |
process | 对内容进行一定的处理 | 需要编写继承自Processor接口的类,实现process方法,完成对Exchanage(交换内容)处理的逻辑 |
Processer接口用于处理消息转换或者消息交换。
来自网络的一个简单例子:
这里实现了一个简单的路由,这里的起点是DefaultCamelConetxt
,那么DefaultCamelConetxt这个类是怎样的?
public class DefaultCamelContext
extends ServiceSupport
implements ModelCamelContext, SuspendableService
Represents the context used to configure routes and the policies to use.
也就是说DefaultCamelContext
类代表了配置camel
路由的上下文,和一些必要的策略,DefaultCamelContext
继承自Service
,可以使用start,stop等基本的管理生命周期的方法。
让camel引擎开始工作使用start方法,对应的停止工作的方法是stop。使用addRoutes方法给camelContext添加对应的路由。
下图表示了一个继承自Processor类的基本的实现,
主要的逻辑是对Exchange进行操作,来达到想要的目的。上图的逻辑是将a文件所有的行连成一行,以空格分隔组成一个新的文件给to对应的终点uri。
前面讲了一个简单的这里,到这个时候,还需要补充一些基本的概念。
camel一些定义
补充的概念有Uris,EndPoint,Exchange,Message等
关于uris的官方解释: Camel makes extensive use of URIs to allow you to refer to endpoints which are lazily created by a Component if you refer to them within Routes.
意思是,uris的作用就是用来指向组件的endpoint的一种表示方法。
那么endpoint呢?
官方解释: public interface Endpoint extends IsSingleton, Service
An endpoint implements the Message Endpoint pattern and represents an endpoint that can send and receive message exchanges
endpoint就是一个可以收发消息的组件,组件component就是一个一个的camel组件的术语代表。
那么,message呢?
public interface Exchange An Exchange is the message container holdingthe information during the entire routing of a Message received by a Consumer.
public interface Message Implements the Message pattern and represents an inbound or outbound message as part of an Exchange.
Exchange是message容器,message有属性inbound和outbound,是Exchange的一部分。比如上面的处理器使用exchange的getIn()方法可以得到message本身。
目前我写的一系列文章都是讲的clojure,比如语法和数据处理,同样我本文的目的依旧是讲camel在clojure中的应用,讲的更具体一点的话,其实是谈论camel在clojure 微服务中的使用方法。
camel在clojure 微服务中的使用
首先什么是微服务,这里有一篇很好的介绍微服务的文章,http://chuansong.me/n/350466751870(属于Chris Richardson 微服务系列7 篇中的第二篇)。
比如说一个微服务架构中,使用jsonrpc作为网关,rabbitmq作为amqp队列,一系列微服务通过amqp和网关(给外部提供web服务)通信。那么在这个架构中,camel的定义是怎样的?
(defrecord Camel [options configuration]
component/Lifecycle
(start [this]
(let [registry (SimpleRegistry.)
camel-context (DefaultCamelContext. registry)
error-handler (doto (DefaultErrorHandlerBuilder.) (.logStackTrace true) (.logExhaustedMessageHistory false))]
...
(assoc this :context camel-context
:registry registry)))
(stop [this] this))
这里使用上了clojure类型来定义Camel
,还使用了SimpleRegistry
。为何要使用SimpleRegistry
类?
Registry
接口:
Registry
也就是一个注册服务接口,simpleRegistry就是简单的实现了这种接口的一个类,可以方便camel使用某些查找类的方法。
然后这个camelcontext初始化了一些参数,并将app的配置文件的地址设置为camel context的“properties”组件(可能没有实际意义,因为并不作为路由使用)。在start完成后,Camel便是一个有四个初始化了的field(options,configuration,camel,registry)的记录类型。
然后定义另外一个组件CamelStarter
:
(defrecord CamelStarter [camel]
;; 启动或停止camel. 定义此component的目的:在start之前,其他component有机会初始化和添加routes)
component/Lifecycle
(start [this]
(log/info "Starting CamelStarter")
(.start (:context camel))
this)
(stop [this]
(.stop (:context camel))
this))
在这里刚开始的时候只是创建了一个camel的引擎,在每一个微服务正常启动后,camel引擎根据路由和处理规则会自动开始工作。
现在来看看在微服务中是怎么添加路由的?
路由也就是uri,像上文定义的那样,uri只是一个Component的EndPoint的表示。
上面还只是简单的定义了一个camel,看起来没有任何component,那么怎么给camel定义componet呢?下面给出几组基本的组件:
组件 | 作用 | 描述 | uri |
---|---|---|---|
direct | Synchronous call to another endpoint from same CamelContext. | 在同一个camel-context下有很多路由,当然根据一个名字就可以找到对应的组件了 | direct:someName[?options] |
timer | Used to generate message exchanges when a timer fires You can only consume events from this endpoint. | 算是比较特殊的用法了,用于产生Exchange? 定时产生的events可以产生消息 | timer:timerName[?options] |
amqp | For Messaging with AMQP protocol | 对于使用了amqp协议的消息传递 | amqp:[queue: |
那么,看看在这里camelcontext怎么将rabbitmq添加为一个路由组件,并且创建路由的。
(defrecord RabbitMQ [options configuration camel]
component/Lifecycle
...
(start [this]
(let [cfg (c/cfg-fn configuration)
cf (doto (ConnectionFactory.) (.setAutomaticRecoveryEnabled true) (.setHost (cfg "rabbitmq.host")) (.setPort (cfg "rabbitmq.port" :int 5672)) (.setVirtualHost (cfg "amqp.vhost")) (.setRequestedHeartbeat 1000))]
(doto camel
(-> (:registry) (.put "connectionFactory" cf))
(-> (:context) (.addComponent "rabbitmq" (MyRabbitMQComponent.)))
(r/add-routes (from [:timer "app-report" "2s"] (process this get-appinfo) (to [:amqp "/appinfo/report"])))))
this)
IRabbitMQ
(get-appinfo [this ex]
:appId (:app-id options)
:appName (:app options)
:hostAddr (:host options)))
使用ConnectionFactory启动一个到rabbitmq host的连接cf。给camel的SimpleRegistry添加了一个connectionFactory cf的键值对。
给CamelTontext添加了rabbitmq组件(MyRabbitMqComponent对象的创建,使用createEndpoint创建的Endpoint)。
然后使用add-routes添加规则和处理函数,使用timer组件作为定时器,
[:timer "app-report" "2s"]
这里的语法经过一定的处理,含义其实和之前的是相似的,定义一个名字为”app-report”,这里的options代表period,period是2s,也就是每2s触发一个event。这里触发的事件是用process调用的get-appinfo,逻辑很简单,只是返回一个App信息的map,也就是产生的Exchange 的message的内容。然后将这个信息路由到(to [:amqp "/appinfo/report"])
amqp组件,也就是刚刚创建的rabbitmq组件,队列的名字是”/appinfo/report”,把产生的消息放入这个队列可能没有什么意义,但是作为测试,这还是比较有趣的。
那么一般的微服务是怎么使用camel来达到目的的呢?比如说使用netty web服务器。
The netty4-http component is an extension to Netty4 component to facilitiate HTTP transport with Netty4. This camel component supports both producer and consumer endpoints.
这是之前没有介绍到的一个camel组件 netty4-http:
The URI scheme for a netty component is as follows
netty4-http:http://localhost:8080[?options]
比如可以这样使用:
from("netty4-http:http://0.0.0.0:8123matchOnUriPrefix=true").to("mock:foo");
现在来看看,clojure版本的jsonrpc是怎样使用netty-http组件的吧:
(defrecord JsonrpcApp [camel configuration producer discovery-map metrics-map appinfo-map]
...
(start [this]
(let [...
(doto camel (add-routes (entry (netty4-http "http.basejsonrpc.base" :matchOnUriPrefix true :compression true :httpMethodRestrict "GET,POST,OPTIONS" :headerFilterStrategy "headerFilterStrategy") (.streamCaching) (on-exception Exception (to "direct:output_jsonrpc")) ;(log "jsonrpc: $in.headers") (process this extract-client-ip) (choice (when "$in.header.CamelHttpMethod == 'OPTIONS'" (process this options-report)) (otherwise ...)) (to "direct:output_jsonrpc"))) (add-routes (entry "direct:output_jsonrpc" (process this set-cors http/marshal-jsonrpc) (log "response from amqp: $in.headers $in.body"))) (add-routes ... (entry [:amqp-in "/jsonrpc/discovery" "/jsonrpc/discovery-"] (process this discovery discovery2)) (entry [:amqp-in "/metrics/report" "/metrics/report-"] (process this metrics)) (entry [:timer "clear-appinfo" "1s"] (process this clear-expired-appinfo clear-expired-discovery)))) this))
这里在Jsonrpc组件的start函数里给已经初始化好的camel组件添加路由。当然,在clojure里面的语法会稍微有点不同,routes相当于from。然后创建一个netty4-http组件,设置收到的消息是可以缓存的。如果netty4-http的过程发生了任何异常,就把消息路由到"direct:output_jsonrpc"
。正常情况下的处理 步骤是
1.获取客户端IP extract-client-ip
2.判断message的内容,如果$in.header.CamelHttpMethod == ‘OPTIONS’,则执行options-report,否则
(otherwise
(process this
should-be-post
http/unmarshal-jsonrpc
check-is-permitted
check-availability)
(set-header c/ROUTING-KEY "/jsonrpc$header.CamelHttpPath")
(log "request to amqp: $in.headers $in.body")
(to :amqp))
should-be-post:如果http方法不是post方法,则抛出异常
http/unmarshal-jsonrpc:使用getIn和getBody取得Exchange的内容,然后设置Exchange头部RPC-ID和RPC-Method,返回内容的params部分给下一步。
check-is-permitted:调用权限服务,检查client-ip和api-key是否有权限,否则抛出异常
check-availability:检查服务是否可用,是否在jsonrpc发现的服务列表里面,否则抛出异常。
3.设置message头部的rabbitmq路由。
netty4-http组件(当然,首先作为web服务器),生成的Message头部字段包括:
在process处理后,使用(set-header c/ROUTING-KEY "/jsonrpc$header.CamelHttpPath")
设置对应的rabbitmq的路由后
4.将消息路由到amqp(根据第三步设置的路由,也就是对应的rabbitmq的队列),当然这个语法也是处理过了的。
5.最后,(也就是经过第4步处理过的Ex)将回到"direct:output_jsonrpc"
。
写到此处,还有两个问题需要解决。
1.网关jsonrpc这边的处理输完了,那么微服务比如资源规划怎么和jonrpc交互?
2.jsonrpc怎么完成服务发现的工作?
这两个问题将混合在一起解决。
首先看看怎么定义一个rpc微服务?
(defrecord RPC [camel options uri method-map service]
component/Lifecycle
(stop [this] this)
(start [this]
(add-routes camel
(from [:amqp uri uri]
(on-exception Exception (process this rpc-error))
(process this rpc-process))
(from [:timer "report-rpc" "2s"]
(process this rpc-desc)
(to [:amqp "/jsonrpc/discovery"])))
this)
定义RPC服务的过程,其实就是给camel添加路由的过程,将取得uri(比如说对于资源规划微服务,uri就是”/jsonrpc/res-plan-clj”)对应的rabbitmq中的队列,取得来自这个队列中的Exchange,然后一般情况下通过rpc-process
函数对Exchange进行处理。
rpc-process
是一个很关键的函数,它将取得对应的方法并进行参数检查和方法调用。
rpc处理函数的过程,只要amqp队列中有uri:就会对这些uri进行处理。
这里再调强一下rabbitmq,每个微服务start一个amqp的时候,rabbitmq都会在自己的服务器上为他创建一个队列。同时也会为jsonrpc创建discovery队列。
其实在这里会有一些疑问:
1.rabbitmq在什么时候创建的队列呢?难道是在建立连接的时候?
这里需要rabbitmq的内在逻辑
2.rabbitmq在不依靠camel的情况下也可以正常工作,就像kafka一样,和spark streaming一起工作也是可以的?
(updating)
并且。每个rpc微服务每隔2s就会向网关报告自己的app信息和rpc方法列表等。
在网关jsonrpc那边,
(entry [:amqp-in "/jsonrpc/discovery" "/jsonrpc/discovery-"]
(process this discovery discovery2))
在收到来自amqp:/jsonrpc/discovery的消息后,会使用discovery discovery2处理。
discovery:(swap! discovery-map assoc uri report)
将收集uri的report消息放入自己service的discovery-map中。
而discovery2:
appinfo (assoc appinfo :appid appid
:appName app-name
:hostAddr host-addr
:lastReportTime (System/currentTimeMillis))
appinfo (assoc-in appinfo [:services uri] report)]
(swap! appinfo-map assoc appid appinfo)
用于收集appinfo-map,包括lastReportTime,收集lastReportTime是有目的的,那么目的是什么呢?每隔1s执行这两个函数,
(entry [:timer "clear-appinfo" "1s"]
(process this
clear-expired-appinfo
clear-expired-discovery))
clear-expired-appinfo观察appinfo-map中的app,如果谁的lastReportTime和现在比已经超过4s,则认为这个app已经过期,将它从appinfo-map中去除。clear-expired-discovery同理,将它从discovery-map中除掉。
在网关的log中报告:
App resplan-clj-app-bf92feb169fd@10.6.208.10 expired at 2016-08-07 17:19:42
URI /jsonrpc/res-plan-clj expired at 2016-08-07 17:19:42
到此,整个流程都理顺了。
只是还有些小问题,比如
如果微服务和微服务之间要直接通信呢?这当然是绕开网关比较好,看看这里是怎么实现的:
call-app会直接构造一个带路由的Exchange,调用了这个函数之后,也就意味着要异步的调用另外一个方法(直接使用了invoke方法),,,比如说权限服务。。。(这样就会进入权限服务的rpc处理了,这是跳过了jsonrpc网关的,直接通信。。。)
(call-app [this app-key :keys [method params headers]]
(invoke
this "direct:call-app"
[:headers headers]
[:header c/ROUTING-KEY app-key]
[:header c/RPC-METHOD method]
[:header c/RPC-ID (u/uuid-short)]
[:body (if (map? params) params (vec params))]
[:parse #(u/replace-keys % keyword)]))
然后添加对应的路由发送:
(add-routes camel (from "direct:call-app" (to :amqp)))
invoke函数还会对输入和输出进行一定的处理。
展望
还有许多内容需要深入理解,比如camel的更多组件,rabbitmq的更加深入的用法,各种参数设置等等。
以上是关于Camel 和rabbitmq 集成处理的主要内容,如果未能解决你的问题,请参考以下文章
springboot 整合 apache camel实现企业级数据集成和处理
RabbitMQ - Apache Camel 读取消息如何处理失败的消息
Apache Kafka 和 Camel 之间的区别(代理与集成)