如何在谷歌云数据流管道中传递动态参数

Posted

技术标签:

【中文标题】如何在谷歌云数据流管道中传递动态参数【英文标题】:how to pass dynamic parameters in google cloud dataflow pipeline 【发布时间】:2018-06-06 16:54:46 【问题描述】:

我编写了代码,将 CSV 文件从 GCS 注入到 BigQuery,其中包含硬编码的 ProjectID、数据集、表名称、GCS Temp 和暂存位置。

我正在寻找应该阅读的代码

项目ID 数据集 表名 GCS 温度和分段位置参数

来自BigQuery table(Dynamic parameters)

代码:-

public class DemoPipeline 

public static TableReference getGCDSTableReference() 
    TableReference ref = new TableReference();
    ref.setProjectId("myprojectbq");
    ref.setDatasetId("DS_Emp");
    ref.setTableId("emp");
    return ref;

static class TransformToTable extends DoFn<String, TableRow> 
    @ProcessElement
    public void processElement(ProcessContext c) 

        String input = c.element();

        String[] s = input.split(",");
        TableRow row = new TableRow();

        row.set("id", s[0]);
        row.set("name", s[1]);
        c.output(row);

    

public interface MyOptions extends PipelineOptions 

    /*
     * Param
     * 
     */



public static void main(String[] args) 

    MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
    options.setTempLocation("gs://demo-xxxxxx/temp");
    Pipeline p = Pipeline.create(options);

    PCollection<String> lines = p.apply("Read From Storage", TextIO.read().from("gs://demo-xxxxxx/student.csv"));

    PCollection<TableRow> rows = lines.apply("Transform To Table",ParDo.of(new TransformToTable()));

    rows.apply("Write To Table",BigQueryIO.writeTableRows().to(getGCDSTableReference())
            //.withSchema(BQTableSemantics.getGCDSTableSchema())
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));

    p.run();


【问题讨论】:

我不太明白这个问题。您想使用 BigQuery 作为源,并根据您从其他源处理的元素从特定表和/或数据集加载吗?或者将其用作接收器并根据您从其他来源处理的元素写入特定表和/或数据集? 感谢亚历克斯的回复。我的要求是将 CSV 文件从 GCS 加载到 BigQuery,而无需在 Java 代码中硬编码项目 ID/数据集/表名称。我想从外部存储或动态参数(模板)中读取这些参数。请多多指教。 @Kannan 只需使用配置文件 @Haris Nadeem ,如果您提供一些示例以及如何从 GCS 读取配置文件,将不胜感激。我的要求是从 GCS 读取源 CSV 文件并与来自 GCS 的配置 CSV 文件(我将维护列名)进行比较,然后将其加载到 Bigquery 中。提前致谢。 您可以在此处找到配置文件的示例:mkyong.com/java/java-properties-file-examples,然后您只需将配置文件与您的工作打包 【参考方案1】:

即使要从包含其他数据的初始表(项目 ID/数据集/表名称)中读取,您也需要在某处硬编码此类信息。 Haris 推荐的属性文件是一个不错的方法,请查看以下建议:

    Java Properties file。在必须更改或调整参数时使用。通常,不需要重新编译的更改。这是一个必须存在或附加到您的 java 类的文件。从 GCS 读取此文件是可行的,但这是一个奇怪的选择。

    管道执行参数。自定义参数可以解决您的问题,请查看Creating Custom Options 了解如何完成,here is a small example。

【讨论】:

以上是关于如何在谷歌云数据流管道中传递动态参数的主要内容,如果未能解决你的问题,请参考以下文章

如何在谷歌云构建中将参数传递给 docker run

Bigquery 如何使用存储在谷歌云中的数据?

Python:在谷歌云数据存储模拟器中保存数据

如何在谷歌云存储中启用实时对象访问分析?

如何在谷歌大查询中从谷歌云存储上传表格

如何以编程方式在谷歌云运行 api 中获取当前项目 ID