Transferring csv files into hdfs, converting them to avro, using flume
Posted 2017-01-27 15:49:14

**Question:** I'm new to big data, and my task is to transfer csv files into HDFS using Flume, which should also convert those csv files to avro. I tried to do that with the following flume configuration:
a1.channels = dataChannel
a1.sources = dataSource
a1.sinks = dataSink
a1.channels.dataChannel.type = memory
a1.channels.dataChannel.capacity = 1000000
a1.channels.dataChannel.transactionCapacity = 10000
a1.sources.dataSource.type = spooldir
a1.sources.dataSource.spoolDir = spool_dir
a1.sources.dataSource.fileHeader = true
a1.sources.dataSource.fileHeaderKey = file
a1.sources.dataSource.basenameHeader = true
a1.sources.dataSource.basenameHeaderKey = basename
a1.sources.dataSource.interceptors.attach-schema.type = static
a1.sources.dataSource.interceptors.attach-schema.key = flume.avro.schema.url
a1.sources.dataSource.interceptors.attach-schema.value = path_to_schema_in_hdfs
a1.sinks.dataSink.type = hdfs
a1.sinks.dataSink.hdfs.path = sink_path
a1.sinks.dataSink.hdfs.format = text
a1.sinks.dataSink.hdfs.inUsePrefix = .
a1.sinks.dataSink.hdfs.filePrefix = drone
a1.sinks.dataSink.hdfs.fileSuffix = .avro
a1.sinks.dataSink.hdfs.rollSize = 180000000
a1.sinks.dataSink.hdfs.rollCount = 100000
a1.sinks.dataSink.hdfs.rollInterval = 120
a1.sinks.dataSink.hdfs.idleTimeout = 3600
a1.sinks.dataSink.hdfs.fileType = DataStream
a1.sinks.dataSink.serializer = avro_event
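For reference, switching from the `avro_event` alias to the header-driven Avro serializer that the post goes on to mention would typically look like the lines below. This is a sketch: `AvroEventSerializer` in Flume's HDFS sink reads the schema from the `flume.avro.schema.url` event header, which is exactly what the static interceptor above sets. Note that the original config defines the interceptor's properties but never registers it on the source, so the sketch adds that line as well.

```properties
# Register the interceptor on the source (missing from the config above).
a1.sources.dataSource.interceptors = attach-schema

# Use the schema-from-header Avro serializer instead of the avro_event alias.
a1.sinks.dataSink.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder

# Keep DataStream so the serializer, not the sink, controls the on-disk format.
a1.sinks.dataSink.hdfs.fileType = DataStream
```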
This outputs avro files with Flume's default schema. I also tried using AvroEventSerializer, but I ran into many different errors. I resolved all of them except this one:
ERROR hdfs.HDFSEventSink: process failed
java.lang.ExceptionInInitializerError
at org.apache.hadoop.hdfs.DFSOutputStream.computePacketChunkSize(DFSOutputStream.java:1305)
at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1243)
at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1266)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1101)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1059)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:232)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:75)
Thanks for any help.
**Answer:** Sorry about the mistakes in the config. I fixed them and found a way to convert csv to avro. I modified the AvroEventSerializer like this:
@Override
public void write(Event event) throws IOException {
    if (dataFileWriter == null) {
        initialize(event);
    }
    String[] items = new String(event.getBody()).split(",");
    city.put("deviceID", Long.parseLong(items[0]));
    city.put("groupID", Long.parseLong(items[1]));
    city.put("timeCounter", Long.parseLong(items[2]));
    city.put("cityCityName", items[3]);
    city.put("cityStateCode", items[4]);
    city.put("sessionCount", Long.parseLong(items[5]));
    city.put("errorCount", Long.parseLong(items[6]));
    dataFileWriter.append(city);
}
and here is how city is defined:

private GenericRecord city = null;
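For completeness, an Avro schema consistent with the fields written above might look like the following. Only the field names and types are taken from the code; the record name and namespace are made up for illustration:

```json
{
  "type": "record",
  "name": "CityRecord",
  "namespace": "example.drone",
  "fields": [
    {"name": "deviceID",      "type": "long"},
    {"name": "groupID",       "type": "long"},
    {"name": "timeCounter",   "type": "long"},
    {"name": "cityCityName",  "type": "string"},
    {"name": "cityStateCode", "type": "string"},
    {"name": "sessionCount",  "type": "long"},
    {"name": "errorCount",    "type": "long"}
  ]
}
```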
If you know a better way, please reply.
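The per-line parsing that the modified write() method performs can be sketched as plain-JDK code, without the Flume and Avro dependencies. The field names come from the snippet above; the sample CSV line and its values are made up for illustration:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CsvRecordParser {

    // Parse one CSV line into a field map mirroring the record layout
    // that the serializer writes into the GenericRecord.
    static Map<String, Object> parse(String line) {
        String[] items = line.split(",");
        Map<String, Object> rec = new LinkedHashMap<>();
        rec.put("deviceID", Long.parseLong(items[0]));
        rec.put("groupID", Long.parseLong(items[1]));
        rec.put("timeCounter", Long.parseLong(items[2]));
        rec.put("cityCityName", items[3]);
        rec.put("cityStateCode", items[4]);
        rec.put("sessionCount", Long.parseLong(items[5]));
        rec.put("errorCount", Long.parseLong(items[6]));
        return rec;
    }

    public static void main(String[] args) {
        Map<String, Object> rec = parse("1,2,3,Boston,MA,10,0");
        System.out.println(rec.get("cityCityName")); // prints "Boston"
    }
}
```

Note that Long.parseLong throws NumberFormatException on malformed input, so in a real serializer a bad CSV line would fail the whole Flume transaction unless it is caught and handled.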