将带有换行符的固定长度文本文件作为属性值之一读入 JavaRDD
Posted
技术标签:
【中文标题】将带有换行符的固定长度文本文件作为属性值之一读入 JavaRDD【英文标题】:Read a textfile of fixed length with newline as one of attribute value into a JavaRDD 【发布时间】:2019-07-31 15:47:47 【问题描述】:我有一个宽度为 100 字节的文本文件。以下是结构。我需要读取 JavaRDD 中的数据。
RecType - String 1 Byte
Date - String 8 byte
Productnumber - String 15 byte
TAG - String 11 byte
Filler1 - String 1 byte
Contract - String 11 byte
Code - String 3 byte
Version - String 3 byte
newline - String 1 byte
FILENAME -String 25 byte
Recnumber - String 4 byte
文件中的样本数据
020190718000000000000001CHATHOLDER SUBCONTRACT1MNV3.0
LOGFILEGENAT07312019050AM00001020190718000000000000001CHATHOLDER SUBCONTRACT1MNV3.0
LOGFILEGENAT07312019050AM00002020190718000000000000001CHATHOLDER SUBCONTRACT1MNV3.0
LOGFILEGENAT07312019050AM00003020190718000000000000002CHATHOLDER SUBCONTRACT1MNV3.0
LOGFILEGENAT07312019051AM00004
如果您注意到每条记录都从一行开始并在下一行结束。从下一个字节开始,下一条记录开始。文件中有4条记录,以字符串020190718开头。
您能帮我如何读取 JavaRDD 中的记录吗?
我在尝试
JavaRDD1 = SparkUtils.getSession().read().textFile(filepath)
javaRDD()
map(x -> return FunctiontoParse(x););
但它一次只考虑一行,而不是读取整条记录。
请帮忙。
【问题讨论】:
【参考方案1】:你可能想see this post. 使用wholeTextFile()
将工作如果一切都很好作为一个字符串。如果您希望它保持二进制,那么您需要将其读取为二进制。我改用了JavaSparkContext.binaryFiles(filepath,numPartitions)
。这会将整个文件读取为字节,并让您根据需要对其进行解析。
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
//from here each file gets on record in the resulting RDD. Each Record is a filename, file_contents pair. Each record has the contents of an entire file.
JavaPairRDD<String, PortableDataStream> rawBinaryInputFiles = jsc.binaryFiles(HDFSinputFolder,numPartitions);
//now to use your function to parse each file. Keep in mind, each record has the contents of an entire file,
//you will need to parse out each record. But since it's fixed width by bytes, it should be pretty simple.
//Create a custom wrapper object to hold the values and populate.
JavaRDD<YourCustomWrapperObject> records = rawBinaryInputFiles.flatMap(new FlatMapFunction<Tuple2<String,PortableDataStream>, YourCustomWrapperObject>()
@Override
public Iterator<YourCustomWrapperObject> call(Tuple2<String, PortableDataStream> t) throws Exception
List<YourCustomWrapperObject> results = new ArrayList<YourCustomWrapperObject>();
byte[] bytes = t._2().toArray(); //convert PortableDataStream to byte array.
//best option here IMO is to create a wrapper object, populate it from the byte array and return it
YourCustomWrapperObject obj = new YourCustomWrapperObject();
//populate....
results.add(obj);
return results;
);
【讨论】:
我对此很陌生。您能否提供一些示例代码。 rawBinaryInputFiles 无法解析数据并且未按预期返回。我使用 readwholefiles 来分隔键值。然后写了一个简单的java程序来分离记录并将一条记录放在一行中。【参考方案2】:在 scala 中运行的版本:
def chunkFile(file: String) : List[(String, String, String, String, String, String, String, String, String, String, String)] =
import scala.collection.mutable.ListBuffer
val rowSize = 84
val list = new ListBuffer[(String, String, String, String, String, String, String, String, String, String, String)]
for(i <- 0 to (file.length / rowSize)-1)
val row = file.substring(i * rowSize, (i+ 1) * rowSize)
val items = (row.substring(0, 1),row.substring(1, 9),row.substring(9, 24),row.substring(24, 35), row.substring(35, 36),row.substring(36, 47), row.substring(47, 50),row.substring(50, 53),row.substring(54, 55),row.substring(55, 80),row.substring(80, 84))
list += items
list.toList
val file = sc.wholeTextFiles("C:/git/files/newline-as-data.txt")
chunkFile(file.collect.map(f => f._2).head).toDF.show
老实说,我会预处理文件并删除换行符,然后您可以将其视为正常加载,而不是像这样需要将整个文件读入内存的东西
【讨论】:
以上是关于将带有换行符的固定长度文本文件作为属性值之一读入 JavaRDD的主要内容,如果未能解决你的问题,请参考以下文章
如何将带有文本信息的 1.3 GB csv 文件读入 Python 的 pandas 对象?