在 Spark 中合并 Row()

Posted

技术标签:

【中文标题】在 Spark 中合并 Row()【英文标题】:Combine Row()'s in Spark 【发布时间】:2015-04-10 17:59:36 【问题描述】:

看似简单的问题,却找不到答案。

问题:我创建了一个函数,我将传递给 map(),它接受一个字段并从中创建三个字段。我希望 map() 的输出给我一个新的 RDD,包括来自输入 RDD 和新/输出 RDD 的字段。我该怎么做呢?

我是否需要将我的数据的键添加到函数的输出中,以便我可以将更多的输出 RDD 加入到我的原始 RDD 中?这是正确/最佳做法吗?

def extract_fund_code_from_iv_id(holding):
    # Must include key of data for later joining
    iv_id = Row(iv_id_fund_code=holding.iv_id[:2], iv_id_last_code=holding.iv_id[-2:])
    return iv_id

更基本的是,我似乎无法将两个 Row 组合起来。

row1 = Row(name="joe", age="35")
row2 = Row(state="MA")
print row1, row2

这不会像我想要的那样返回一个新的 Row()。

谢谢

【问题讨论】:

print 当然会输出对象的字符串表示,而不是创建新的Row 实例。至于你的第一个问题,你可能想看看 Spark 的 DataFrame API (spark.apache.org/docs/1.3.0/api/python/…),也许你会发现用户定义函数 iteresting 感谢您的回复。我使用了用户定义的函数选项,这些选项非常强大,但无法完成我需要的操作,这迫使我使用 RDD。我没有看到任何关于如何组合 Row RDD 的选项... 问题是,当我在 DataFrame 上运行 .map() 方法时,它会返回一个新的 RDD,但我需要它用旧列返回新的 RDD,我不希望编写一些丑陋的代码来发现 RDD 中的列名是什么,然后编写它。有没有更好的办法? 【参考方案1】:

我真的推荐使用UserDefinedFunction

假设您想从 DataFrame dfint 类型的列 int_col 中提取多个特征。假设这些功能只是所述列内容的modulo 3modulo 2

我们将导入UserDefinedFunction 和我们函数的数据类型。

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

然后我们将实现我们的特征提取函数:

def modulo_three(col):
    return int(col) % 3

def modulo_two(col):
    return int(col) % 2

并将它们变成udfs:

mod3 = udf(modulo_three, IntegerType())
mod2 = udf(modulo_two, IntegerType())

现在我们将计算所有额外的列并给它们起好听的名字(通过alias):

new_columns = [
    mod3(df['int_col']).alias('mod3'),
    mod2(df['int_col']).alias('mod2'),
]

最后我们选择这些列加上之前已经存在的所有列:

new_df = df.select(*df.columns+new_columns)

new_df 现在将有两个额外的列 mod3mod2

【讨论】:

非常有帮助。非常感谢您抽出宝贵时间!

以上是关于在 Spark 中合并 Row()的主要内容,如果未能解决你的问题,请参考以下文章

在 Spark 中合并多行

当查询从文件中合并时,row_produced 计数在雪花 query_history 视图中代表啥

在 Spark 中合并数据框

spark 实现大表数据合并

无法在 Spark 中合并两个 CassandraJavaRDD<CassandraRow>

合并在Apache spark中具有不同列名的两个数据集