PySpark - RDD 到 JSON

Posted

技术标签:

【中文标题】PySpark - RDD 到 JSON【英文标题】:PySpark - RDD to JSON 【发布时间】:2018-06-25 14:57:39 【问题描述】:

我有一个 Hive 查询,它以这种格式返回数据:

ip, category, score
1.2.3.4, X, 5
10.10.10.10, A, 2
1.2.3.4, Y, 2
12.12.12.12, G, 10
1.2.3.4, Z, 9
10.10.10.10, X, 3

在 PySpark 中,我通过 hive_context.sql(my_query).rdd 获得此信息

每个 ip 地址可以有多个分数(因此有多个行)。我想以 json/array 格式获取这些数据,如下所示:


    "ip": "1.2.3.4",
    "scores": [
        
            "category": "X",
             "score": 10
        ,
        
            "category": "Y",
             "score": 2
        ,
        
            "category": "Z",
             "score": 9
        ,
    ],
    "ip": "10.10.10.10",
    "scores": [
        
            "category": "A",
             "score": 2
        ,
        
            "category": "X",
             "score": 3
        ,
    ],
     "ip": "12.12.12.12",
    "scores": [
        
            "category": "G",
             "score": 10
        ,
    ],

请注意,RDD 不一定是排序的,RDD 很容易包含几亿行。我是 PySpark 的新手,所以任何关于如何有效进行此操作的指示都会有所帮助。

【问题讨论】:

【参考方案1】:

groupByip 然后将分组的 RDD 转换为您需要的:

rdd.groupBy(lambda r: r.ip).map(
  lambda g: 
    'ip': g[0], 
    'scores': ['category': x['category'], 'score': x['score'] for x in g[1]]
).collect()

# ['ip': '1.2.3.4', 'scores': ['category': 'X', 'score': 5, 'category': 'Y', 'score': 2, 'category': 'Z', 'score': 9], 'ip': '12.12.12.12', 'scores': ['category': 'G', 'score': 10], 'ip': '10.10.10.10', 'scores': ['category': 'A', 'score': 2, 'category': 'X', 'score': 3]]

【讨论】:

以上是关于PySpark - RDD 到 JSON的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Pyspark 将一个 rdd 映射到另一个?

pyspark中的RDD到DataFrame(来自rdd的第一个元素的列)

pyspark 行列表的 RDD 到 DataFrame

PySpark - ALS 输出中的 RDD 到 DataFrame

从 RDD 到联合数据帧 PySpark

PySpark 重新分区 RDD 元素