将谷歌存储桶中的所有 .csv 文件读取到一个大熊猫 df 中,然后以 .csv 格式保存到另一个桶中

Posted

技术标签:

【中文标题】将谷歌存储桶中的所有 .csv 文件读取到一个大熊猫 df 中,然后以 .csv 格式保存到另一个桶中【英文标题】:Reading all .csv files from a google storage bucket into one large pandas df, then saving back as .csv to another bucket 【发布时间】:2019-11-11 08:48:46 【问题描述】:

在我的谷歌云函数(Python 3.7 运行时)中,我创建了一个函数,它试图将所有 .csv 文件从谷歌存储桶下载到熊猫数据框 (df) 中。一旦进入数据帧,我将对其进行一些简单的 ETL 工作,然后将其转换回一个大的 .csv 文件以保存到另一个存储桶。 我遇到的问题是,当我将对象(使用 file.download_as_string() 转换为字符串)读入 read_csv() 时,我收到与 IO.StringIO 相关的错误(TypeError: initial_value must be str或无,不是字节)

在 read_csv(io.String.IO(file_contents)....) 中,这与我放置 io.StringIO 方法的位置有关吗?谁能帮我纠正这个错误?

    def stage1slemonthly(data, context, source_bucket = 'my_source_bucket', 
    destination_bucket = 'gs://my destination_bucket'):  


        from google.cloud import storage
        import pandas as pd
        import pyspark
        from pyspark.sql import SQLContext
        import io

        storage_client = storage.Client()

        # source_bucket = data['bucket']
        # source_file = data['name']
        source_bucket = storage_client.bucket(source_bucket)

        # load in the col names
        col_names = ["Customer_Country_Number", "Customer_Name", "Exclude",
             "SAP_Product_Name", "CP_Sku_Code", "Exclude", "UPC_Unit",
             "UPC_Case", "Colgate_Month_Year", "Total_Cases",
             "Promoted_Cases", "Non_Promoted_Cases",
             "Planned_Non_Promoted_Cases", "Exclude",
             "Lead_Measure", "Tons", "Pieces", "Liters",
             "Tons_Conv_Revenue", "Volume_POS_Units", "Scan_Volume",
             "WWhdrl_Volume", "Exclude", "Exclude", "Exclude", "Exclude",
             "Exclude", "Exclude", "Exclude", "Exclude", "Investment_Buy",
             "Exclude", "Exclude", "Gross_Sales", "Claim_Sales",
             "Adjusted_Gross_Sales", "Exclude", "Exclude",
             "Consumer_Investment", "Consumer_Allowance",
             "Special_Pack_FG", "Coupons", "Contests_Offers", 
             "Consumer_Price_Reduction", "Permanent_Price_Reduction",
             "Temporary_Price_Reduction", "TPR_Off_Invoice", "TPR_Scan",
             "TPR_WWdrwl_Exfact", "Every_Day_Low_Price", "Closeouts",
             "Inventory_Price_Reduction", "Exclude", "Customer_Investment",
             "Prompt_Payment", "Efficiency_Drivers", "Efficient_Logistics",
             "Efficient_Management", "Business_Builders_Direct", "Assortment",
             "Customer_Promotions","Customer_Promotions_Terms",
             "Customer_Promotions_Fixed", "Growth_Direct",
             "New_Product_Incentives", "Free_Goods_Direct",
             "Shopper_Marketing", "Business_Builders_Indirect",
             "Middleman_Performance", "Middleman_Infrastructure",
             "Growth_Indirect", "Indirect_Retailer_Investments",
             "Free_Goods_Indirect", "Other_Customer_Investments",
             "Product_Listing_Allowances", "Non_Performance_Trade_Payments",
             "Exclude", "Variable_Rebate_Adjustment", 
             "Overlapping_OI_Adjustment", "Fixed_Accruals",
             "Variable_Accruals", "Total_Accruals", "Gross_To_Net",
             "Invoiced_Sales", "Exclude", "Exclude", "Net_Sales",
             "Exclude", "Exclude", "Exclude", "Exclude", "Exclude",
             "Exclude", "Exclude", "Exclude", "Exclude",
             "Total_Variable_Cost", "Margin", "Exclude"]

        df = pd.DataFrame(columns=[col_names])

        for file in list(source_bucket.list_blobs()):
          file_contents = file.download_as_string() 
          df = df.append(pd.read_csv(io.StringIO(file_contents), header=None, names=[col_names]))

        df = df.reset_index(drop=True)

        # do ETL work here in future

        sc = pyspark.SparkContext.getOrCreate()
        sqlCtx = SQLContext(sc)
        sparkDf = sqlCtx.createDataFrame(df)
        sparkDf.coalesce(1).write.option("header", "true").csv(destination_bucket)

