PySpark - RDD 到 JSON
Posted
技术标签:
【中文标题】PySpark - RDD 到 JSON【英文标题】:PySpark - RDD to JSON 【发布时间】:2018-06-25 14:57:39 【问题描述】:我有一个 Hive 查询,它以这种格式返回数据:
ip, category, score
1.2.3.4, X, 5
10.10.10.10, A, 2
1.2.3.4, Y, 2
12.12.12.12, G, 10
1.2.3.4, Z, 9
10.10.10.10, X, 3
在 PySpark 中,我通过 hive_context.sql(my_query).rdd
获得此信息
每个 ip 地址可以有多个分数(因此有多个行)。我想以 json/array 格式获取这些数据,如下所示:
"ip": "1.2.3.4",
"scores": [
"category": "X",
"score": 10
,
"category": "Y",
"score": 2
,
"category": "Z",
"score": 9
,
],
"ip": "10.10.10.10",
"scores": [
"category": "A",
"score": 2
,
"category": "X",
"score": 3
,
],
"ip": "12.12.12.12",
"scores": [
"category": "G",
"score": 10
,
],
请注意,RDD 不一定是排序的,RDD 很容易包含几亿行。我是 PySpark 的新手,所以任何关于如何有效进行此操作的指示都会有所帮助。
【问题讨论】:
【参考方案1】:groupBy
ip
然后将分组的 RDD 转换为您需要的:
rdd.groupBy(lambda r: r.ip).map(
lambda g:
'ip': g[0],
'scores': ['category': x['category'], 'score': x['score'] for x in g[1]]
).collect()
# ['ip': '1.2.3.4', 'scores': ['category': 'X', 'score': 5, 'category': 'Y', 'score': 2, 'category': 'Z', 'score': 9], 'ip': '12.12.12.12', 'scores': ['category': 'G', 'score': 10], 'ip': '10.10.10.10', 'scores': ['category': 'A', 'score': 2, 'category': 'X', 'score': 3]]
【讨论】:
以上是关于PySpark - RDD 到 JSON的主要内容,如果未能解决你的问题,请参考以下文章
pyspark中的RDD到DataFrame(来自rdd的第一个元素的列)