GCP Dataflow- 从存储中读取 CSV 文件并写入 BigQuery

Posted

技术标签:

【中文标题】GCP Dataflow- 从存储中读取 CSV 文件并写入 BigQuery【英文标题】:GCP Dataflow- read CSV file from Storage and write into BigQuery 【发布时间】:2017-10-06 15:55:12 【问题描述】:

我在 Storage 中有一个 CSV 文件,我想读取它并将其写入 BigQuery Table。这是我的 CSV 文件,其中第一行是标题:

GroupName,Groupcode,GroupOwner,GroupCategoryID
System Administrators,sysadmin,13456,100
Independence High Teachers,HS Teachers,,101
John Glenn Middle Teachers,MS Teachers,13458,102
Liberty Elementary Teachers,Elem Teachers,13559,103
1st Grade Teachers,1stgrade,,104
2nd Grade Teachers,2nsgrade,13561,105
3rd Grade Teachers,3rdgrade,13562,106
Guidance Department,guidance,,107
Independence Math Teachers,HS Math,13660,108
Independence English Teachers,HS English,13661,109
John Glenn 8th Grade Teachers,8thgrade,,110
John Glenn 7th Grade Teachers,7thgrade,13452,111
Elementary Parents,Elem Parents,,112
Middle School Parents,MS Parents,18001,113
High School Parents,HS Parents,18002,114

这是我的代码:

    public class StorgeBq 

        public static class StringToRowConverter extends DoFn<String, TableRow> 

            private String[] columnNames;

            private boolean isFirstRow = true;

            @ProcessElement
            public void processElement(ProcessContext c) 
                TableRow row = new TableRow();

                String[] parts = c.element().split(",");

                if (isFirstRow) 
                    columnNames = Arrays.copyOf(parts, parts.length);
                    isFirstRow = false;
                 else 
                    for (int i = 0; i < parts.length; i++) 
                        row.set(columnNames[i], parts[i]);
                    
                    c.output(row);
                
            
        

        public static void main(String[] args) 

            DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                      .as(DataflowPipelineOptions.class);
                    options.setZone("europe-west1-c");
                    options.setProject("mydata-dev");
                    options.setRunner(DataflowRunner.class);
                    Pipeline p = Pipeline.create(options);

            p.apply("ReadLines", TextIO.read().from("gs://mydata3-dataflow/C2ImportGroupsSample.csv"))
            .apply("ConverToBqRow",ParDo.of(new StringToRowConverter()))
            .apply("WriteToBq", BigQueryIO.<TableRow>writeTableRows()
                    .to("mydata-dev:DF_TEST.dataflow_table")
                    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                    .withCreateDisposition(CreateDisposition.CREATE_NEVER));
            p.run().waitUntilFinish();
        


有一些问题: 1)当作业开始执行时,我看到有一个名为“DropInputs”的进程,我没有在我的代码中定义它!并在所有任务之前开始运行,Why??

2) 为什么管道不从第一个任务“ReadLines”开始? 3)在日志文件中,我看到在“WriteToBq”任务中,它试图找到其中一个数据作为字段,例如“1st Grade Teachers”不是字段而是“GroupName”的数据:

"message" : "JSON parsing error in row starting at position 0: No such field: 1st Grade Teachers.",

【问题讨论】:

你有工作ID吗?我认为 DropInputs 不应该出现在这里。 【参考方案1】:

您的代码中有几个问题。但是,首先,关于“DropInputs”阶段——你可以放心地忽略它。这是this 错误报告的结果。我仍然不明白为什么需要显示它(这也让我们的很多用户感到困惑),我希望 Google 员工能够参与其中。在我看来,这只是混乱。

好的,现在到你的代码:

    您假设读取的第一行将是您的标题。这是一个不正确的假设。数据流并行读取,因此标题行可能随时到达。不要使用boolean 标志来检查,而是每次在ParDo 中检查string 值本身,例如if (c.element.contains("GroupName") then.. 您缺少 BigQuery 表架构。您需要将 withSchema(..) 添加到您的 BigQuery 接收器。这是来自我的一个公共管道的example。

【讨论】:

谢谢,但你能告诉我,当我想在现有表中写入 BigQuery 时,应该如何在不添加“withSchema(..)”的情况下写入?因为这是一个示例 csv,但我应该为几个 CSV 文件执行此任务,每个文件都有大约 500 个列,并且总是添加模式并不容易。顺便问一下,您有读取 CSV 并写入 BigQuery 的示例吗? 我已经成功将CSV文件写入BQ,唯一的问题是写入BQ而不使用“withSchema”或从BQ获取它而不是编写代码的简单方法,你呢有什么想法吗? 请为此打开一个单独的问题。

以上是关于GCP Dataflow- 从存储中读取 CSV 文件并写入 BigQuery的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 python 将字典写入 Dataflow 中的 Bigquery

使用 GCP Cloud DataFlow 读取 BigTable 并转换为通用记录

Dataflow SQL (GCP) 不支持使用 STRUCT 的嵌套行

在 Dataflow Python 中从 PubSub 读取 AVRO 消息

从存储中读取 JSON 数组并发送到 GCP PubSub

如何从 GCP 存储桶中读取 Apache Beam 中的多个文件