将 Pandas DataFrame 写入 Google Cloud Storage 或 BigQuery

Posted

技术标签:

【中文标题】将 Pandas DataFrame 写入 Google Cloud Storage 或 BigQuery【英文标题】:Write a Pandas DataFrame to Google Cloud Storage or BigQuery 【发布时间】:2016-07-18 19:24:33 【问题描述】:

您好,感谢您的时间和考虑。 我正在 Google Cloud Platform / Datalab 中开发 Jupyter Notebook。 我创建了一个 Pandas DataFrame,并希望将此 DataFrame 写入 Google Cloud Storage(GCS) 和/或 BigQuery。我在 GCS 中有一个存储桶,并通过以下代码创建了以下对象:

import gcp
import gcp.storage as storage
project = gcp.Context.default().project_id    
bucket_name = 'steve-temp'           
bucket_path  = bucket_name   
bucket = storage.Bucket(bucket_path)
bucket.exists()  

我根据 Google Datalab 文档尝试了各种方法,但仍然失败。 谢谢

【问题讨论】:

我开发了一个 python 包,专门用于将数据从一个位置(例如 pandas.DataFrame)传输到另一个位置(例如 BigQuery 或 Storage):google-pandas-load.readthedocs.io/en/latest/。此外,它具有 100% 的测试覆盖率。 【参考方案1】:

我认为您需要将其加载到纯字节变量中并在单独的单元格中使用 %%storage write --variable $sample_bucketpath(see the doc)...我仍在弄清楚...但是这与读取 CSV 文件所需的操作大致相反,我不知道它是否对写入有影响,但我必须使用 BytesIO 读取由 %% storage read 命令创建的缓冲区。 . 希望有帮助,告诉我!

【讨论】:

【参考方案2】:

尝试以下工作示例:

from datalab.context import Context
import google.datalab.storage as storage
import google.datalab.bigquery as bq
import pandas as pd

# Dataframe to write
simple_dataframe = pd.DataFrame(data=[1,2,3,4,5,6],columns=['a','b','c'])

sample_bucket_name = Context.default().project_id + '-datalab-example'
sample_bucket_path = 'gs://' + sample_bucket_name
sample_bucket_object = sample_bucket_path + '/Hello.txt'
bigquery_dataset_name = 'TestDataSet'
bigquery_table_name = 'TestTable'

# Define storage bucket
sample_bucket = storage.Bucket(sample_bucket_name)

# Create storage bucket if it does not exist
if not sample_bucket.exists():
    sample_bucket.create()

# Define BigQuery dataset and table
dataset = bq.Dataset(bigquery_dataset_name)
table = bq.Table(bigquery_dataset_name + '.' + bigquery_table_name)

# Create BigQuery dataset
if not dataset.exists():
    dataset.create()

# Create or overwrite the existing table if it exists
table_schema = bq.Schema.from_data(simple_dataframe)
table.create(schema = table_schema, overwrite = True)

# Write the DataFrame to GCS (Google Cloud Storage)
%storage write --variable simple_dataframe --object $sample_bucket_object

# Write the DataFrame to a BigQuery table
table.insert(simple_dataframe)

我使用了this 示例,以及来自datalab github site 的_table.py 文件作为参考。您可以在this 链接找到其他datalab 源代码文件。

【讨论】:

请注意:我相信您需要在 Python 代码之外的单独单元格中执行 %%storage 命令? 这取决于您是要执行行魔术还是单元魔术命令。对于单元魔法,它是 %%storage,对于线魔法,它是 %storage。可以在与其他代码相同的单元格中使用行魔术命令。单元格魔术命令必须与其他代码位于单独的单元格中 感谢您的澄清 非常感谢 Anthonios... 我能够成功创建所有对象(例如,表和架构在我的 BQ 项目/数据集中)。但是,实际上没有向表中写入任何行,也没有生成错误消息。 在 Jupyter Notebook 中在 table.Insert_data(out) 之后生成了一个填充表,此行位于该表的底部:(rows: 0, edw-p19090000:ClickADS2.ADS_Logit1)跨度> 【参考方案3】:

使用谷歌Cloud Datalab documentation

import datalab.storage as gcs
gcs.Bucket('bucket-name').item('to/data.csv').write_to(simple_dataframe.to_csv(),'text/csv')

【讨论】:

【参考方案4】:

将 Pandas DataFrame 写入 BigQuery

​​>

