我将如何在 Dask 中进行 Spark 爆炸?

Posted

技术标签:

【中文标题】我将如何在 Dask 中进行 Spark 爆炸?【英文标题】:How would I do a Spark explode in Dask? 【发布时间】:2017-09-11 18:05:20 【问题描述】:

我是新手,所以请耐心等待。

我有一个 JSON 文件,其中每一行都有以下架构:


 'id': 2,
 'version': 7.3,
 'participants': range(10)

participants 是一个嵌套字段。

input_file = 'data.json'   
df = db.read_text(input_file).map(json.loads)

我可以:df.pluck(['id', 'version'])df.pluck('participants').flatten()

但我怎样才能做相当于 Spark 爆炸的操作,我可以同时选择 idversion 并展平 participants

所以输出将是:

'id': 2, 'version': 7.3, 'participants': 0
'id': 2, 'version': 7.3, 'participants': 1
'id': 2, 'version': 7.3, 'participants': 2
'id': 2, 'version': 7.3, 'participants': 3
...

【问题讨论】:

为了完整起见,您能否输入您期望的输出,因为不是每个人都熟悉explode 你是对的@mdurant!我已经编辑了帖子 关于这个问题的任何更新?很想知道是否可以以这种方式使用 flatten 【参考方案1】:

可以编写一个自定义函数来读取和转换带有dask.bag.from_sequence的文件行

def mapper(row, denest_field):
    js = json.loads(row)
    for v in js[denest_field]:
        yield 'id': js['id'], denest_field: v, 'version': js['version']


def yield_unnested(fname, denest_field):
    with open (fname) as f:
        for row in f:
            yield from mapper(row, denest_field)

我保存了一个名为 'data.json' 的文件,其内容如下

"id": 2, "version": 7.3, "participants": [0,1,2,3,4,5,6,7,9,9]

然后用from_sequence阅读

df = db.from_sequence(yield_unnested('data.json', 'participants'))
list(df) # outputs:

['id': 2, 'participants': 0, 'version': 7.3,
 'id': 2, 'participants': 1, 'version': 7.3,
 'id': 2, 'participants': 2, 'version': 7.3,
 'id': 2, 'participants': 3, 'version': 7.3,
 'id': 2, 'participants': 4, 'version': 7.3,
 'id': 2, 'participants': 5, 'version': 7.3,
 'id': 2, 'participants': 6, 'version': 7.3,
 'id': 2, 'participants': 7, 'version': 7.3,
 'id': 2, 'participants': 9, 'version': 7.3,
 'id': 2, 'participants': 9, 'version': 7.3]

请注意,我是 dask 新手,这可能不是最有效的处理方式。

【讨论】:

以上是关于我将如何在 Dask 中进行 Spark 爆炸?的主要内容,如果未能解决你的问题,请参考以下文章

dask 如何处理大于内存的数据集

Dask核心功能介绍及与Spark的比较

Dask核心功能介绍及与Spark的比较

如何使用 Scala 在 Spark 中爆炸嵌套结构

使用 Python 将 Dask 数据帧转换为 Spark 数据帧

Spark 结构化流/Spark SQL 中的条件爆炸