如何在 Python 中的 Spark Dataframe 上应用任何类型的地图转换

Posted

技术标签:

【中文标题】如何在 Python 中的 Spark Dataframe 上应用任何类型的地图转换【英文标题】:How to apply any sort of Map Transformation on Spark Dataframe in Python 【发布时间】:2021-01-27 09:14:54 【问题描述】:

我使用的是 Spark Structure Streaming,代码如下:

 def convert_timestamp_to_datetime(timestamp):
    return datetime.fromtimestamp(timestamp)


 def extract():
       spark = SparkSession \
         .builder \
         .appName("StructuredNetworkWordCount") \
         .getOrCreate()

    json_schema = \
         StructType() \
        .add(StructField("TIMESTAMP", FloatType(), True)) \
        .add(StructField("index", IntegerType(), True)) \
        .add(StructField("CUSTOMER_ID", StringType(), True)) \
        .add(StructField("CODE_ID", StringType(), True)) \
        .add(StructField("PROCESS", StringType(), True))

     my_df = spark \
         .readStream \
         .format("kafka") \
         .option("kafka.bootstrap.servers", "localhost:9092") \
         .option("subscribe", "simple_json_12_10trx") \
         .option("startingOffsets", "earliest") \
         .load()
     my_df = my_df.select(from_json(col('value').cast('string'), json_schema).alias("json"))
convert_timestamp_datetime_udf = udf(lambda x: convert_timestamp_to_datetime(x), TimestampType())
      return my_df.select('json.*', convert_timestamp_datetime_udf('json.TIMESTAMP').alias('DATETIME'))
  
 def transform_load(my_df, epoch_id):
       update_obj = my_df.groupBy('CUSTOMER_ID').agg(F.count('CUSTOMER_ID').alias('count_t'),F.collect_set('CODE_ID').alias('unique_CODE'))
update_obj.show()
update(update_obj)


 if __name__ == '__main__':
     start = time.time()
     df = extract()
     query = df.writeStream \
        .outputMode('append')\
        .foreachBatch(transform_load)\
        .start() \
        .awaitTermination()

我想访问分布式 Spark Dataframe 的每一行。所以,我必须使用地图转换。我只是添加了这个简单的代码来测试 Spark Map。但是,我在控制台中没有收到任何输出。事实上,func 并没有运行。

 def func(df):
      df.take(3)

 def update(df):
      df.rdd.map(func,preservesPartitioning=False)

请您指导我这里出了什么问题?

非常感谢。

【问题讨论】:

【参考方案1】:

问题解决了。

我忘记在地图后使用action。另外,我不能使用df.take(3),因为func 中没有任何数据框,它是rdd,它没有take 属性。我像这样更改代码:

 def func(x):
    print(x.CUSTOMER_ID)

 def update(df):
    df.rdd.map(func,preservesPartitioning=False).count()

count() 是我用来查看地图结果的操作。

【讨论】:

以上是关于如何在 Python 中的 Spark Dataframe 上应用任何类型的地图转换的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spark 中使用 Python 查找 DataFrame 中的分区数以及如何在 Spark 中使用 Python 在 DataFrame 中创建分区

如何在 HDP 中的 zeppelin-spark2 中将库安装到 python

Python 如何与 Spark 中的 JVM 交互

如何在 Python 中的 Spark Dataframe 上应用任何类型的地图转换

如何使用 Python 对 Spark 中的 LIBSVM 文件进行特征选择和缩减?

如何在 python 中消除 apache spark 数据帧中的标头和尾标