如何使用给定的reduce函数基于pyspark中的字段合并多个JSON数据行

Posted

技术标签:

【中文标题】如何使用给定的reduce函数基于pyspark中的字段合并多个JSON数据行【英文标题】:How to merge multiple JSON data rows based on a field in pyspark with a given reduce function 【发布时间】:2016-08-10 22:56:50 【问题描述】:

如何使用下面的合并功能与 pyspark 合并 JSON 数据行,如下所示?

注意:假设这只是一个细节示例,我有 1000 行数据要合并。什么是最高效的解决方案?无论好坏,我都必须使用 pyspark。

输入:

data = [
    'timestamp': '20080411204445', 'address': '100 Sunder Ct', 'name': 'Joe Schmoe',
    'timestamp': '20040218165319', 'address': '100 Lee Ave', 'name': 'Joe Schmoe',
    'timestamp': '20120309173318', 'address': '1818 Westminster', 'name': 'John Doe',
    ...  More ...
]

期望的输出:

combined_result = [
    'name': 'Joe Schmoe': 'addresses': [('20080411204445', '100 Sunder Ct'), ('20040218165319', '100 Lee Ave')],
    'name': 'John Doe': 'addresses': [('20120309173318', '1818 Westminster')],
    ... More ...
]

合并功能:

def reduce_on_name(a, b):
    '''Combines two JSON data rows based on name'''
    merged = 
    if a['name'] == b['name']:
        addresses = (a['timestamp'], a['address']), (b['timestamp'], b['address'])
        merged['name'] = a['name']
        merged['addresses'] = addresses
    return merged

【问题讨论】:

感谢您的回复;加入是唯一的机制吗?什么时候使用其他东西有意义?我对火花完全陌生。数据框甚至可能不是正确使用的术语... 也许不是。所以你想要像*ByKey 这样的操作,名字是一个键? 这就是我的怀疑。我一直在研究 combineByKey 和 groupByKey,但不清楚它们如何与上面的代码匹配。 【参考方案1】:

我想应该是这样的:

sc.parallelize(data).groupBy(lambda x: x['name']).map(lambda t: 'name':t[0],'addresses':[(x['timestamp'], x['address']) for x in t[1]]).collect()

【讨论】:

有没有办法利用 python 函数实现可重用性? 其实,我可以做到这一点。我给你的答案竖起大拇指,因为我认为你确实回答了它。【参考方案2】:

好吧,使用 maxymoo 的示例,我将自己的可重用代码放在一起。这不是我想要的,但它让我更接近我想要解决这个特殊问题的方式:没有 lambdas 和可重用的代码。

#!/usr/bin/env pyspark
# -*- coding: utf-8 -*-
data = [
    'timestamp': '20080411204445', 'address': '100 Sunder Ct', 'name': 'Joe Schmoe',
    'timestamp': '20040218165319', 'address': '100 Lee Ave', 'name': 'Joe Schmoe',
    'timestamp': '20120309173318', 'address': '1818 Westminster', 'name': 'John Doe',
]


def combine(field):
    '''Returns a function which reduces on a specific field

    Args:
        field(str): data field to use for merging

    Returns:
        func: returns a function which supplies the data for the field
    '''

    def _reduce_this(data):
        '''Returns the field value using data'''
        return data[field]

    return _reduce_this


def aggregate(*fields):
    '''Merges data based on a list of fields

    Args:
        fields(list): a list of fields that should be used as a composite key

    Returns:
       func: a function which does the aggregation
    '''

    def _merge_this(iterable):
        name, iterable = iterable
        new_map = dict(name=name, window=dict(max=None, min=None))
        for data in iterable:
            for field, value in data.iteritems():
                if field in fields:
                    new_map[field] = value
                else:
                    new_map.setdefault(field, set()).add(value)
        return new_map

    return _merge_this

# sc provided by pyspark context
combined = sc.parallelize(data).groupBy(combine('name'))
reduced = combined.map(aggregate('name'))
output = reduced.collect()

【讨论】:

以上是关于如何使用给定的reduce函数基于pyspark中的字段合并多个JSON数据行的主要内容,如果未能解决你的问题,请参考以下文章

了解 PySpark Reduce()

pyspark中的多个MAP函数

基于另一列中的值的一列上的pyspark滞后函数

pyspark reduce键是一个元组值嵌套列表

pyspark如何检查给定的spark数据帧是不是已使用inferSchema = True创建

如何编写 pyspark map-reduce 来计算日期之前的事件数