在 Spark 中合并 Row()
Posted
技术标签:
【中文标题】在 Spark 中合并 Row()【英文标题】:Combine Row()'s in Spark 【发布时间】:2015-04-10 17:59:36 【问题描述】:看似简单的问题,却找不到答案。
问题:我创建了一个函数,我将传递给 map(),它接受一个字段并从中创建三个字段。我希望 map() 的输出给我一个新的 RDD,包括来自输入 RDD 和新/输出 RDD 的字段。我该怎么做呢?
我是否需要将我的数据的键添加到函数的输出中,以便我可以将更多的输出 RDD 加入到我的原始 RDD 中?这是正确/最佳做法吗?
def extract_fund_code_from_iv_id(holding):
# Must include key of data for later joining
iv_id = Row(iv_id_fund_code=holding.iv_id[:2], iv_id_last_code=holding.iv_id[-2:])
return iv_id
更基本的是,我似乎无法将两个 Row 组合起来。
row1 = Row(name="joe", age="35")
row2 = Row(state="MA")
print row1, row2
这不会像我想要的那样返回一个新的 Row()。
谢谢
【问题讨论】:
print
当然会输出对象的字符串表示,而不是创建新的Row
实例。至于你的第一个问题,你可能想看看 Spark 的 DataFrame API (spark.apache.org/docs/1.3.0/api/python/…),也许你会发现用户定义函数 iteresting
感谢您的回复。我使用了用户定义的函数选项,这些选项非常强大,但无法完成我需要的操作,这迫使我使用 RDD。我没有看到任何关于如何组合 Row RDD 的选项...
问题是,当我在 DataFrame 上运行 .map() 方法时,它会返回一个新的 RDD,但我需要它用旧列返回新的 RDD,我不希望编写一些丑陋的代码来发现 RDD 中的列名是什么,然后编写它。有没有更好的办法?
【参考方案1】:
我真的推荐使用UserDefinedFunction
。
假设您想从 DataFrame df
的 int
类型的列 int_col
中提取多个特征。假设这些功能只是所述列内容的modulo 3
和modulo 2
。
我们将导入UserDefinedFunction
和我们函数的数据类型。
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
然后我们将实现我们的特征提取函数:
def modulo_three(col):
return int(col) % 3
def modulo_two(col):
return int(col) % 2
并将它们变成udf
s:
mod3 = udf(modulo_three, IntegerType())
mod2 = udf(modulo_two, IntegerType())
现在我们将计算所有额外的列并给它们起好听的名字(通过alias
):
new_columns = [
mod3(df['int_col']).alias('mod3'),
mod2(df['int_col']).alias('mod2'),
]
最后我们选择这些列加上之前已经存在的所有列:
new_df = df.select(*df.columns+new_columns)
new_df
现在将有两个额外的列 mod3
和 mod2
。
【讨论】:
非常有帮助。非常感谢您抽出宝贵时间!以上是关于在 Spark 中合并 Row()的主要内容,如果未能解决你的问题,请参考以下文章
当查询从文件中合并时,row_produced 计数在雪花 query_history 视图中代表啥