flume学习

Posted 厨 神

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flume学习相关的知识,希望对你有一定的参考价值。

备忘录
官方网站

在linux的flume目录下,有一个docs文件夹,里面是一整个当前flume版本的引导文件,可以查看flume所有的属性

hadoop在$HADOOP_HOME/share/doc/hadoop
flume在$FLUME_HOME/docs
sqoop在$SQOOP_HOME/docs
spark在$SPARK_HOME/R/lib/SparkR

1.flume的html文件在命令行打开

在terminal中,用命令快速调用浏览器打开html文件,而非通过鼠标点击的方式。

#直接see
see docs/FlumeUserGuide.html
#firefox
firefox docs/FlumeUserGuide.html
#google
chromium-browser docs/FlumeUserGuide.html

2.flume的html文件在vscode打开

把这个文件scp到ubuntu或者windows的vscode里,vscode安装运行插件
最后在需要运行的html文件上右击选择 Open In Other Browsers,或者按 Alt+Shift+B
即可浏览器打开flume指导文件,查看flume各种属性

a1.sources = r1
a1.sinks = k1
a1.channels = c1

一、sources

1.netcat输入

socket源

# netcat source自己监听端口,flume启动后用 nc localhost 44444 连接并发送数据
#telnet localhost 44444也可
#bind表示指定地址 port指定端口
a1.sources.r1.type = netcat 
#或者netcatudp /syslogtcp /avro/thrift/multiport_syslogtcp/syslogudp
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#multiport_syslogtcp的时候可以.ports = 10001 10002 10003

avro输入

a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

thrift

a1.sources.r1.type = thrift
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

exec

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
=========================
a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.shell = /bin/bash -c
a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done

spooldir

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /var/log/apache/flumeSpool
a1.sources.r1.fileHeader = true

taildir

a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.headers.f1.headerKey1 =example-
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = example-
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
a1.sources.r1.maxBatchCount = 1000

kafka

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$

seq

a1.sources.r1.type = seq

multiport_syslogtcp

a1.sources.r1.type = multiport_syslogtcp
a1.sources.r1.channels = c1
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.ports = 10001 10002 10003
a1.sources.r1.portHeader = port

http

a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sources.r1.handler = org.example.rest.RestHandler
a1.sources.r1.handler.nickname = random props
a1.sources.r1.HttpConfiguration.sendServerVersion = false
a1.sources.r1.ServerConnector.idleTimeout = 300

legacy

a1.sources.r1.type = org.apache.flume.source.avroLegacy.AvroLegacySource
#org.apache.flume.source.thriftLegacy.ThriftLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.port = 6666

自定义

a1.sources.r1.type = org.example.MySource

二、channels

1.memory管道

# capacity必须大于或等于transactionCapacity,因为transactionCapacity包含于capacity。
#根据数据量的传输多少变。capacity太小会报错
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

2.jdbc

a1.channels.c1.type = jdbc
a1.channels.c1.jdbc.driver.class =
a1.channels.c1.jdbc.driver.url =
a1.channels.c1.jdbc.db.username =
a1.channels.c1.jdbc.db.password =

kafka

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.channels.channel1.kafka.topic = topic_name
a1.channels.channel1.kafka.consumer.group.id = flume-consumer

file

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

SPILLABLEMEMORY

a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 10000
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.byteCapacity = 800000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

custom(自定义)

a1.channels.c1.type = org.example.MyChannel

三、sinks

1.logger输出

a1.sinks.k1.type = logger

hdfs

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%Y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

hive

a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.hive.partition = asia,%{country},%Y-%m-%d-%H-%M
a1.sinks.k1.useLocalTimeStamp = false
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.serdeSeparator = '\t'
a1.sinks.k1.serializer.fieldnames =id,,msg

logger

a1.sinks.k1.type = logger

avro

# 或者thrift(properties文件不支持行尾注释,注释必须单独成行)
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545

null

a1.sinks.k1.type = null

hbase

a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1

asynchbase

a1.sinks.k1.type = asynchbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
a1.sinks.k1.channel = c1

kafka

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

http

a1.sinks.k1.type = http
a1.sinks.k1.channel = c1
a1.sinks.k1.endpoint = http://localhost:8080/someuri
a1.sinks.k1.connectTimeout = 2000
a1.sinks.k1.requestTimeout = 2000
a1.sinks.k1.acceptHeader = application/json
a1.sinks.k1.contentTypeHeader = application/json
a1.sinks.k1.defaultBackoff = true
a1.sinks.k1.defaultRollback = true
a1.sinks.k1.defaultIncrementMetrics = false
a1.sinks.k1.backoff.4XX = false
a1.sinks.k1.rollback.4XX = false
a1.sinks.k1.incrementMetrics.4XX = true
a1.sinks.k1.backoff.200 = false
a1.sinks.k1.rollback.200 = false
a1.sinks.k1.incrementMetrics.200 = true

custom(自定义)

a1.sinks.k1.type = org.example.MySink

hbase2

a1.sinks.k1.type = hbase2
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase2.RegexHBase2EventSerializer
a1.sinks.k1.channel = c1

四、连接

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

source-replicating-channel

a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3

source-load_balancing-channel

a1.channels = c1 c2 c3 c4
a1.sources.r1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = load_balancing
a1.sources.r1.selector.policy = round_robin

source-multiplexing-channel

a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4

source-custom(自定义)

a1.sources.r1.selector.type = org.example.MyChannelSelector

sinkgroups-load_balance-sink

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance

sinkgroups-failover-Sink

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

sinkgroups-random-sink

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random

五、serializer

text

a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline = false

avro_event

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%Y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = avro_event
a1.sinks.k1.serializer.compressionCodec = snappy

AvroEventSerializer

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
a1.sinks.k1.serializer.compressionCodec = snappy
a1.sinks.k1.serializer.schemaURL = hdfs://namenode/path/to/schema.avsc

六、Interceptor

HostInterceptor

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.hostHeader = hostname
a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d
a1.sinks.k1.channel = c1

Timestamp

a1.sources = r1
a1.channels = c1
a1.sources.r1.channels =  c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

host

a1.sources = r1
a1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host

static

a1.sources = r1
a1.channels = c1
a1.sources.r1.channels =  c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK

morphlineinterceptor

a1.sources.avroSrc.interceptors = morphlineinterceptor
a1.sources.avroSrc.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineFile = /etc/flume-ng/conf/morphline.conf
a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineId = morphline1

search-replace

a1.sources.avroSrc.interceptors = search-replace
a1.sources.avroSrc.interceptors.search-replace.type = search_replace

# Use grouping operators to reorder and munge words on a line.
a1.sources.avroSrc.interceptors.search-replace.searchPattern = The quick brown ([a-z]+) jumped over the lazy ([a-z]+)
a1.sources.avroSrc.interceptors.search-replace.replaceString = The hungry $2 ate the careless $1

regex_filter

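a1.sources.avroSrc.interceptors = regex_filter
a1.sources.avroSrc.interceptors.regex_filter.type = regex_filter
a1.sources.avroSrc.interceptors.regex_filter.regex = .*
a1.sources.avroSrc.interceptors.regex_filter.excludeEvents = false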

RegexExtractor

a1.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm

七、启动

bin/flume-ng agent --conf conf --conf-file example.conf --name a1

变量使用

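conf文件内用 ${NC_PORT} 引用环境变量,例如 a1.sources.r1.port = ${NC_PORT}
运行时先设置环境变量并指定解析实现:NC_PORT=44444 bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties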

Component Interface | Type Alias | Implementation Class
org.apache.flume.Channel | memory | org.apache.flume.channel.MemoryChannel
org.apache.flume.Channel | jdbc | org.apache.flume.channel.jdbc.JdbcChannel
org.apache.flume.Channel | file | org.apache.flume.channel.file.FileChannel
org.apache.flume.Channel | - | org.apache.flume.channel.PseudoTxnMemoryChannel
org.apache.flume.Channel | - | org.example.MyChannel
org.apache.flume.Source | avro | org.apache.flume.source.AvroSource
org.apache.flume.Source | netcat | org.apache.flume.source.NetcatSource
org.apache.flume.Source | seq | org.apache.flume.source.SequenceGeneratorSource
org.apache.flume.Source | exec | org.apache.flume.source.ExecSource
org.apache.flume.Source | syslogtcp | org.apache.flume.source.SyslogTcpSource
org.apache.flume.Source | multiport_syslogtcp | org.apache.flume.source.MultiportSyslogTCPSource
org.apache.flume.Source | syslogudp | org.apache.flume.source.SyslogUDPSource
org.apache.flume.Source | spooldir | org.apache.flume.source.SpoolDirectorySource
org.apache.flume.Source | http | org.apache.flume.source.http.HTTPSource
org.apache.flume.Source | thrift | org.apache.flume.source.ThriftSource
org.apache.flume.Source | jms | org.apache.flume.source.jms.JMSSource
org.apache.flume.Source | - | org.apache.flume.source.avroLegacy.AvroLegacySource
org.apache.flume.Source | - | org.apache.flume.source.thriftLegacy.ThriftLegacySource
org.apache.flume.Source | - | org.example.MySource
org.apache.flume.Sink | null | org.apache.flume.sink.NullSink
org.apache.flume.Sink | logger | org.apache.flume.sink.LoggerSink
org.apache.flume.Sink | avro | org.apache.flume.sink.AvroSink
org.apache.flume.Sink | hdfs | org.apache.flume.sink.hdfs.HDFSEventSink
org.apache.flume.Sink | hbase | org.apache.flume.sink.hbase.HBaseSink
org.apache.flume.Sink | hbase2 | org.apache.flume.sink.hbase2.HBase2Sink
org.apache.flume.Sink | asynchbase | org.apache.flume.sink.hbase.AsyncHBaseSink
org.apache.flume.Sink | file_roll | org.apache.flume.sink.RollingFileSink
org.apache.flume.Sink | irc | org.apache.flume.sink.irc.IRCSink
org.apache.flume.Sink | thrift | org.apache.flume.sink.ThriftSink
org.apache.flume.Sink | - | org.example.MySink
org.apache.flume.ChannelSelector | replicating | org.apache.flume.channel.ReplicatingChannelSelector
org.apache.flume.ChannelSelector | multiplexing | org.apache.flume.channel.MultiplexingChannelSelector
org.apache.flume.ChannelSelector | - | org.example.MyChannelSelector
org.apache.flume.SinkProcessor | default | org.apache.flume.sink.DefaultSinkProcessor
org.apache.flume.SinkProcessor | failover | org.apache.flume.sink.FailoverSinkProcessor
org.apache.flume.SinkProcessor | load_balance | org.apache.flume.sink.LoadBalancingSinkProcessor
org.apache.flume.SinkProcessor | - | -
org.apache.flume.interceptor.Interceptor | timestamp | org.apache.flume.interceptor.TimestampInterceptor$Builder
org.apache.flume.interceptor.Interceptor | host | org.apache.flume.interceptor.HostInterceptor$Builder
org.apache.flume.interceptor.Interceptor | static | org.apache.flume.interceptor.StaticInterceptor$Builder
org.apache.flume.interceptor.Interceptor | regex_filter | org.apache.flume.interceptor.RegexFilteringInterceptor$Builder
org.apache.flume.interceptor.Interceptor | regex_extractor | org.apache.flume.interceptor.RegexExtractorInterceptor$Builder
org.apache.flume.channel.file.encryption.KeyProvider$Builder | jceksfile | org.apache.flume.channel.file.encryption.JCEFileKeyProvider
org.apache.flume.channel.file.encryption.KeyProvider$Builder | - | org.example.MyKeyProvider
org.apache.flume.channel.file.encryption.CipherProvider | aesctrnopadding | org.apache.flume.channel.file.encryption.AESCTRNoPaddingProvider
org.apache.flume.channel.file.encryption.CipherProvider | - | org.example.MyCipherProvider
org.apache.flume.serialization.EventSerializer$Builder | text | org.apache.flume.serialization.BodyTextEventSerializer$Builder
org.apache.flume.serialization.EventSerializer$Builder | avro_event | org.apache.flume.serialization.FlumeEventAvroEventSerializer$Builder
org.apache.flume.serialization.EventSerializer$Builder | - | org.example.MyEventSerializer$Builder
Alias Name | Alias Type
a | agent
c | channel
r | source
k | sink
g | sink group
i | interceptor
y | key
h | host
s | serializer

以上是关于flume学习的主要内容,如果未能解决你的问题,请参考以下文章

flume学习一:flume基础知识

Flume学习之路 Flume的基础介绍

Flume学习笔记

Flume学习之路 Flume的Source类型

Flume 使用学习小结

flume学习