更新 @Anthonios Partheniou 的回答。 现在的代码有点不同 - 截至 11 月 。 29 2017

定义 BigQuery 数据集

将包含project_iddataset_id 的元组传递给bq.Dataset

# define a BigQuery dataset    
bigquery_dataset_name = ('project_id', 'dataset_id')
dataset = bq.Dataset(name = bigquery_dataset_name)

定义 BigQuery 表

将包含project_iddataset_id 和表名的元组传递给bq.Table

# define a BigQuery table    
bigquery_table_name = ('project_id', 'dataset_id', 'table_name')
table = bq.Table(bigquery_table_name)

创建数据集/表并写入BQ中的表

# Create BigQuery dataset
if not dataset.exists():
    dataset.create()

# Create or overwrite the existing table if it exists
table_schema = bq.Schema.from_data(dataFrame_name)
table.create(schema = table_schema, overwrite = True)

# Write the DataFrame to a BigQuery table
table.insert(dataFrame_name)

【讨论】:

1.11.2 上的 exists() 函数对我来说在 python 中对于 google-cloud-bigquery 不存在【参考方案5】:

对于使用 Dask 的任务,我有一个更简单的解决方案。您可以将您的 DataFrame 转换为 Dask DataFrame,可以将其写入 Cloud Storage 上的 csv

import dask.dataframe as dd
import pandas
df # your Pandas DataFrame
ddf = dd.from_pandas(df,npartitions=1, sort=True)
dd.to_csv('gs://YOUR_BUCKET/ddf-*.csv', index=False, sep=',', header=False,  
                               storage_options='token': gcs.session.credentials)  

【讨论】:

【参考方案6】:

自 2017 年以来,Pandas 有一个 Dataframe to BigQuery 函数pandas.DataFrame.to_gbq

documentation 有一个例子:

import pandas_gbq as gbq gbq.to_gbq(df, 'my_dataset.my_table', projectid, if_exists='fail')

参数if_exists可以设置为'fail'、'replace'或'append'

另见example。

【讨论】:

【参考方案7】:

上传到谷歌云存储而不写入临时文件,只使用标准 GCS 模块

from google.cloud import storage
import os
import pandas as pd

# Only need this if you're running this code locally.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = r'/your_GCP_creds/credentials.json'

df = pd.DataFrame(data=[1,2,3,4,5,6],columns=['a','b','c'])

client = storage.Client()
bucket = client.get_bucket('my-bucket-name')
    
bucket.blob('upload_test/test.csv').upload_from_string(df.to_csv(), 'text/csv')

【讨论】:

非常感谢这个不使用其他模块和现有存储桶。 如果您只想将文件推送到 GCS 上的存储桶,那么这是一个更合适的解决方案。如果您想推出 json 格式,这也可以使用:bucket.blob('upload_test/test.json').upload_from_string(df.to_json(), 'text/json') 如果您不希望索引作为文件中的列,请使用 df.to_csv(index=False)【参考方案8】:

我花了很多时间寻找最简单的方法来解决这个问题:

import pandas as pd

df = pd.DataFrame(...)

df.to_csv('gs://bucket/path')

【讨论】:

这非常简单。只需确保还安装gcsfs 作为先决条件(尽管它会提醒您)。如果您在 2020 年或更晚时间来到这里,只需跳过复杂性并执行此操作。【参考方案9】:

Google storage

def write_df_to_gs(df, gs_key):
    df.to_csv(gs_key)    

BigQuery

def upload_df_to_bq(df, project, bq_table):
    df.to_gbq(bq_table, project_id=project, if_exists='replace')

【讨论】:

【参考方案10】:

在 GCS 中保存 parquet 文件并使用服务帐户进行身份验证:

df.to_parquet("gs://<bucket-name>/file.parquet",
               storage_options="token": <path-to-gcs-service-account-file>

【讨论】:

以上是关于将 Pandas DataFrame 写入 Google Cloud Storage 或 BigQuery的主要内容,如果未能解决你的问题,请参考以下文章

将pandas的DataFrame数据写入MySQL数据库 + sqlalchemy

将 Pandas DataFrame 写入 Google Cloud Storage 或 BigQuery

Pandas dataframe数据写入文件和数据库

将pandas DataFrame写入sql时出现无效列名错误

将 Pandas DataFrame 写入换行符分隔的 JSON

将 Pandas dataframe.groupby 结果写入 S3 存储桶