从 RDD 到联合数据帧 PySpark

Posted

技术标签:

【中文标题】从 RDD 到联合数据帧 PySpark【英文标题】:From RDDs to jointed DataFrames PySpark 【发布时间】:2016-10-14 16:51:38 【问题描述】:

我正在寻找一种按键组合两个 DataFrame 的方法。 我首先从 rdds 创建数据框:

给定:

x = sc.parallelize([('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'FR', '75001'),
                ('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', 'TN', '8160'),
               ]
              )

y = sc.parallelize([('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', 'JmJCFu3N'),
                ('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', 'KNPQLQth'),
                ('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'KlGZj08d'),
               ]
              )

我的代码:

df_x = x.toDF(['id', 'countrycode', 'postalcode'])
df_y = y.toDF(['id_gigya', 'krux'])

df = df_x.join(df_y, df_x.id == df_y.id_gigya, 'fullouter')

给出:

[Row(id=u'_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', countrycode=u'TN', postalcode=u'8160', id_gigya=None, krux=None),
 Row(id=None, countrycode=None, postalcode=None, id_gigya=u'_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', krux=u'JmJCFu3N'),
 Row(id=None, countrycode=None, postalcode=None, id_gigya=u'_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', krux=u'KNPQLQth'),
 Row(id=u'_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', countrycode=u'FR', postalcode=u'75001', id_gigya=u'_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', krux=u'KlGZj08d')]

这是完美的,但我想要一个唯一的 id,“id_gigya”或“id”,因为它是相同的 id!

与:

df_x.join(df_y, df_x.id == df_y.id_gigya, 'fullouter').drop(df_y.id_gigya).collect()

Or

df_x.join(df_y, df_x.id == df_y.id_gigya, 'fullouter').drop(df_x.id).collect()

我明白了:

[Row(id=u'_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', countrycode=u'TN', postalcode=u'8160', krux=None),
 Row(id=None, countrycode=None, postalcode=None, krux=u'JmJCFu3N'),
 Row(id=None, countrycode=None, postalcode=None, krux=u'KNPQLQth'),
 Row(id=u'_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', countrycode=u'FR', postalcode=u'75001', krux=u'KlGZj08d')]

[Row(countrycode=u'TN', postalcode=u'8160', id_gigya=None, krux=None),
 Row(countrycode=None, postalcode=None, id_gigya=u'_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', krux=u'JmJCFu3N'),
 Row(countrycode=None, postalcode=None, id_gigya=u'_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', krux=u'KNPQLQth'),
 Row(countrycode=u'FR', postalcode=u'75001', id_gigya=u'_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', krux=u'KlGZj08d')]

无论如何,我的目标是按行拥有一个 id.. 想法?谢谢!

【问题讨论】:

如果提供的答案解决了您的问题,请接受它以关闭问题,否则请在答案下方评论为什么它确实解决了您的问题! 【参考方案1】:

一旦你有了加入的数据集,你可以运行另一个select 来输出特定的列,然后转换为 rdd,映射它以只获取非空 ID:

df.select('id','id_gigya','countrycode','postalcode')\
  .rdd\
  .map(lambda x: Row(id=(x.id if x.id_gigya == None else x.id_gigya), postalcode=x.postalcode, countrycode=x.countrycode))\
  .collect()

哪个输出:

[
  Row(countrycode=u'TN', id=u'_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', postalcode=u'8160'),
  Row(countrycode=None, id=u'_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', postalcode=None), 
  Row(countrycode=u'FR', id=u'_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', postalcode=u'75001'),
  Row(countrycode=None, id=u'_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', postalcode=None)
]

【讨论】:

以上是关于从 RDD 到联合数据帧 PySpark的主要内容,如果未能解决你的问题,请参考以下文章

如何从整数 RDD 创建火花数据帧

使用 pyspark 将作业粘合到联合数据帧

从来自 java 中 InputStream 的字符串创建 Spark RDD 或数据帧

Pyspark 将 rdd 转换为具有空值的数据帧

数据帧到 RDD[Row] 用空值替换空间

spark RDD 上的有序联合