如何将数据块中的数据从熊猫数据帧加载到火花数据帧
Posted
技术标签:
【中文标题】如何将数据块中的数据从熊猫数据帧加载到火花数据帧【英文标题】:How to load data in chunks from a pandas dataframe to a spark dataframe 【发布时间】:2016-07-30 23:37:18 【问题描述】:我已经使用类似这样的方法通过 pyodbc 连接读取数据块:
import pandas as pd
import pyodbc
conn = pyodbc.connect("Some connection Details")
sql = "SELECT * from TABLES;"
df1 = pd.read_sql(sql,conn,chunksize=10)
现在我想使用类似的东西将所有这些块读入一个单一的火花数据帧:
i = 0
for chunk in df1:
if i==0:
df2 = sqlContext.createDataFrame(chunk)
else:
df2.unionAll(sqlContext.createDataFrame(chunk))
i = i+1
问题是当我执行df2.count()
时,我得到的结果为 10,这意味着只有 i=0 的情况有效。这是 unionAll 的错误吗?我在这里做错了吗??
【问题讨论】:
【参考方案1】:.unionAll()
的文档声明它返回一个新的数据帧,因此您必须分配回df2
数据帧:
i = 0
for chunk in df1:
if i==0:
df2 = sqlContext.createDataFrame(chunk)
else:
df2 = df2.unionAll(sqlContext.createDataFrame(chunk))
i = i+1
此外,您可以改用 enumerate()
来避免自己管理 i
变量:
for i,chunk in enumerate(df1):
if i == 0:
df2 = sqlContext.createDataFrame(chunk)
else:
df2 = df2.unionAll(sqlContext.createDataFrame(chunk))
此外,.unionAll()
的文档指出 .unionAll()
已被弃用,现在您应该使用.union()
,它的作用类似于 SQL 中的 UNION ALL:
for i,chunk in enumerate(df1):
if i == 0:
df2 = sqlContext.createDataFrame(chunk)
else:
df2 = df2.union(sqlContext.createDataFrame(chunk))
编辑:
此外,我将停止进一步说,但在我进一步说之前:正如@zero323 所说,我们不要在循环中使用.union()
。让我们改为:
def unionAll(*dfs):
' by @zero323 from here: http://***.com/a/33744540/42346 '
first, *rest = dfs # Python 3.x, for 2.x you'll have to unpack manually
return first.sql_ctx.createDataFrame(
first.sql_ctx._sc.union([df.rdd for df in dfs]),
first.schema
)
df_list = []
for chunk in df1:
df_list.append(sqlContext.createDataFrame(chunk))
df_all = unionAll(df_list)
【讨论】:
此外,在循环中使用union(All)
之前,您真的应该三思而后行:) 提示:那里有像血统这样的东西。
@zero323:我建议 OP 创建一个列表,然后将该列表传递给 .union()
...
这仅适用于 RDD,因此您需要更多代码。你也可以截断,但它在 Python 中很难看。
@zero323:请查看编辑并让我知道您的想法。谢谢。
我很确定union
没有可变参数版本。您可以检查***.com/a/33744540/1560062 中的最后一个 sn-p,它仍然不是最理想的,但不使用私有 API。还有这个github.com/high-performance-spark/…以上是关于如何将数据块中的数据从熊猫数据帧加载到火花数据帧的主要内容,如果未能解决你的问题,请参考以下文章
如何将 Google Cloud Storage 中的千兆字节数据加载到 pandas 数据帧中?