当我运行它时,我收到以下错误消息...

回溯(最近一次调用最后一次):文件“/env/local/lib/python3.7/site-packages/google/cloud/functions/worker.py”,第 383 行,在 run_background_function _function_handler.invoke_user_function(event_object)文件“/env/local/lib/python3.7/site-packages/google/cloud/functions/worker.py”,第 217 行,invoke_user_function return call_user_function(request_or_event) 文件“/env/local/lib/python3.7 /site-packages/google/cloud/functions/worker.py”,第 214 行,在 call_user_function event_context.Context(**request_or_event.context)) 文件“/user_code/main.py”,第 56 行,在 stage1slemonthly df = df .append(pd.read_csv(io.StringIO(file_contents), header=None, names=[col_names])) TypeError: initial_value must be str or None, not bytes

【问题讨论】:

【参考方案1】:

您收到此错误是因为 file.download_as_string() return type 是 bytes 而不是 str,并且您不能将 io.StringIObytes 参数一起使用 (initial_value=file_contents)。

此外,col_names 在这里被定义为一个数组,所以写pd.DataFrame(columns=[col_names])pd.read_csv(..., names=[col_names]) 是不正确的:你应该使用col_names 而不是[col_names]

无论如何,从 Google Cloud Storage 读取 CSV 文件似乎不是正确的方法。你宁愿写:

from google.cloud import storage
import pandas as pd
import io

storage_client = storage.Client()

source_bucket = storage_client.bucket(source_bucket)

col_names = ["Customer_Country_Number", "Customer_Name", ...]

df = pd.DataFrame(columns=col_names)

for file in list(source_bucket.list_blobs()):
    file_path="gs:///".format(file.bucket.name, file.name)
    df = df.append(pd.read_csv(file_path, header=None, names=col_names))

# the rest of your code

确实,你可以read files directly from GCS 和pandasread_csv 方法,而不是下载文件来加载它,但你需要先安装gcsfs (pip3 install gcsfs)。

【讨论】:

感谢您提供上述答案 - 代码运行没有任何错误,并按预期将 csv 输出生成到新的 gcs 存储桶中。不幸的是,我现在遇到的问题是原始“源存储桶”中的 csv 数据没有将任何实际数据拉入 pandas df。这可能与 pd.read_csv 读取 file_path 的方式有关吗? 在我的代码中,为了确保问题与数据从 pandas df 输出到 csv(然后到谷歌云存储)的方式无关,我创建了一个额外的随机 df 并附加这对于由 read_csv 创建的 df-csv 仅使用随机生成的 df 完美输出。这意味着它与我的代码在开始时读取 .csv 文件的方式有关。 我不确定我是否理解。例如source_bucket中有2个csv文件(每个文件一行),你是说只有最后加载的csv文件的行在df中(for循环之后)?跨度> 抱歉,我没有很好地解释我的新问题。为了说明我的问题,在我的谷歌存储桶 source_bucket 中,我有 6 个结构相同的 csv 文件,我正在尝试读取并附加到 df = df.append(pd.read_csv(file_path, header=None, names=col_names)) 中,以获得一个大的 df。使用 pyspark,我将这个大的 df 写入 .csv 文件并上传到 destination_bucket 。代码执行没有错误,但是当我在 destination_bucket 打开新创建的 .csv 文件时,那里没有数据行(在列标题 col_names 上) norbjd - 我查看了您对我的问题的原始回复,并注意到我忘记从我的 FOR 循环中删除 io.StringIO。已将其删除,代码现在可以正常运行。谢谢!

以上是关于将谷歌存储桶中的所有 .csv 文件读取到一个大熊猫 df 中,然后以 .csv 格式保存到另一个桶中的主要内容,如果未能解决你的问题,请参考以下文章

如何从 S3 存储桶中读取最后修改的 csv 文件?

如何从 S3 存储桶中仅读取最近 7 天的 csv 文件

将 GCS 存储桶中的 .csv 文件中的数据加载到 Cloud SQL 表中:

如何在谷歌存储桶中列出所有上传到存储桶的文件?

将数据从谷歌数据存储复制到 CSV

将谷歌云存储文件夹挂载到谷歌人工智能平台作业