将内部文件转换为数据帧到另一个数据帧或 RDD

Posted

技术标签:

【中文标题】将内部文件转换为数据帧到另一个数据帧或 RDD【英文标题】:convert files inside into dataframe to another dataframe or RDD 【发布时间】:2019-09-03 22:49:02 【问题描述】:

我有一个每 5、10 或 20 分钟生成文件的进程。然后另一个进程列出绝对路径并每小时将它们保存在一个文件中。 结构如下

logan@Everis-PC  ~/Datasets/dev/path > cat path1
/home/logan/Datasets/novum_dev/in/TasPo_20190801_001808_D200_20190809.DAT
/home/logan/Datasets/novum_dev/in/TasPo_20190801_001808_S200_20190809.DAT
/home/logan/Datasets/novum_dev/in/TasPo_20190801_001808_V200_20190809.DAT
/home/logan/Datasets/novum_dev/in/TasPr_20190801_001828_D200_20190809.DAT
/home/logan/Datasets/novum_dev/in/TasPr_20190801_001828_S200_20190809.DAT
/home/logan/Datasets/novum_dev/in/TasPr_20190801_001828_V200_20190809.DAT

我的代码如下

val pathFile = "/home/logan/Datasets/dev/path"

sc.wholeTextFiles(pathFile).collect.foreach 
       hdfspartition =>

       val a = sc.parallelize(Seq(hdfspartition._2)).toDF
       a.show(false)

     

但我得到了一个数据框,其中的数据位于一行中。

+--------------------------------------------------------------------------------+
|value                                                                           |
+--------------------------------------------------------------------------------+
|/home/logan/Datasets/novum_dev/in/TasPo_20190801_001808_D200_20190809.DAT
/home/logan/Datasets/novum_dev/in/TasPo_20190801_001808_S200_20190809.DAT
/home/logan/Datasets/novum_dev/in/TasPo_20190801_001808_V200_20190809.DAT
/home/logan/Datasets/novum_dev/in/TasPr_20190801_001828_D200_20190809.DAT
/home/logan/Datasets/novum_dev/in/TasPr_20190801_001828_S200_20190809.DAT
/home/logan/Datasets/novum_dev/in/TasPr_20190801_001828_V200_20190809.DAT
|
+------------------------------------------------------------------------------+

您好,我需要提取在“pathFile”中找到的文件的内容。 pathFile" 具有包含更多文件列表的文件。.DAT 文件 (/../../novum_dev/in/TasPo_20190801_001808_D200_20190809.DAT) 具有要分析的数据。 我试图将第一个 DF (wholeTextFiles) 转换为字符串数组,然后转换为由 (,) 分割的字符串

sc.wholeTextFiles(pathFile).collect.foreach 
   hdfspartition =>
  val fa = hdfspartition._2.split("\\r?\\n")   
   val fs = fa.mkString(",")    
    val cdr = sc.textFile(fs).map(line => line.split("|", -1))
    .map(x => Row.fromSeq(x))

【问题讨论】:

【参考方案1】:

你应该使用spark.read.format("text"):

import org.apache.spark.sql._

val spark = SparkSession.builder.getOrCreate();   
val pathFile = "/home/logan/Datasets/dev/path"
val dataset = spark.read.format("text").load(pathFile)

dataset.show()

【讨论】:

您好,我需要提取在“pathFile”中找到的文件的内容。 pathFile" 具有包含更多文件列表的文件。.DAT 文件 (/../../novum_dev/in/TasPo_20190801_001808_D200_20190809.DAT) 具有要分析的数据。我试图将第一个 DF (wholeTextFiles) 转换为字符串数组,然后由 (,) sc.wholeTextFiles(pathFile).collect.foreach hdfspartition => val fa = hdfspartition._2.split("\\r?\\n") val fs = fa.mkString 拆分为字符串(",") val cdr = sc.textFile(fs).map(line => line.split("|", -1)) .map(x => Row.fromSeq(x))

以上是关于将内部文件转换为数据帧到另一个数据帧或 RDD的主要内容,如果未能解决你的问题,请参考以下文章

如何将火花流输出转换为数据帧或存储在表中

如何用从一个数据帧到另一个数据帧的值替换字符串

将列表写入 pandas 数据帧到 csv,从 csv 读取数据帧并再次转换为列表而无需字符串

如何将csv文件转换为rdd

从长数据帧到宽数组的快速转换

将数据保存到HDFS的格式是什么?