Flume 读取RabbitMq消息队列消息,并将消息写入kafka

Posted 也许明天1

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume 读取RabbitMq消息队列消息,并将消息写入kafka相关的知识,希望对你有一定的参考价值。

首先是关于flume的基础介绍

组件名称    

功能介绍

Agent代理

使用JVM 运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。

Client客户端

生产数据,运行在一个独立的线程。

Source源

从Client收集数据,传递给Channel。

Sink接收器

从Channel收集数据,进行相关操作,运行在一个独立线程。

Channel通道

连接 sources 和 sinks ,这个有点像一个队列。

Events事件

传输的基本数据负载。

目前来说flume是支持多种source

其中是支持读取jms消息队列消息,网上并没有找到关于读取rabbitmq的教程

虽然flume并不支持读取rabbitMq,所以需要对flume进行二次开发

这里主要就是flume怎么从rabbitMq读取数据

这里从git上找到了一个关于flume从rabbitMq读取数据的插件

  下载地址是:https://github.com/gmr/rabbitmq-flume-plugin

上面有一些英文的描述,大家可以看下

环境介绍

centOS 7.3   jdk1.8   cdh5.14.0

1.用 mvn 打包该项目,会生成两个JAR包

 

 

2.因为我这边使用的以cdh方式安装集成flume的,所以把这两个jar  扔到  /usr/lib   下面

  如果是普通的安装方式需要把这两个jar包复制到 flume安装目录的lib下面

 

 

3.进入cdh管理页面配置Agent

 

下面是详细的配置,我这边是直接把消息写入kafka集群里 的

tier1.sources  = source1

tier1.channels = channel1

tier1.sinks    = sink1

tier1.sources.source1.type     = com.aweber.flume.source.rabbitmq.RabbitMQSource

tier1.sources.source1.bind     = 127.0.0.1

tier1.sources.source1.port     = 5672

tier1.sources.source1.virtual-host = /

tier1.sources.source1.username = guest

tier1.sources.source1.password = guest

tier1.sources.source1.queue = test

tier1.sources.source1.prefetchCount = 10

tier1.sources.source1.channels = channel1

tier1.sources.source1.threads = 2

tier1.sources.source1.interceptors = i1

tier1.sources.source1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

tier1.sources.source1.interceptors.i1.preserveExisting = true

tier1.channels.channel1.type   = memory

tier1.sinks.sink1.channel      = channel1

tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink 

tier1.sinks.sink1.topic = flume_out 

tier1.sinks.sink1.brokerList = 127.0.0.1:9092,127.0.0.1:9093,27.0.0.1:9094

tier1.sinks.sink1.requiredAcks = 1 

tier1.sinks.sink11.batchSize = 20 

配置完成更新配置重新启动Agent

 

 

 

这个是接收到rabbitMq消息

 

 

大功告成,如果配置中有疑问的可以留言,我看到后会回复

以上是关于Flume 读取RabbitMq消息队列消息,并将消息写入kafka的主要内容,如果未能解决你的问题,请参考以下文章

求助,rabbitmq读取消息队列的问题

Apache Flume 每小时推出 HDFS 文件

从RabbitMQ队列中读取大量消息的最佳方法是什么?

获取 RabbitMQ 队列中的消息数

RabbitMQ - Apache Camel 读取消息如何处理失败的消息

消息队列之RabbitMQ