我如何让 pandas 使用 spark 集群
Posted
技术标签:
【中文标题】我如何让 pandas 使用 spark 集群【英文标题】:how do i let pandas working with spark cluster 【发布时间】:2019-01-30 08:10:48 【问题描述】:pandas 的主要问题是它无法处理大型操作数据,大量 CSV 文件内存不足,现在我切换到 Hadoop 中的 pyspark 1.6,我尝试使用 dask.dataframe 但问题仍然存在,有没有为什么让 pandas 与 Hadoop 集群或 pyspark 集群一起使用我想在 pandas 中使用此功能
import pandas as pd
df = pd.read_csv('text1.txt',names =['DATE','IMSI','WEBSITE','LINKUP','LINKDOWN','COUNT','CONNECTION'])
df.columns.str.strip()
df.DATE = pd.to_datetime(df.DATE)
group = df.groupby(['IMSI','WEBSITE']).agg('DATE':[min,max,'count']
,'LINKUP':'sum'
, 'LINKDOWN':'sum'
, 'COUNT':'max'
,'CONNECTION':'sum'
)
group.to_csv('finalinfo.txt', index = True, header = False)
【问题讨论】:
这不是它的工作方式。 Pandas 不会使用 Spark 作为其引擎。您可以使用 Spark 处理您的大文件,将其减少到可以容纳在一台机器上的体积,然后将其转换为 Pandas 数据框以从那里继续。 你能给我一个可以兼容我的代码的例子吗@ernest_k 【参考方案1】:从 HDFS 读取数据,聚合并发送回 pandas。下面的示例使用 inferSchema 根据数据获取列名和类型,但如果您的文件没有标题或者您不喜欢它推断的类型,您可以提供自己的模式。 InferSchema 需要额外的数据传递,因此根据数据大小,您可能需要提供自己的架构:
from pyspark.sql import functions as f
df = spark.read.csv('/hdfs/path/to/text1.txt', header=1, inferSchema=True, sep=';')
df = df.groupBy('IMSI','WEBSITE').agg(f.min('DATE').alias('min of date'),
f.max('DATE').alias('max of date'),
f.count('DATE').alias('count of date'),
f.sum('LINKUP').alias('sum of linkup'),
f.sum('LINKDOWN').alias('sum of linkdown'),
f.count('COUNT').alias('count of count'),
f.sum('CONNECTION').alias('sum of connection'))
pandasDF = df.toPandas()
或者,如果文件对于 pandas 来说仍然很大,您可以使用 spark 保存到 csv。请注意,您无法控制输出文件的名称 - 您只需指定将创建并存储输出的目录位置,文件名将遵循 spark 临时文件命名约定:
df.coalesce(1).write.csv('/hdfs/path/to/output/directory', header=True)
coalesce(1) 用于获取单个文件作为输出,因为 spark 将创建等于分区的文件数(默认为 200 iirc)。为此,未分区的文件必须适合单个工作人员的内存。它仍然太大,不要使用合并。 Spark 会将其保存在多个文件中,然后您可以使用HDFS getmerge 来加入文件。
【讨论】:
以上是关于我如何让 pandas 使用 spark 集群的主要内容,如果未能解决你的问题,请参考以下文章
Zeppelin+Spark+Kubernetes:让 Zeppelin Job 在现有的 Spark 集群上运行
如何让 Zeppelin 在 EMR 集群上干净地重新启动?