S3 中 spark.catalog.refreshTable(tablename) 的使用

Posted

技术标签:

【中文标题】S3 中 spark.catalog.refreshTable(tablename) 的使用【英文标题】:Usage of spark.catalog.refreshTable(tablename) in S3 【发布时间】:2020-07-20 03:22:48 【问题描述】:

我想在使用函数转换我的 Spark 数据后编写一个 CSV 文件。转换后得到的Spark dataframe看起来不错,但是当我想将其写入CSV文件时,出现错误:

It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.

但我真的不明白如何使用spark.catalog.refreshTable(tablename) 函数。我尝试在转换和文件写入之间使用它,但它说

AttributeError: 'DataFrame' object has no attribute '_get_object_id'

所以我不知道如何处理它......

#Create the function to resize the images and extract the features with mobilenetV2 model
def red_dim(width, height, nChannels, data):
    #Transform image data to tensorflow compatoble format
    images = []
    for i in range(height.shape[0]):
        x = np.ndarray(
                shape=(height[i], width[i], nChannels[i]),
                dtype=np.uint8,
                buffer=data[i],
                strides=(width[i] * nChannels[i], nChannels[i], 1))
        images.append(preprocess_input(x))
    #Resize images with the chosen size of the model
    images = np.array(tf.image.resize(images, [IMAGE_SIZE, IMAGE_SIZE]))

    #Load the model
    model = load_model('models')
    
    #Predict features for images
    preds = model.predict(images).reshape(len(width), 3 * 3 * 1280)
    
    #Return a pandas series with list of features for all images 
    return pd.Series(list(preds))

#Transform the function to a pandas udf function
#This allow to split the function in multiple chunks
red_dim_udf = pandas_udf(red_dim, returnType=ArrayType(DoubleType()))

#4 actions : 
#   apply the udf function defined just before
#   cast the array of features to a string so it can be written in a csv
#   select only the data that will be witten in the csv
#   write the data -> where the error occurs
results=df.withColumn("dim_red", red_dim_udf(col("image.width"), col("image.height"), \
                                             col("image.nChannels"), \
                                             col("image.data"))) \
          .withColumn("dim_red_string", lit(col("dim_red").cast("string")))
          .select("image.origin", 'dim_red_string')
          .repartition(5).write.csv(S3dir + '/results' + today)

【问题讨论】:

【参考方案1】:

这是一个众所周知的问题,即底层源数据正在更新,而 spark 正在对其进行处理。

我建议您在应用转换之前检查点,即将数据移动/复制到另一个目录。

【讨论】:

您好,感谢您的帮助!所以,如果我理解得很好,在red_dim_udf = pandas_udf(red_dim, returnType=ArrayType(DoubleType())) 之后,我将 red_dim_udf 移动到我的 S3 存储桶的另一个文件夹,然后,我使用这个新文件夹中的变量作为以下行的条目? (只是添加一些信息:我只是不明白如何在转换之前移动我的数据框,因为“df”只是一个包含我所有 Spark 格式图像的 Spark 数据框,“结果”是数据框只是包含图像的来源(作为'df')和从函数计算的特征(所以在转换之后)......我没有其他的。如何更改'df'的目录(如果我理解得很好) 因为这个数据帧只存在于 SparkContext 的内存中(我认为)? 只需使用简单的 FS 复制或使用 spark 读取和写入另一个目录【参考方案2】:

我想我可以结束我的问题,因为我找到了答案

如果您遇到此类错误,也可能是因为您用于制作 Dataframe 的 S3 文件夹中有空间,而 Spark 无法识别文件夹中的空格字符,因此认为该文件夹不存在没有了……

但感谢@Constantine 的帮助!

【讨论】:

酷.. 很高兴知道此火花错误还有其他原因

以上是关于S3 中 spark.catalog.refreshTable(tablename) 的使用的主要内容,如果未能解决你的问题,请参考以下文章

用于 S3 中 PARQUET 格式的 Kafka S3 源连接器

S3 选择定价如何运作? s3 select中返回和扫描的数据是啥意思

指定自定义值以在 Zapier 集成 Gmail 到 S3 中构建 S3 密钥

如何将 Kinesis 流存储到 S3 存储桶中特定文件夹结构中的 S3 存储

在S3中使用S3.Object()。put()在boto3 1.5.36中存储matplotlib图像

使用“aws s3”实用程序在 S3 中获取 1 个月以前的文件列表