在单个 Spark 提交作业中分别处理多个文件

Posted

技术标签:

【中文标题】在单个 Spark 提交作业中分别处理多个文件【英文标题】:Processing multiple files separately in single spark submit job 【发布时间】:2017-07-17 09:00:14 【问题描述】:

我有以下目录结构:

/数据/模型A

/数据/模型B

/数据/模型C ..

这些文件中的每一个都有格式(id、score)的数据,我必须分别为它们做以下操作-

1) 按分数分组并按降序排列分数(DF_1: score,count)

2) 从 DF_1 计算每个排序的分数组的累积频率(DF_2:score, count, cumFreq)

3) 从 DF_2 中选择介于 5-10 之间的累积频率 (DF_3: score, cumFreq)

4) 从 DF_3 中选择最低分数(DF_4: score)

5) 从文件中选择所有分数大于 DF_4 分数的 id 并保存

我可以通过将目录读取为 wholeTextFile 并为所有模型创建一个公共数据框,然后在模型上使用 group by 来做到这一点。

我想做-

val scores_file = sc.wholeTextFiles("/data/*/")
val scores = scores_file.map line => 
  //step 1
  //step 2
  //step 3 
  //step 4
  //step 5 : save as line._1
   

这将有助于分别处理每个文件,并避免分组。

【问题讨论】:

【参考方案1】:

假设您的模型是离散值并且您知道然后您可以将所有模型定义到一个列表中

val model = List("modelA", "modelB", "modelC", ... )

你可以有以下方法:

model.forEach( model => 
  val scoresPerModel = sc.textFile(model);
  scoresPerModel.map  line => 
    // business logic here
   
)

如果您在计算业务逻辑之前不知道模型,您必须使用 Hadoop 文件系统 API 读取并从那里提取模型。

private val fs = 
    val conf = new org.apache.hadoop.conf.Configuration()
    FileSystem.get(conf)
  
fs.listFiles(new Path(hdfsPath)) 

【讨论】:

以上是关于在单个 Spark 提交作业中分别处理多个文件的主要内容,如果未能解决你的问题,请参考以下文章

是否可以在单个变量(批处理文件)中分配多个值?如果没有,有没有办法以更有效的方式运行它?

如何在单个 Spark 作业中调用多个 writeStream 操作?

如何在单个 Spark 作业中摄取不同的 Spark 数据帧

spark结构化流作业如何处理流-静态DataFrame连接?

SparkR 作业处理依赖项

对于 YARN 中的单个队列,如何将 state=RUNNING 中的 spark 应用程序数量限制为 1?