我如何让 pandas 使用 spark 集群

Posted

技术标签:

【中文标题】我如何让 pandas 使用 spark 集群【英文标题】:how do i let pandas working with spark cluster 【发布时间】:2019-01-30 08:10:48 【问题描述】:

pandas 的主要问题是它无法处理大型操作数据,大量 CSV 文件内存不足,现在我切换到 Hadoop 中的 pyspark 1.6,我尝试使用 dask.dataframe 但问题仍然存在,有没有为什么让 pandas 与 Hadoop 集群或 pyspark 集群一起使用我想在 pandas 中使用此功能

import pandas as pd
df = pd.read_csv('text1.txt',names =['DATE','IMSI','WEBSITE','LINKUP','LINKDOWN','COUNT','CONNECTION'])
df.columns.str.strip()
df.DATE = pd.to_datetime(df.DATE)
group = df.groupby(['IMSI','WEBSITE']).agg('DATE':[min,max,'count']
    ,'LINKUP':'sum'
    , 'LINKDOWN':'sum'
    , 'COUNT':'max'
    ,'CONNECTION':'sum'
            )
group.to_csv('finalinfo.txt', index = True, header = False)

【问题讨论】:

这不是它的工作方式。 Pandas 不会使用 Spark 作为其引擎。您可以使用 Spark 处理您的大文件,将其减少到可以容纳在一台机器上的体积,然后将其转换为 Pandas 数据框以从那里继续。 你能给我一个可以兼容我的代码的例子吗@ernest_k 【参考方案1】:

从 HDFS 读取数据,聚合并发送回 pandas。下面的示例使用 inferSchema 根据数据获取列名和类型,但如果您的文件没有标题或者您不喜欢它推断的类型,您可以提供自己的模式。 InferSchema 需要额外的数据传递,因此根据数据大小,您可能需要提供自己的架构:

from pyspark.sql import functions as f

df = spark.read.csv('/hdfs/path/to/text1.txt', header=1, inferSchema=True, sep=';') 
df = df.groupBy('IMSI','WEBSITE').agg(f.min('DATE').alias('min of date'),
                                      f.max('DATE').alias('max of date'),
                                      f.count('DATE').alias('count of date'),
                                      f.sum('LINKUP').alias('sum of linkup'),
                                      f.sum('LINKDOWN').alias('sum of linkdown'),
                                      f.count('COUNT').alias('count of count'),
                                      f.sum('CONNECTION').alias('sum of connection'))
pandasDF = df.toPandas()

或者,如果文件对于 pandas 来说仍然很大,您可以使用 spark 保存到 csv。请注意,您无法控制输出文件的名称 - 您只需指定将创建并存储输出的目录位置,文件名将遵循 spark 临时文件命名约定:

df.coalesce(1).write.csv('/hdfs/path/to/output/directory', header=True)

coalesce(1) 用于获取单个文件作为输出,因为 spark 将创建等于分区的文件数(默认为 200 iirc)。为此,未分区的文件必须适合单个工作人员的内存。它仍然太大,不要使用合并。 Spark 会将其保存在多个文件中,然后您可以使用HDFS getmerge 来加入文件。

【讨论】:

以上是关于我如何让 pandas 使用 spark 集群的主要内容,如果未能解决你的问题,请参考以下文章

Zeppelin+Spark+Kubernetes:让 Zeppelin Job 在现有的 Spark 集群上运行

如何让 Zeppelin 在 EMR 集群上干净地重新启动?

Spark 独立集群如何在工作节点上管理多个执行程序?

如何将 Cassandra 设置为我的 Spark 集群的分布式存储(文件系统)

如何在 Windows 机器上设置 Spark 集群?

如何将 Spark EMR 集群与 AWS elasticsearch 集群连接起来