使用flume在两个单独的表中写入hive仓库目录中的数据
Posted
技术标签:
【中文标题】使用flume在两个单独的表中写入hive仓库目录中的数据【英文标题】:Writing data in hive warehouse directory in two separate tables using flume 【发布时间】:2018-11-13 16:12:40 【问题描述】:我想将数据写入 hive 仓库目录中的两个单独的表中,分别称为 flumemaleemployee
和 flumefemaleemployee
。最后3 records
应该插入female
表中,上3 records
应该插入male
表中。下面是我的数据:
1,alok,mumbai
1,jatin,chennai
1,yogesh,kolkata
2,ragini,delhi
2,jyotsana,pune
1,valmiki,banglore
下面是我的flume
conf
代码:
agent.sources = tailsrc
agent.channels = mem1 mem2
agent.sinks = stdl std2
agent.sources.tailsrc.type = exec
agent.sources.tailsrc.command = tail -F /home/cloudera/Desktop/in.txt
agent.sources.tailsrc.batchSize = 1
agent.sources.tailsrc.interceptors = i1
agent.sources.tailsrc.interceptors.i1.type = regex_extractor
agent.sources.tailsrc.interceptors.il.regex = A(\\d
agent.sources.tailsrc. interceptors. M.serializers = t1
agent.sources.tailsrc. interceptors, i1.serializers.t1. name = type
agent.sources.tailsrc.selector.type = multiplexing
agent.sources.tailsrc.selector.header = type
agent.sources.tailsrc.selector.mapping.1 = mem1
agent.sources.tailsrc.selector.mapping.2 = mem2
agent.sinks.std1.type = hdfs
agent.sinks.stdl.channel = mem1
agent.sinks.stdl.batchSize = 1
agent.sinks.std1.hdfs.path = /user/hive/warehouse/aisehibanayatp.db/flumemaleemployee
agent.sinks.stdl.rolllnterval = 0
agent.sinks.stdl.hdfs.fileType = DataStream
agent.sinks.std2.type = hdfs
agent.sinks.std2.channel = mem2
agent.sinks.std2.batchSize = 1
agent.sinks.std2.hdfs.path = /user/hi ve/warehouse/aisehibanayatp.db/flumefemaleemployee
agent.sinks.std2.rolllnterval = 0
agent.sinks.std2.hdfs.fileType = DataStream
agent.channels.mem1.type = memory
agent.channels.meml.capacity = 100
agent.channels.mem2.type = memory
agent.channels.mem2.capacity = 100
agent.sources.tailsrc.channels = mem1 mem2
我没有收到任何错误,但是当我使用以下命令启动 flume
service
时,它只是卡在了一些我不知道如何处理的地方,因为我没有收到任何错误
flume-ng agent --name agent -conf-file /home/cloudera/Desktop/flume1.config
它卡在下面的步骤:
18/11/13 08:03:00 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: mem2. channel.event.take.success == 0
18/11/13 08:03:00 INFO node.Application: Starting new configuration: sourceRunners: sinkRunners:std2=SinkRunner: policy:org.apache.flume.sink.DefaultSinkProcessor@17ade71c counterGroup: name:null counters: channels:mem2=org.apache.flume.channel.MemoryChannelname: mem2
18/11/13 08:03:00 INFO node.Application: Starting Channel mem2
18/11/13 08:03:00 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: mem2 started
18/11/13 08:03:00 INFO node.Application: Starting Sink std2
18/11/13 08:03:00 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: std2: Successfully registered new MBean.
18/11/13 08:03:00 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: std2 started
那么我该如何实现呢??
【问题讨论】:
不确定我是否理解正则表达式。它不应该基于第一个 col 的值,1 还是 2? 乍一看还不错 是的,它不会抛出任何错误 我的第一条评论?你能找到输出文件吗? 我认为没有错误 【参考方案1】:问题是拼写错误,缺少格式和空格,而 l 而不是 1。我设法解决了这些问题,它运行了,我改变了你的正则表达式,你可以调整它,但大部分是准确性问题。如下使用该文件,它可以工作,当然是您自己的 HDFS 和设置:
agent.sources = tailsrc
agent.channels = mem1 mem2
agent.sinks = std1 std2
agent.sources.tailsrc.type = exec
agent.sources.tailsrc.command = tail -F /home/cloudera/in.txt
agent.sources.tailsrc.batchSize = 1
agent.sources.tailsrc.interceptors = i1
agent.sources.tailsrc.interceptors.i1.type = regex_extractor
agent.sources.tailsrc.interceptors.i1.regex = ^.*(1|2)
agent.sources.tailsrc.interceptors.i1.serializers = t1
agent.sources.tailsrc.interceptors.i1.serializers.t1.name = type
agent.sources.tailsrc.selector.type = multiplexing
agent.sources.tailsrc.selector.header = type
agent.sources.tailsrc.selector.mapping.1 = mem1
agent.sources.tailsrc.selector.mapping.2 = mem2
agent.sinks.std1.type = hdfs
agent.sinks.std1.channel = mem1
agent.sinks.std1.batchSize = 1
agent.sinks.std1.hdfs.path = hdfs://quickstart.cloudera:8020/user/hive/warehouse/flumemaleemployee
agent.sinks.std1.rolllnterval = 0
agent.sinks.std1.hdfs.fileType = DataStream
agent.sinks.std2.type = hdfs
agent.sinks.std2.channel = mem2
agent.sinks.std2.batchSize = 1
agent.sinks.std2.hdfs.path = hdfs://quickstart.cloudera:8020/user/hive/warehouse/flumefemaleemployee
agent.sinks.std2.rolllnterval = 0
agent.sinks.std2.hdfs.fileType = DataStream
agent.channels.mem1.type = memory
agent.channels.meml.capacity = 100
agent.channels.mem2.type = memory
agent.channels.mem2.capacity = 100
agent.sources.tailsrc.channels = mem1 mem2
【讨论】:
如果我遵循文档并且如果我只对源代码使用强制属性。就像“exec”一样,唯一的强制属性是类型和命令。那么它也适用于这两个属性吗?因为我不明白这里其他属性的用途 是的,它有效,但如果你能用外行术语解释这些属性,特别是正则表达式部分,这将非常有帮助,以便将来我可以解决这些问题 它跑了,对我来说已经结束了。工作要做 好的,只要你有时间,你会帮我理解这些属性吗? 好的,但该示例是从获取内容的文件中读取,并从行首查找 1 或 2 个并拆分为 2 个文件。我会给你一个链接阅读正则表达式以上是关于使用flume在两个单独的表中写入hive仓库目录中的数据的主要内容,如果未能解决你的问题,请参考以下文章
Hive:Spark中如何实现将rdd结果插入到hive1.3.1表中
大数据数据仓库-基于大数据体系构建数据仓库(Hive,Flume,Kafka,Azkaban,Oozie,SparkSQL)