S3 中 spark.catalog.refreshTable(tablename) 的使用
Posted
技术标签:
【中文标题】S3 中 spark.catalog.refreshTable(tablename) 的使用【英文标题】:Usage of spark.catalog.refreshTable(tablename) in S3 【发布时间】:2020-07-20 03:22:48 【问题描述】:我想在使用函数转换我的 Spark 数据后编写一个 CSV 文件。转换后得到的Spark dataframe看起来不错,但是当我想将其写入CSV文件时,出现错误:
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
但我真的不明白如何使用spark.catalog.refreshTable(tablename)
函数。我尝试在转换和文件写入之间使用它,但它说
AttributeError: 'DataFrame' object has no attribute '_get_object_id'
所以我不知道如何处理它......
#Create the function to resize the images and extract the features with mobilenetV2 model
def red_dim(width, height, nChannels, data):
#Transform image data to tensorflow compatoble format
images = []
for i in range(height.shape[0]):
x = np.ndarray(
shape=(height[i], width[i], nChannels[i]),
dtype=np.uint8,
buffer=data[i],
strides=(width[i] * nChannels[i], nChannels[i], 1))
images.append(preprocess_input(x))
#Resize images with the chosen size of the model
images = np.array(tf.image.resize(images, [IMAGE_SIZE, IMAGE_SIZE]))
#Load the model
model = load_model('models')
#Predict features for images
preds = model.predict(images).reshape(len(width), 3 * 3 * 1280)
#Return a pandas series with list of features for all images
return pd.Series(list(preds))
#Transform the function to a pandas udf function
#This allow to split the function in multiple chunks
red_dim_udf = pandas_udf(red_dim, returnType=ArrayType(DoubleType()))
#4 actions :
# apply the udf function defined just before
# cast the array of features to a string so it can be written in a csv
# select only the data that will be witten in the csv
# write the data -> where the error occurs
results=df.withColumn("dim_red", red_dim_udf(col("image.width"), col("image.height"), \
col("image.nChannels"), \
col("image.data"))) \
.withColumn("dim_red_string", lit(col("dim_red").cast("string")))
.select("image.origin", 'dim_red_string')
.repartition(5).write.csv(S3dir + '/results' + today)
【问题讨论】:
【参考方案1】:这是一个众所周知的问题,即底层源数据正在更新,而 spark 正在对其进行处理。
我建议您在应用转换之前检查点,即将数据移动/复制到另一个目录。
【讨论】:
您好,感谢您的帮助!所以,如果我理解得很好,在red_dim_udf = pandas_udf(red_dim, returnType=ArrayType(DoubleType()))
之后,我将 red_dim_udf 移动到我的 S3 存储桶的另一个文件夹,然后,我使用这个新文件夹中的变量作为以下行的条目?
(只是添加一些信息:我只是不明白如何在转换之前移动我的数据框,因为“df”只是一个包含我所有 Spark 格式图像的 Spark 数据框,“结果”是数据框只是包含图像的来源(作为'df')和从函数计算的特征(所以在转换之后)......我没有其他的。如何更改'df'的目录(如果我理解得很好) 因为这个数据帧只存在于 SparkContext 的内存中(我认为)?
只需使用简单的 FS 复制或使用 spark 读取和写入另一个目录【参考方案2】:
我想我可以结束我的问题,因为我找到了答案
如果您遇到此类错误,也可能是因为您用于制作 Dataframe 的 S3 文件夹中有空间,而 Spark 无法识别文件夹中的空格字符,因此认为该文件夹不存在没有了……
但感谢@Constantine 的帮助!
【讨论】:
酷.. 很高兴知道此火花错误还有其他原因以上是关于S3 中 spark.catalog.refreshTable(tablename) 的使用的主要内容,如果未能解决你的问题,请参考以下文章
用于 S3 中 PARQUET 格式的 Kafka S3 源连接器
S3 选择定价如何运作? s3 select中返回和扫描的数据是啥意思
指定自定义值以在 Zapier 集成 Gmail 到 S3 中构建 S3 密钥
如何将 Kinesis 流存储到 S3 存储桶中特定文件夹结构中的 S3 存储