将带有换行符的固定长度文本文件作为属性值之一读入 JavaRDD

Posted

技术标签:

【中文标题】将带有换行符的固定长度文本文件作为属性值之一读入 JavaRDD【英文标题】:Read a textfile of fixed length with newline as one of attribute value into a JavaRDD 【发布时间】:2019-07-31 15:47:47 【问题描述】:

我有一个宽度为 100 字节的文本文件。以下是结构。我需要读取 JavaRDD 中的数据。

RecType - String 1 Byte
Date    - String 8 byte
Productnumber - String 15 byte
TAG           - String 11 byte
Filler1       - String 1 byte
Contract      - String 11 byte
Code          - String 3 byte
Version       - String 3 byte
newline       - String 1 byte
FILENAME      -String  25 byte
Recnumber     - String  4 byte

文件中的样本数据

020190718000000000000001CHATHOLDER SUBCONTRACT1MNV3.0
LOGFILEGENAT07312019050AM00001020190718000000000000001CHATHOLDER SUBCONTRACT1MNV3.0
LOGFILEGENAT07312019050AM00002020190718000000000000001CHATHOLDER SUBCONTRACT1MNV3.0
LOGFILEGENAT07312019050AM00003020190718000000000000002CHATHOLDER SUBCONTRACT1MNV3.0
LOGFILEGENAT07312019051AM00004

如果您注意到每条记录都从一行开始并在下一行结束。从下一个字节开始,下一条记录开始。文件中有4条记录,以字符串020190718开头。

您能帮我如何读取 JavaRDD 中的记录吗?

我在尝试

JavaRDD1 = SparkUtils.getSession().read().textFile(filepath)
         javaRDD()
         map(x -> return FunctiontoParse(x);); 

但它一次只考虑一行,而不是读取整条记录。

请帮忙。

【问题讨论】:

【参考方案1】:

你可能想see this post. 使用wholeTextFile() 将工作如果一切都很好作为一个字符串。如果您希望它保持二进制,那么您需要将其读取为二进制。我改用了JavaSparkContext.binaryFiles(filepath,numPartitions)。这会将整个文件读取为字节,并让您根据需要对其进行解析。

JavaSparkContext jsc = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
//from here each file gets on record in the resulting RDD.  Each Record is a filename, file_contents pair.  Each record has the contents of an entire file.
JavaPairRDD<String, PortableDataStream> rawBinaryInputFiles = jsc.binaryFiles(HDFSinputFolder,numPartitions);
//now to use your function to parse each file.  Keep in mind, each record has the contents of an entire file, 
//you will need to parse out each record.  But since it's fixed width by bytes, it should be pretty simple.  
//Create a custom wrapper object to hold the values and populate.

JavaRDD<YourCustomWrapperObject> records =  rawBinaryInputFiles.flatMap(new FlatMapFunction<Tuple2<String,PortableDataStream>, YourCustomWrapperObject>() 

    @Override
    public Iterator<YourCustomWrapperObject> call(Tuple2<String, PortableDataStream> t) throws Exception 
        List<YourCustomWrapperObject> results = new ArrayList<YourCustomWrapperObject>();
        byte[] bytes = t._2().toArray(); //convert PortableDataStream to byte array.
        //best option here IMO is to create a wrapper object, populate it from the byte array and return it
        YourCustomWrapperObject obj = new YourCustomWrapperObject();
        //populate....
        results.add(obj);
        return results;
    
);

【讨论】:

我对此很陌生。您能否提供一些示例代码。 rawBinaryInputFiles 无法解析数据并且未按预期返回。我使用 readwholefiles 来分隔键值。然后写了一个简单的java程序来分离记录并将一条记录放在一行中。【参考方案2】:

在 scala 中运行的版本:



def chunkFile(file: String) : List[(String, String, String, String, String, String, String, String, String, String, String)] = 

    import scala.collection.mutable.ListBuffer

    val rowSize = 84
    val list = new ListBuffer[(String, String, String, String, String, String, String, String, String, String, String)]
    for(i <- 0 to (file.length / rowSize)-1)
        val row = file.substring(i * rowSize, (i+ 1) * rowSize)
        val items = (row.substring(0, 1),row.substring(1, 9),row.substring(9, 24),row.substring(24, 35), row.substring(35, 36),row.substring(36, 47), row.substring(47, 50),row.substring(50, 53),row.substring(54, 55),row.substring(55, 80),row.substring(80, 84))
        list += items
    

    list.toList


val file = sc.wholeTextFiles("C:/git/files/newline-as-data.txt")
chunkFile(file.collect.map(f => f._2).head).toDF.show

老实说,我会预处理文件并删除换行符,然后您可以将其视为正常加载,而不是像这样需要将整个文件读入内存的东西

【讨论】:

以上是关于将带有换行符的固定长度文本文件作为属性值之一读入 JavaRDD的主要内容,如果未能解决你的问题,请参考以下文章

关于c++文件流读入和写入的问题

将文本附加到带有换行符的文本区域

我应该使用哪个函数将非结构化文本文件读入 R? [关闭]

如何将带有文本信息的 1.3 GB csv 文件读入 Python 的 pandas 对象?

JS有啥好的处理字符串固定位置加入<br />,就是将一段文本实行自动换行

如何让带有换行符的文本在div中也能换行显示