无法从数据流中的 GCS 读取我的配置文本文件(列名)

Posted

技术标签:

【中文标题】无法从数据流中的 GCS 读取我的配置文本文件(列名)【英文标题】:Unable to read my config text file(Column Names) from GCS in dataflow 【发布时间】:2018-06-22 17:03:12 【问题描述】:

我在 GCS 中有一个源 CSV 文件(没有标题)以及标题配置 CSV 文件(仅包含列名)。我在 Bigquery 中也有静态表。我想通过使用列标题映射(配置文件)将源文件加载到静态表中。

我之前尝试过不同的方法(我在同一文件中维护包含标题和数据的源文件,然后尝试从源文件中拆分标题,然后使用标题列映射将这些数据插入 Bigquery。我注意到这种方法是不可能,因为数据流将数据洗牌到多个工作节点中。所以我放弃了这种方法。

下面的代码我使用了硬编码的列名。我正在寻找从外部配置文件中读取列名的方法(我想让我的代码成为动态的)。

package com.coe.cog;

import java.io.BufferedReader;
import java.util.*;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;

public class SampleTest 
            private static final Logger LOG = LoggerFactory.getLogger(SampleTest.class);

            public static TableReference getGCDSTableReference() 
                        TableReference ref = new TableReference();
                        ref.setProjectId("myownproject");
                        ref.setDatasetId("DS_Employee");
                        ref.setTableId("tLoad14");
                        return ref;
            

            static class TransformToTable extends DoFn<String, TableRow> 
                        @ProcessElement
                        public void processElement(ProcessContext c) 

                            String csvSplitBy = ",";

                            String lineHeader = "ID,NAME,AGE,SEX"; // Hard code column name but i want to read these header from GCS file.

                            String[] colmnsHeader = lineHeader.split(csvSplitBy); //Only Header array

                            String[] split = c.element().split(csvSplitBy); //Data section

                                TableRow row = new TableRow();


                                for (int i = 0; i < split.length; i++)                                  

                                  row.set(colmnsHeader[i], split[i]);

                                

                              c.output(row);
                           // 
                            

            

            public interface MyOptions extends PipelineOptions 

                        /*
                        * Param
                        *
                        */

            

            public static void main(String[] args) 

                        MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

                        options.setTempLocation("gs://demo-bucket-data/temp");

                        Pipeline p = Pipeline.create(options);

                        PCollection<String> lines = p.apply("Read From Storage", TextIO.read().from("gs://demo-bucket-data/Demo/Test/SourceFile_WithOutHeader.csv"));

                        PCollection<TableRow> rows = lines.apply("Transform To Table",ParDo.of(new TransformToTable()));

                        rows.apply("Write To Table",BigQueryIO.writeTableRows().to(getGCDSTableReference())                                              
                                                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                                                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));

                        p.run();
            

源文件:

1,John,25,M 
2,Smith,30,M
3,Josephine,20,F

配置文件(仅标题):

ID,NAME,AGE,SEX

【问题讨论】:

【参考方案1】:

你有几个选择:

    使用 Dataflow/Beam side input 将配置/头文件读入某种集合,例如一个ArrayList。它将可供集群中的所有工作人员使用。然后,您可以使用 side input 将架构动态分配给使用 DynamicDestinations 的 BigQuery 表。 在进入您的 Dataflow 管道之前,直接调用 GCS api 来获取您的配置/头文件,对其进行解析,然后将结果设置为您的管道。

【讨论】:

感谢您的及时回复。让我检查一下并回复你。 似乎文件中的值顺序正在改变。我需要以 json 格式读取配置文件。当我通过 sideinputs 获取它时,我无法解析它,原因:文件结构正在改变。【参考方案2】:

使用 Beam 的 FileSystems API 从 GCS 读取配置文件是另一种方法。

优点:

无需其他依赖项,它包含在 beam API 中。 使用 GCP 的客户端库可能会导致依赖版本问题。 我们可以在任何转换中使用梁的 FileSystems API。

这是一个读取文件的sn-p。

//filePath format: gs://bucket/file

    public static String loadSchema(String filePath) 
            MatchResult.Metadata metadata;
            try 
                metadata = FileSystems.matchSingleFileSpec(filePath);  // searching 
             catch (IOException e) 
                throw new RuntimeException(e);
            
            String schema;
    
            try 
               // reading file
                schema = CharStreams.toString(
                    Channels.newReader(
                            FileSystems.open(metadata.resourceId()),
                            StandardCharsets.UTF_8.name()
                    )
                );
             catch (IOException e) 
                throw new RuntimeException(e);
            
            // returning content as string. We can process it now. 
            return schema;
         

侧输入的缺点

文件的方向发生变化。 Json 等多行文件很难解析。

Side Input 可用于单行静态值。

【讨论】:

以上是关于无法从数据流中的 GCS 读取我的配置文本文件(列名)的主要内容,如果未能解决你的问题,请参考以下文章

GCS - 从 Google Cloud Storage 直接读取文本文件到 python

是否可以使用 UDF 从 BigQuery 读取 gcs 对象的元数据

从 Dataflow 中的 GCS 读取时如何获取正在处理的文件名?

apache-beam 从 GCS 存储桶的多个文件夹中读取多个文件并加载它 bigquery python

读取数据时出错,错误消息:CSV 表引用列位置 174,但从位置开始的行:136868 仅包含 94 列

使用 Dataflow 管道 (python) 将多个 Json zip 文件从 GCS 加载到 BigQuery