从 foreach 内部调用 Pyspark 保存不起作用

Posted

技术标签:

【中文标题】从 foreach 内部调用 Pyspark 保存不起作用【英文标题】:Pyspark saving is not working when called from inside a foreach 【发布时间】:2019-06-27 03:08:56 【问题描述】:

我正在构建一个从 Azure EventHub 接收消息并保存到数据块增量表中的管道。

我所有的静态数据测试都很顺利,请看下面的代码:

body = 'A|B|C|D\n"False"|"253435564"|"14"|"2019-06-25 04:56:21.713"\n"True"|"253435564"|"13"|"2019-06-25 04:56:21.713"\n"
tableLocation = "/delta/tables/myTableName"

spark = SparkSession.builder.appName("CSV converter").getOrCreate()    
csvData = spark.sparkContext.parallelize(body.split('\n'))

df = spark.read \
.option("header", True) \
.option("delimiter","|") \
.option("quote", "\"") \
.option("nullValue", "\\N") \
.option("inferShema", "true") \
.option("mergeSchema", "true") \
.csv(csvData)

df.write.format("delta").mode("append").save(tableLocation)

但是在我的例子中,每个 eventthub 消息都是一个 CSV 字符串,它们可能来自许多来源。所以每条消息都必须单独处理,因为每条消息最终都可能保存在不同的增量表中。

当我尝试在 foreach 语句中执行相同的代码时,它不起作用。日志中没有显示错误,我找不到任何保存的表。

所以也许我在调用 foreach 时做错了什么。请看下面的代码:

def SaveData(row):
   ...
   The same code above

dfEventHubCSV.rdd.foreach(SaveData)

我尝试在流式上下文中执行此操作,但很遗憾我遇到了同样的问题。

foreach 中的什么使其行为不同?

下面是我正在运行的完整代码:

import pyspark.sql.types as t
from pyspark.sql import SQLContext

--row contains the fields Body and SdIds
--Body: CSV string
--SdIds: A string ID 
def SaveData(row):

  --Each row data that is going to be added to different tables
  rowInfo = GetDestinationTableData(row['SdIds']).collect()  
  table = rowInfo[0][4]
  schema = rowInfo[0][3]
  database = rowInfo[0][2]     
  body = row['Body']

  tableLocation = "/delta/" + database + '/' + schema + '/' + table
  checkpointLocation = "/delta/" + database + '/' + schema + "/_checkpoints/" + table

  spark = SparkSession.builder.appName("CSV").getOrCreate()    
  csvData = spark.sparkContext.parallelize(body.split('\n'))

  df = spark.read \
  .option("header", True) \
  .option("delimiter","|") \
  .option("quote", "\"") \
  .option("nullValue", "\\N") \
  .option("inferShema", "true") \
  .option("mergeSchema", "true") \
  .csv(csvData)

  df.write.format("delta").mode("append").save(tableLocation)

dfEventHubCSV.rdd.foreach(SaveData)

【问题讨论】:

您好 Flavio,如果您能发布确切的实现将会非常有帮助 @AlexandrosBiratsis 完整代码太大,无法在此处发布,但我会编辑帖子并为您提供此功能的所有代码 【参考方案1】:

好吧,总而言之,它一如既往地非常简单,但我没有看到这一点。

基本上,当您执行 foreach 并且您要保存的数据框构建在循环内时。 worker 与驱动不同,在保存时不会自动设置“/dbfs/”路径,所以如果你不手动添加“/dbfs/”,它会将数据本地保存在worker中,你永远不会找到保存的数据。

这就是我的循环不起作用的原因。

【讨论】:

以上是关于从 foreach 内部调用 Pyspark 保存不起作用的主要内容,如果未能解决你的问题,请参考以下文章

如何在 PySpark 中使用 foreach 或 foreachBatch 写入数据库?

如何从 pyspark 数据框中更快地保存 csv 文件?

从 Pyspark 在 HDFS 中保存文件

如何从函数内部的 .forEach 内部完全退出

从目录读取镶木地板文件时,pyspark不保存

在 pyspark 中使用 foreach()