Flume高可用+断点续传
Posted gegepahei
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume高可用+断点续传相关的知识,希望对你有一定的参考价值。
Flume高可用集群
工欲善其事,必先利其器。
感谢以下博主:
https://www.cnblogs.com/qingyunzong/p/8994494.html
https://blog.csdn.net/peng_0129/article/details/80793440
https://blog.csdn.net/suojie123/article/details/86577935
https://blog.csdn.net/kelong_xhu/article/details/42677045
https://blog.csdn.net/johnnychu/article/details/82780521
flume简介
官网:http://flume.apache.org/
打开官网【经翻译】
Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.
Flume是一种分布式的、可靠的、可用的服务,用于有效地收集、聚合和移动大量的日志数据。它具有简单灵活的基于数据流的体系结构。它具有健壮性和容错性,具有可调的可靠性机制和许多故障转移和恢复机制。它时一个使用一个简单的可扩展数据模型,允许在线分析应用程序。
【附模型】
flume作为cloudera开发的实时日志收集系统,收到业界的普遍认可和广泛应用。flume按新老来分可分为2个版本:Flume OG【0.94及之前】和Flume NG。 并且重构后的Flume NG纳入了apache旗下,更名为Apache Flume, 目前广泛使用的都是Flume NG即Apache Fluem。
而且目前flume只支持Linux启动。
Flume特点:
flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。
flume的数据流由事件(Event)贯穿始终。event是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。
(1)flume的可靠性
当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱依次分别为:end-to-end(收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送。),Store on failure(这也是scribe采用的策略,当数据接收方crash时,将数据写到本地,待恢复后,继续发送),Besteffort(数据发送到接收方后,不会进行确认)。
(2)flume的可恢复性
还是靠Channel。推荐使用FileChannel,事件持久化在本地文件系统里(性能较差)。
核心概念:
Client:Client生产数据,运行在一个独立的线程。
Event: 一个数据单元,消息头和消息体组成。(Events可以是日志记录、 avro 对象等。)
Flow: Event从源点到达目的点的迁移的抽象。
Agent: 一个独立的Flume进程,包含组件Source、 Channel、 Sink。(Agent使用JVM 运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。)
Source: 数据收集组件。(source从Client收集数据,传递给Channel)
Channel: 中转Event的一个临时存储,保存由Source组件传递过来的Event。(Channel连接 sources 和 sinks ,这个有点像一个队列。)
Sink: 从Channel中读取并移除Event, 将Event传递到FlowPipeline中的下一个Agent(如果有的话)(Sink从Channel收集数据,运行在一个独立线程。)
体系结构:
Flume 运行的核心是 Agent。Flume以agent为最小的独立运行单位。一个agent就是一个JVM。它是一个完整的数据收集工具,含有三个核心组件,分别是source、 channel、 sink。
Flume source:
1、Avro Source
监听Avro端口,从Avro client streams接收events。
属性
【必需设置】
channels:【不表】
type:组件类型名称,需要时avro
bind:监听的主机名或ip地址
port:绑定的端口号
【选填设置】
threads:生成的工作现成的最大数量
interceptors:空格分隔的列表拦截器
a1.sources.s1.channels = c1 a1.sources.s1.type = avro a1.sources.s1.bind = 192.168.123.102 a1.sources.s1.port = 6666
2、Thrift Source:与Avro基本一致
a1.sources.s1.channels = c1 a1.sources.s1.type = thrift a1.sources.s1.bind = 192.168.123.102 a1.sources.s1.port = 6666
3、Exec Source
ExecSource的配置就是设定一个Unix(linux)命令,然后通过这个命令不断输出数据。如果进程退出,Exec Source也一起退出,不会产生进一步的数据。
channels:【不表】
type:exec
command:【shell invocation】
a1.sources.s1.type = exec a1.sources.s1.command = tail -F /home/hadoop/logs/test.log a1.sources.s1.channels = c1
4、JMS Source
从JMS系统(消息、主题)中读取数据
channels:不表
type:JMS
initialContextFactory:org.apache.activemq.jndi.ActiveMQInitialContextFactory
connectionFactory:
providerURL:JMS系统URL
destinationName:desctination name
destinationType:desctination type(queue or topic)
示例:
a1.sources = r1 a1.channels = c1 a1.sources.r1.type = jms a1.sources.r1.channels = c1 a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory a1.sources.r1.connectionFactory = GenericConnectionFactory a1.sources.r1.providerURL = tcp://mqserver:61616 a1.sources.r1.destinationName = BUSINESS_DATA a1.sources.r1.destinationType = QUEUE
5、Spooling Directory Source:
Spooling Directory Source监测配置的目录下新增的文件,并将文件中的数据读取出来。
拷贝到spool目录下的文件不可以再打开编辑;
spool目录下不可包含相应的子目录。(作为对日志的准实时监控)
channels:
type:spooldir
示例:
a1.channels = ch-1 a1.sources = src-1 a1.sources.src-1.type = spooldir a1.sources.src-1.channels = ch-1 a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool a1.sources.src-1.fileHeader = true
Flume Channel:
Channels are the repositories where the events are staged on a agent. Source adds the events and Sink removes it.
Channels是events在agent上进行的存储库。Source添加events,Sink移除events。
1、Memory Channel(内存)
events存储在配置最大大小的内存队列中。对于流量较高和由于agent故障而准备丢失数据的流程来说,这是一个理想的选择。
官方示例:
a1.channels = c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 10000 a1.channels.c1.byteCapacityBufferPercentage = 20 a1.channels.c1.byteCapacity = 800000
2、File Channel
type:file
checkPointDir:
dataDirs:
示例:
a1.channels = c1 a1.channels.c1.type = file a1.channels.c1.checkpointDir = /mnt/flume/checkpoint a1.channels.c1.dataDirs = /mnt/flume/data
3、JDBC Channel
events存储在持久化存储库中(其背后是一个数据库)。JDBC channel目前支持嵌入式Derby。这是一个持续的channel,对于可恢复性非常重要的流程来说是理想的选择。
官方示例:
a1.channels = c1
a1.channels.c1.type = jdbc
4、Kafka Channel
events存储在Kafka集群中。Kafka提供高可用性和高可靠性,所以当agent或者kafka broker 崩溃时,events能马上被其他sinks可用。
Kafka channel可以被多个场景使用:
Flume source和sink - 它为events提供可靠和高可用的channel
Flume source和interceptor,但是没sink - 它允许写Flume evnets到Kafka topic
Flume sink,但是没source - 这是一种低延迟,容错的方式从Kafka发送events到Flume sinks 例如 HDFS, HBase或者Solr
type:org.apache.flume.channel.kafka.KafkaChannel
kafak.bootstrap.servers:kafka集群的broker-list【List of brokers in the Kafka cluster】
kafka.topic:flume-channel
示例:
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092 a1.channels.channel1.kafka.topic = channel1 a1.channels.channel1.kafka.consumer.group.id = flume-consumer
以上几种较为常用,尤其时1和2,还有更多类型。可以到官网学习。
Flume Sinks:
1、Avro sink
Flume events发送到sink,转换为Avro events,并发送到配置好的hostname/port。从配置好的channel按照配置好的批量大小批量获取events
channel:
type:avro
hostname:The hostname or IP address to bind to
port:The port # to listen on
官方示例:【表示监听10.10.10.10主机的4545端口】
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = avro a1.sinks.k1.channel = c1 a1.sinks.k1.hostname = 10.10.10.10 a1.sinks.k1.port = 4545
2、DHFS Sink
该sink把events写进Hadoop分布式文件系统(HDFS)。它目前支持创建文本和序列文件。它支持在两种文件类型压缩。文件可以基于数据的经过时间或者大小或者事件的数量周期性地滚动。它还通过属性(如时间戳或发生事件的机器)把数据划分为桶或区。
channel:
tyoe:hdfs
hdfs。path:hdfs://namenode/flume/webdata/【HDFS directory path】
hdfs.filePrefix:【FlumeData】Name prefixed to files created by Flume in hdfs directory
hdfs.fileSuffix:Suffix to append to file
官方示例:
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = hdfs a1.sinks.k1.channel = c1 a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S a1.sinks.k1.hdfs.filePrefix = events- a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = minute
The above configuration will round down the timestamp to the last 10th minute. For example, an event with timestamp 11:54:34 AM, June 12, 2012 will cause the hdfs path to become /flume/events/2012-06-12/1150/00.
3、File Roll Sink
在本地文件系统存储的events
channel:
type:file_roll
sink.directory:【The directory where files will be stored】存储路径
官网示例:
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = file_roll a1.sinks.k1.channel = c1 a1.sinks.k1.sink.directory = /var/log/flume
4、Hive Sink:
该sink streams 将包含分割文本或者JSON数据的events直接传送到Hive表或分区中。使用Hive 事务写events。当一系列events提交到Hive时,它们马上可以被Hive查询到。
channel: type:hive hive.metastore:Hive metastore URI (eg thrift://a.b.com:9083 ) hive.database:Hive database name hive.table:Hive table name serializer:【Serializer is responsible for parsing out field from the event and mapping them to columns in the hive table. Choice of serializer depends upon the format of the data in the event. Supported serializers: DELIMITED and JSON】
--------------------------------------------------
hive.partition:
以hive作为sink注意事项:
1、Hive表必须cluster by bucket
2、Hive表需要stored as orc【大数据:Hive - ORC 文件存储格式 - ^_TONY_^ - 博客园】
3、在Flume配置的Hive 列名必须都为小写字母
4、hive是事物表且分区或分桶
5、依赖问题【两个依赖导入flume的lib目录】
hive-hcatalog-core-2.1.1-cdh6.1.0.jar
hive-hcatalog-streaming-2.1.1-cdh6.1.0.jar
hive表创建实例【官网】:
create table weblogs ( id int , msg string ) partitioned by (continent string, country string, time string) clustered by (id) into 5 buckets stored as orc;
flume sink 配置示例【官网】:
a1.channels = c1 a1.channels.c1.type = memory a1.sinks = k1 a1.sinks.k1.type = hive a1.sinks.k1.channel = c1 a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083 a1.sinks.k1.hive.database = logsdb a1.sinks.k1.hive.table = weblogs a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M a1.sinks.k1.useLocalTimeStamp = false a1.sinks.k1.round = true a1.sinks.k1.roundValue = 10 a1.sinks.k1.roundUnit = minute a1.sinks.k1.serializer = DELIMITED a1.sinks.k1.serializer.delimiter = " " a1.sinks.k1.serializer.serdeSeparator = ‘ ‘ #表字段 a1.sinks.k1.serializer.fieldnames =id,,msg #批量提交到hive表的条数 a1.sinks.k1.batchSize=10000
hive表实例:
create table flume_user( user_id int ,user_name string ,age int ) clustered by (user_id) into 2 buckets stored as orc tableproperties("transactional"=‘true‘) set hive.support.concurrency=true set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; select * from flume_user
flume hive sink配置实例:
a1.sources=r1 a1.channels=c1 a1.sinks=s1 a1.sources.r1.type=netcat a1.sources.r1.bind=master a1.sources.r1.port=44444 a1.sinks.s1.type=hive a1.sinks.s1.hive.metastore=thrift://master:9083 a1.sinks.s1.hive.database=bd14 a1.sinks.s1.hive.table=flume_user a1.sinks.s1.serializer=DELIMITED a1.sinks.s1.serializer.delimiter=" " a1.sinks.s1.serializer.serdeSeparator=‘ ‘ a1.sinks.s1.serializer.fieldnames=user_id,user_name,age a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=100 a1.sources.r1.channels=c1 a1.sinks.s1.channel=c1
5、HBaseSinks / HBase2Sink【2种不同的sink类型,这里把配置sink.type写在了一起】
【https://cloud.tencent.com/developer/article/1025431:利用Flume 汇入数据到HBase:Flume-hbase-sink 使用方法详解】
该sink写数据到HBase
channel:
type: hbase
table:The name of the table in Hbase to write to
columnFamily:The column family in Hbase to write to
zookeeperQuorum:The quorum spec. This is the value for the property hbase.zookeeper.quorum in hbase-site.xml
官网示例:
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = hbase #a1.sinks.k1.type = hbase a1.sinks.k1.table = foo_table a1.sinks.k1.columnFamily = bar_cf a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer #a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer a1.sinks.k1.channel = c1
6、ElasticSearchSink
该sink写数据到elasticsearch集群
channel:
type:org.apache.flume.sink.elasticsearch.ElasticSearchSink
hostname:ES-cluster:9300
indexName:索引名
indexType:索引类型
clusterName:【Name of the ElasticSearch cluster to connect to】
7、kafka Sink
Flume Sink实现可以导出数据到一个Kafka topic。
type:org.apache.flume.sink.kafka.KafkaSink
kafka.bootstrap.servers:List of brokers Kafka-Sink will connect to, to get the list of topic partitions This can be a partial list of brokers, but we recommend at least two for HA. The format is comma separated list of hostname:port
kafka topic:The topic in Kafka to which the messages will be published
官方示例:
a1.sinks.k1.channel = c1 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = mytopic a1.sinks.k1.kafka.bootstrap.servers = localhost:9092 a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 1 a1.sinks.k1.kafka.producer.compression.type = snappy
8、Custom Sink
自定义sink是你实现Sink接口。当启动Flume agent时,一个自定义sink类和它依赖项必须在agent的classpath中。
官方示例:
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = org.example.MySink a1.sinks.k1.channel = c1
还有更多的sink类型,如:HTTP Sink、Kite Dataset Sink、MorphlineSolrSink、AsyncHBaseSink
Flume部署:
1、单一流程
配置示例【这里拿了官网的示例】
# example.conf: A single-node Flume configuration # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
2、多代理流程【多个agent顺序链接】
从tail命令获取数据发送到avro端口
另一个节点可配置一个avro源来中继数据,发送外部存储
#tail-avro.conf # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = exec a1.sources.r1.command = tail -F /home/hadoop/log/test.log a1.sources.r1.channels = c1 # //Describe the sink # //绑定的不是本机, 是另外一台机器的服务地址, # //sink端的avro是一个发送端, avro的客户端, 往shizhan02这个机器上发 a1.sinks = k1 a1.sinks.k1.type = avro a1.sinks.k1.channel = c1 a1.sinks.k1.hostname = shizhan02 a1.sinks.k1.port = 4141 a1.sinks.k1.batch-size = 2 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
从avro端口接收数据,下沉到logger【shizhan02】
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source #source中的avro组件是接收者服务, 绑定本机 a1.sources.r1.type = avro a1.sources.r1.channels = c1 a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 4141 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
3、流合并【扇入】【多个agent汇聚到一个agent】
agent配置:将agent配置复制到不同的服务器上
#agent1表示代理名称
a1.sources=source1 a1.sinks=sink1 a1.channels=c1 #配置source1 a1.sources.source1.type = spooldir a1.sources.source1.spoolDir=/data/hadoop/spooldata a1.sources.source1.channels=c1 a1.sources.source1.fileHeader = true a1.sources.source1.interceptors =i1 i2 i3 a1.sources.source1.interceptors.i1.type = timestamp a1.sources.source1.interceptors.i2.type = host a1.sources.source1.interceptors.i2.hostHeader = hostname a1.sources.source1.interceptors.i3.type = static a1.sources.source1.interceptors.i3.key = app a1.sources.source1.interceptors.i3.value = AOTA #配置sink1 a1.sinks.sink1.type =avro a1.sinks.sink1.channel = c1 a1.sinks.sink1.hostname=172.26.50.24 a1.sinks.sink1.port=10001 #配置channel1 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 collector配置 #agent1表示代理名称 a1.sources=source1 a1.sinks=sink1 a1.channels=c1 #配置source1 a1.sources.source1.type = avro a1.sources.source1.channels = c1 a1.sources.source1.bind = 172.26.50.24 a1.sources.source1.port = 10001 #a1.sources.source1.type = spooldir #a1.sources.source1.spoolDir=/data/hadoop/tmp/flume #a1.sources.source1.channels=c1 #a1.sources.source1.fileHeader = true #a1.sources.source1.interceptors =i1 #a1.sources.source1.interceptors.i1.type = timestamp #配置sink1 #a1.sinks.sink1.type =avro #a1.sinks.sink1.hostname=172.26.50.24 #a1.sinks.sink1.port=10001 a1.sinks.sink1.channel = c1 a1.sinks.sink1.type = hdfs a1.sinks.sink1.hdfs.path = hdfs://172.26.50.24:9000/log_in/flume/%Y%m%d a1.sinks.sink1.hdfs.filePrefix = %{app}_%{hostname}_%y%m%d a1.sinks.sink1.hdfs.fileSuffix = .log a1.sinks.sink1.hdfs.rollSize = 1048576 a1.sinks.sink1.hdfs.rollCount = 0 a1.sinks.sink1.hdfs.batchSize = 1500 a1.sinks.sink1.hdfs.round = true a1.sinks.sink1.hdfs.roundUnit = minute a1.sinks.sink1.hdfs.threadsPoolSize = 25 a1.sinks.sink1.hdfs.useLocalTimeStamp = true a1.sinks.sink1.hdfs.minBlockReplicas = 1 a1.sinks.sink1.fileType = SequenceFile a1.sinks.sink1.writeFormat = TEXT a1.sinks.sink1.rollInterval = 0 #配置channel1 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100
会有日志数据顺序的问题??
4、多路复用【扇出】【多级流:一个agent对应多个channel】
多路复用:当syslog, java, nginx、 tomcat等混合在一起的日志流开始流入一个agent后,可以agent中将混杂的日志流分开,然后给每种日志建立一个自己的传输通道。
多路复制:source被发送到所有三个channel中。
选择器:Selecttor.type 决定多路取数据的规则
Replication 多路复制
Multiplexing 多路复用
Custom 自定义【自定义方案待调研】
多路复用配置【根据event的header决定传输到不同的channal】
#如果文件名1_txt 传输到logger 如果是2_txt 传输到本地 a1.sources = s1 a1.channels = c1 c2 a1.sinks = k1 k2 a1.sources.s1.type = spooldir a1.sources.s1.channels = c1 c2 a1.sources.s1.spoolDir = /home/wangfutai/a/flume/logs a1.sources.s1.basenameHeader = true a1.sources.s1.basenameHeaderKey = myselect #添加选择器 多路复用 a1.sources.s1.selector.type= multiplexing #复用规则 selector.header 头名称 mapping.变量名 #mapping后的变量名 就是 每个event 中header 值 # basename=2_txt a1.sources.s1.selector.header = myselect a1.sources.s1.selector.mapping.1_txt= c1 a1.sources.s1.selector.mapping.2_txt= c2 a1.sources.s1.selector.default= c1 c2 #配置c1 内存 a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 10000 a1.channels.c1.byteCapacityBufferPercentage = 20 a1.channels.c1.byteCapacity = 800000 #配置c2 本地磁盘 a1.channels.c2.type = file a1.channels.c2.checkpointDir = /home/wangfutai/a/flume/checkpoint a1.channels.c2.dataDirs = /home/wangfutai/a/flume/data a1.sinks.k1.type = file_roll a1.sinks.k1.channel = c1 a1.sinks.k1.sink.directory = /home/wangfutai/a/flume/netlogs #本地回滚时间,每隔60s自动生成一个新的文本. a1.sinks.k1.sink.rollInterval = 60 a1.sinks.k1.sink.pathManager.prefix = network #配置 sinks a1.sinks.k2.type = logger #注意:一个sinks只能设置一个channel a1.sinks.k2.channel = c2
多路复制【同一个source数据复制到不同的channal】
agent:
#2个channel和2个sink的配置文件 # Name the components on this agent a1.sources = s1 a1.sinks = k1 k2 a1.channels = c1 c2 # Describe/configure the source a1.sources.s1.type = netcat a1.sources.s1.bind = 192.168.191.201 a1.sources.s1.port = 44444 # 指定source进行扇出到多个channnel的规则 a1.sources.s1.selector.type = replicating a1.sources.s1.channels = c1 c2 # Use a channel which buffers events in memory # 指定channel c1 a1.channels.c1.type = memory # 指定c2的channel a1.channels.c2.type = memory # Describe the sink # 指定k1的设置 a1.sinks.k1.type = avro a1.sinks.k1.channel = c1 a1.sinks.k1.hostname = 192.168.191.202 a1.sinks.k1.port = 44445 # 指定k2的 a1.sinks.k2.type = avro a1.sinks.k2.channel = c2 a1.sinks.k2.hostname = 192.168.191.203 a1.sinks.k2.port = 44445
2个collector配置:
a1.sources=s1 a1.channels=c1 a1.sinks=k1 #设定sources 数据源监听本机的44444端口 a1.sources.s1.channels=c1 a1.sources.s1.type=avro #指定ip或者主机名 a1.sources.s1.bind=192.168.191.202 #指定需要监控的端口 a1.sources.s1.port=44445 #channel a1.channels.c1.type=memory #sinks 写出数据 logger a1.sinks.k1.channel=c1 a1.sinks.k1.type=logger a1.sources=s1 a1.channels=c1 a1.sinks=k1 #设定sources 数据源监听本机的44444端口 a1.sources.s1.channels=c1 a1.sources.s1.type=avro #指定ip或者主机名 a1.sources.s1.bind=192.168.191.203 #指定需要监控的端口 a1.sources.s1.port=44445 #channel a1.channels.c1.type=memory #sinks 写出数据 logger a1.sinks.k1.channel=c1 a1.sinks.k1.type=logger
自定义:【待调研】
5、load balance【负载均衡:高可用】
Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的多个Sink组件上,而每个Sink组件分别连接到一个独立的Agent上,当其中的一个agent失去服务时,agent1会自动切换【maxTimeOut】到相同sinkgroups中的一个,保证数据不丢失。
agent配置:引入sinkgroups
# Name the components on this agent a1.sources = r1 a1.channels = c1 a1.sinkgroups = g1 a1.sinks = k1 k2 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # round轮训 a1.sinkgroups.g1.processor.type = load_balance a1.sinkgroups.g1.processor.backoff = true a1.sinkgroups.g1.processor.selector = round_robin a1.sinkgroups.g1.processor.selector.maxTimeOut=10000 # Describe the sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop102 a1.sinks.k1.port = 4141 a1.sinks.k2.type = avro a1.sinks.k2.hostname = hadoop102 a1.sinks.k2.port = 4142 # Describe the channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c1
同一个sinkgroup的collector配置:
collector1
# Name the components on this agent a2.sources = r1 a2.sinks = k1 a2.channels = c1 # Describe/configure the source a2.sources.r1.type = avro a2.sources.r1.bind = hadoop102 a2.sources.r1.port = 4141 # Describe the sink a2.sinks.k1.type = logger # Describe the channel a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1 collector2配置: # Name the components on this agent a3.sources = r1 a3.sinks = k1 a3.channels = c2 # Describe/configure the source a3.sources.r1.type = avro a3.sources.r1.bind = hadoop102 a3.sources.r1.port = 4142 # Describe the sink a3.sinks.k1.type = logger # Describe the channel a3.channels.c2.type = memory a3.channels.c2.capacity = 1000 a3.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r1.channels = c2 a3.sinks.k1.channel = c2
Flume的安装:
1、下载
http://mirrors.hust.edu.cn/apache/
http://flume.apache.org/download.html
2、安装
Flume框架对hadoop和zookeeper的依赖只是在jar包上,并不要求flume启动时必须将hadoop和zookeeper服务也启动。
上传服务器并解压
tar -zxvf apache-flume-1.8.0-bin.tar.gz -C apps/
创建软连接
ln -s apache-flume-1.8.0-bin/ flume
修改配置文件
cp flume-env.sh.template flume-env.sh
export JAVA_HOME=/usr/local/jdk1.8.0_73
配置环境变量
vi ~/.bashrc
export FLUME_HOME=/home/hadoop/apps/flume
export PATH=$PATH:$FLUME_HOME/bin
使配置生效
source ~/.bashrc
验证【查看版本】
flume-ng version
Flume断点续传
https://blog.csdn.net/Abysscarry/article/details/89420560
1、官方现成版
条件:Flume版本1.7.0及其后的版本
来自官网:
Taildir Source
Note
This source is provided as a preview feature. It does not work on Windows.
Watch the specified files, and tail them in nearly real-time once detected new lines appended to the each files. If the new lines are being written, this source will retry reading them in wait for the completion of the write.
This source is reliable and will not miss data even when the tailing files rotate. It periodically writes the last read position of each files on the given position file in JSON format. If Flume is stopped or down for some reason, it can restart tailing from the position written on the existing position file.
In other use case, this source can also start tailing from the arbitrary position for each files using the given position file. When there is no position file on the specified path, it will start tailing from the first line of each files by default.
Files will be consumed in order of their modification time. File with the oldest modification time will be consumed first.
This source does not rename or delete or do any modifications to the file being tailed. Currently this source does not support tailing binary files. It reads text files line by line.
大概的意思是:提供的这个新的source源,会实时记录采集文件的position记录,每次重新采集可以从该记录中读取上一次的position,并从该position开始读取,从而解决断点续传的问题。
先来看下配置吧
# 先看下官网提供配置示例的【没有给出sink】 a1.sources = r1 a1.channels = c1 a1.sources.r1.type = TAILDIR a1.sources.r1.channels = c1 a1.sources.r1.positionFile = /var/log/flume/taildir_position.json # 文件组的空格分隔列表。每个文件组表示要跟踪的一组文件 a1.sources.r1.filegroups = f1 f2 a1.sources.r1.filegroups.f1 = /var/log/test1/example.log #这个header不知道怎么使用呢 a1.sources.r1.headers.f1.headerKey1 = value1 a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.* a1.sources.r1.headers.f2.headerKey1 = value2 a1.sources.r1.headers.f2.headerKey2 = value2-2 # 是否添加存储绝对路径文件名的头文件。 a1.sources.r1.fileHeader = true a1.sources.ri.maxBatchCount = 1000
以下是完整的配置实例:
pro.sources = s1 pro.channels = c1 pro.sinks = k1 pro.sources.s1.type = TAILDIR #位置信息保存的文件 pro.sources.s1.positionFile = /home/dev/flume/flume-1.8.0/log/taildir_position.json pro.sources.s1.filegroups = f1 pro.sources.s1.filegroups.f1 = /home/dev/log/moercredit/logstash.log pro.sources.s1.headers.f1.headerKey1 = aaa pro.sources.s1.fileHeader = true pro.channels.c1.type = memory pro.channels.c1.capacity = 1000 pro.channels.c1.transactionCapacity = 100 pro.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink pro.sinks.k1.kafka.topic = moercredit_log_test pro.sinks.k1.kafka.bootstrap.servers = cdh1:9092,cdh2:9092,cdh3:9092 pro.sinks.k1.kafka.flumeBatchSize = 20 pro.sinks.k1.kafka.producer.acks = 1 pro.sinks.k1.kafka.producer.linger.ms = 1 pro.sinks.k1.kafka.producer.compression.type = snappy pro.sources.s1.channels = c1 pro.sinks.k1.channel = c1
其中pro.sources.s1.positionFile = /home/dev/flume/flume-1.8.0/log/taildir_position.json 内容
其实就是个json array,每个采集文件对应一个数组元素,每个元素包含三个属性:inode(文件唯一标识号码)、
pos(被采集文件的最后采集位置,也就是文件的byte字节数)、
file(被采集文件的绝对路径)
通过以上配置,再读取的文件不变【没有新建等】情况下,保证消费数据的断点续传及重复消费,如果被读取的文件被重命名和重新创建后,依然可以保证读取新写入的文件内容。但如果Flume进程挂掉,同时文件又被重新创建并写入数据,怎无法保证续传的问题。
??看到过log4j支持不太好的博客,因为log4j日志时自动切分日志文件,且会重命名新的文件??(https://blog.csdn.net/mcj1314bb/article/details/78850748 通过源码来支持文件重命名的博客。)
2、tail命令版
首先明确以下tail命令的-f/-F参数问题:【囧:之前一直不知道】
tail -f 当文件变了,不会再输出
tail -F当文件变了,还会再输出 【使用这个实现续传】
只需改动配置文件中command命令:
tail -n +$(tail -n1 /root/log) -F /root/data/nginx.log | awk ‘ARGIND==1{i=$0;next}{i++;if($0~/^tail/){i=0};print $0;print i >> "/root/log";fflush("")}‘ /root/log
实现思路大概是,把读取的文件的位置【行数】实时写入一个指定的文件中,在启动flume时会先获取上次读取到的行数,然后从该文件的这一行开始消费。
路过~shell命令真心很渣~看了也不懂~
3、大神NB版【未亲测】
https://baijiahao.baidu.com/po/feed/share?wfr=spider&for=pc&context=%7B%22sourceFrom%22%3A%22bjh%22%2C%22nid%22%3A%22news_3433179683779105534%22%7D
自己修改Flume source源码!!!基本实在官方现成版的基础上对Taildir Source组件修改和重新编译。
下面是我寄几梳理的【当然大部分都是别人的】
1、下载apache-flume-1.7.0源码
2、修改Taildir Source组件中的类代码
apache-flume-1.7.0-srcflume-ng-sourcesflume-taildir-sourcesrcmainjavaorgapacheflumesource aildir
ReliableTaildirEventReader.java类
2.1、修改ReliableTaildirEventReader构造方法里面的updateTailFiles(skipToEnd)方法。
修改如下代码:
if(tf==null||!tf.getPath().equals(f.getAbsolutePath())) { //skipToEnd如果没有记录读取位置时,是否跳过文件结尾,默认false longstartPos=skipToEnd?f.length():0; //根据f具体文件生成TailFile tf=openFile(f,headers,inode,startPos); }
修改为:
if(tf==null) //去掉了绝对路径判断
科普一下:
Unix/linux系统内部不使用文件名,而使用inode号码来识别文件。对于系统来说,文件名只是inode号码便于识别的别称或者绰号【Inode:每个文件的唯一标识(即使文件名称改变,此标识也不会变)】。表面上,用户通过文件名,打开文件。实际上,系统内部这个过程分成三步:
首先,系统找到这个文件名对应的inode号码;
其次,通过inode号码,获取inode信息;
最后,根据inode信息,找到文件数据所在的block,读出数据。
/所以就算文件名相同,内部的inode号码也是不同的,所以不能通过绝对路径找到之前的文件
由于inode号码与文件名分离,这种机制导致了一些Unix/Linux系统特有的现象
1. 有时,文件名包含特殊字符,无法正常删除。这时,直接删除inode节点,就能起到删除文件的作用。
2. 移动文件或重命名文件,只是改变文件名,不影响inode号码。
3. 打开一个文件以后,系统就以inode号码来识别这个文件,不再考虑文件名
2.2、修改ReliableTaildirEventReader构造方法里面的loadPositionFile(positionFilePath)方法
修改如下代码:
if(tf!=null&&tf.updatePos(path,inode,pos)){ //更新inode与tf映射 tailFiles.put(inode,tf); //修改为: //将updatePos方法中的path修改为了tf.getPath(),强制修改前的文件路径跟修改后的文件路径一致。 if(tf!=null&&tf.updatePos(tf.getPath(),inode,pos)){
2.3、Flume编译打包
mvnpackage对上述两个模块进行源码编译生成flume-taildirsource.jar
将flume-taildirsource.jar上传到flumelib目录下
以上是关于Flume高可用+断点续传的主要内容,如果未能解决你的问题,请参考以下文章
高并发/高可用/哨兵机制/集群模式/高可用与主备切换/主从复制/断点续传
高并发/高可用/哨兵机制/集群模式/高可用与主备切换/主从复制/断点续传