将 Pandas DataFrame 写入 Google Cloud Storage 或 BigQuery
Posted
技术标签:
【中文标题】将 Pandas DataFrame 写入 Google Cloud Storage 或 BigQuery【英文标题】:Write a Pandas DataFrame to Google Cloud Storage or BigQuery 【发布时间】:2016-07-18 19:24:33 【问题描述】:您好,感谢您的时间和考虑。 我正在 Google Cloud Platform / Datalab 中开发 Jupyter Notebook。 我创建了一个 Pandas DataFrame,并希望将此 DataFrame 写入 Google Cloud Storage(GCS) 和/或 BigQuery。我在 GCS 中有一个存储桶,并通过以下代码创建了以下对象:
import gcp
import gcp.storage as storage
project = gcp.Context.default().project_id
bucket_name = 'steve-temp'
bucket_path = bucket_name
bucket = storage.Bucket(bucket_path)
bucket.exists()
我根据 Google Datalab 文档尝试了各种方法,但仍然失败。 谢谢
【问题讨论】:
我开发了一个 python 包,专门用于将数据从一个位置(例如 pandas.DataFrame)传输到另一个位置(例如 BigQuery 或 Storage):google-pandas-load.readthedocs.io/en/latest/。此外,它具有 100% 的测试覆盖率。 【参考方案1】:我认为您需要将其加载到纯字节变量中并在单独的单元格中使用 %%storage write --variable $sample_bucketpath(see the doc)...我仍在弄清楚...但是这与读取 CSV 文件所需的操作大致相反,我不知道它是否对写入有影响,但我必须使用 BytesIO 读取由 %% storage read 命令创建的缓冲区。 . 希望有帮助,告诉我!
【讨论】:
【参考方案2】:尝试以下工作示例:
from datalab.context import Context
import google.datalab.storage as storage
import google.datalab.bigquery as bq
import pandas as pd
# Dataframe to write
simple_dataframe = pd.DataFrame(data=[1,2,3,4,5,6],columns=['a','b','c'])
sample_bucket_name = Context.default().project_id + '-datalab-example'
sample_bucket_path = 'gs://' + sample_bucket_name
sample_bucket_object = sample_bucket_path + '/Hello.txt'
bigquery_dataset_name = 'TestDataSet'
bigquery_table_name = 'TestTable'
# Define storage bucket
sample_bucket = storage.Bucket(sample_bucket_name)
# Create storage bucket if it does not exist
if not sample_bucket.exists():
sample_bucket.create()
# Define BigQuery dataset and table
dataset = bq.Dataset(bigquery_dataset_name)
table = bq.Table(bigquery_dataset_name + '.' + bigquery_table_name)
# Create BigQuery dataset
if not dataset.exists():
dataset.create()
# Create or overwrite the existing table if it exists
table_schema = bq.Schema.from_data(simple_dataframe)
table.create(schema = table_schema, overwrite = True)
# Write the DataFrame to GCS (Google Cloud Storage)
%storage write --variable simple_dataframe --object $sample_bucket_object
# Write the DataFrame to a BigQuery table
table.insert(simple_dataframe)
我使用了this 示例,以及来自datalab github site 的_table.py 文件作为参考。您可以在this 链接找到其他datalab
源代码文件。
【讨论】:
请注意:我相信您需要在 Python 代码之外的单独单元格中执行 %%storage 命令? 这取决于您是要执行行魔术还是单元魔术命令。对于单元魔法,它是 %%storage,对于线魔法,它是 %storage。可以在与其他代码相同的单元格中使用行魔术命令。单元格魔术命令必须与其他代码位于单独的单元格中 感谢您的澄清 非常感谢 Anthonios... 我能够成功创建所有对象(例如,表和架构在我的 BQ 项目/数据集中)。但是,实际上没有向表中写入任何行,也没有生成错误消息。 在 Jupyter Notebook 中在 table.Insert_data(out) 之后生成了一个填充表,此行位于该表的底部:(rows: 0, edw-p19090000:ClickADS2.ADS_Logit1)跨度> 【参考方案3】:使用谷歌Cloud Datalab documentation
import datalab.storage as gcs
gcs.Bucket('bucket-name').item('to/data.csv').write_to(simple_dataframe.to_csv(),'text/csv')
【讨论】:
【参考方案4】:将 Pandas DataFrame 写入 BigQuery
>更新 @Anthonios Partheniou 的回答。 现在的代码有点不同 - 截至 11 月 。 29 2017
定义 BigQuery 数据集
将包含project_id
和dataset_id
的元组传递给bq.Dataset
。
# define a BigQuery dataset
bigquery_dataset_name = ('project_id', 'dataset_id')
dataset = bq.Dataset(name = bigquery_dataset_name)
定义 BigQuery 表
将包含project_id
、dataset_id
和表名的元组传递给bq.Table
。
# define a BigQuery table
bigquery_table_name = ('project_id', 'dataset_id', 'table_name')
table = bq.Table(bigquery_table_name)
创建数据集/表并写入BQ中的表
# Create BigQuery dataset
if not dataset.exists():
dataset.create()
# Create or overwrite the existing table if it exists
table_schema = bq.Schema.from_data(dataFrame_name)
table.create(schema = table_schema, overwrite = True)
# Write the DataFrame to a BigQuery table
table.insert(dataFrame_name)
【讨论】:
1.11.2
上的 exists()
函数对我来说在 python 中对于 google-cloud-bigquery
不存在【参考方案5】:
对于使用 Dask 的任务,我有一个更简单的解决方案。您可以将您的 DataFrame 转换为 Dask DataFrame,可以将其写入 Cloud Storage 上的 csv
import dask.dataframe as dd
import pandas
df # your Pandas DataFrame
ddf = dd.from_pandas(df,npartitions=1, sort=True)
dd.to_csv('gs://YOUR_BUCKET/ddf-*.csv', index=False, sep=',', header=False,
storage_options='token': gcs.session.credentials)
【讨论】:
【参考方案6】:自 2017 年以来,Pandas 有一个 Dataframe to BigQuery 函数pandas.DataFrame.to_gbq
documentation 有一个例子:
import pandas_gbq as gbq
gbq.to_gbq(df, 'my_dataset.my_table', projectid, if_exists='fail')
参数if_exists
可以设置为'fail'、'replace'或'append'
另见example。
【讨论】:
【参考方案7】:上传到谷歌云存储而不写入临时文件,只使用标准 GCS 模块
from google.cloud import storage
import os
import pandas as pd
# Only need this if you're running this code locally.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = r'/your_GCP_creds/credentials.json'
df = pd.DataFrame(data=[1,2,3,4,5,6],columns=['a','b','c'])
client = storage.Client()
bucket = client.get_bucket('my-bucket-name')
bucket.blob('upload_test/test.csv').upload_from_string(df.to_csv(), 'text/csv')
【讨论】:
非常感谢这个不使用其他模块和现有存储桶。 如果您只想将文件推送到 GCS 上的存储桶,那么这是一个更合适的解决方案。如果您想推出 json 格式,这也可以使用:bucket.blob('upload_test/test.json').upload_from_string(df.to_json(), 'text/json') 如果您不希望索引作为文件中的列,请使用df.to_csv(index=False)
【参考方案8】:
我花了很多时间寻找最简单的方法来解决这个问题:
import pandas as pd
df = pd.DataFrame(...)
df.to_csv('gs://bucket/path')
【讨论】:
这非常简单。只需确保还安装gcsfs
作为先决条件(尽管它会提醒您)。如果您在 2020 年或更晚时间来到这里,只需跳过复杂性并执行此操作。【参考方案9】:
致Google storage
:
def write_df_to_gs(df, gs_key):
df.to_csv(gs_key)
致BigQuery
:
def upload_df_to_bq(df, project, bq_table):
df.to_gbq(bq_table, project_id=project, if_exists='replace')
【讨论】:
【参考方案10】:在 GCS 中保存 parquet 文件并使用服务帐户进行身份验证:
df.to_parquet("gs://<bucket-name>/file.parquet",
storage_options="token": <path-to-gcs-service-account-file>
【讨论】:
以上是关于将 Pandas DataFrame 写入 Google Cloud Storage 或 BigQuery的主要内容,如果未能解决你的问题,请参考以下文章
将pandas的DataFrame数据写入MySQL数据库 + sqlalchemy
将 Pandas DataFrame 写入 Google Cloud Storage 或 BigQuery
将pandas DataFrame写入sql时出现无效列名错误