使用 DataFlow 作业加载分区表

Posted

技术标签:

【中文标题】使用 DataFlow 作业加载分区表【英文标题】:Partitioned table loading using DataFlow job 【发布时间】:2017-07-25 12:10:23 【问题描述】:

我想读取文件并需要根据文件字段中存在的日期值将其写入 BigQuery 分区表。例如如果文件包含 2 个日期 7 月 25 日和 26 日,则 DataFlow 应根据文件中存在的数据将该数据写入 2 个分区。

public class StarterPipeline 
  private static final Logger LOG =
      LoggerFactory.getLogger(StarterPipeline.class);

  public static void main(String[] args) 
    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setProject("");
    options.setTempLocation("gs://stage_location/");
    Pipeline p = Pipeline.create(options);

    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("id").setType("STRING"));
    fields.add(new TableFieldSchema().setName("name").setType("STRING"));
    fields.add(new TableFieldSchema().setName("designation").setType("STRING"));
    fields.add(new TableFieldSchema().setName("joindate").setType("STRING"));
    TableSchema schema = new TableSchema().setFields(fields);

    PCollection<String> read = p.apply("Read Lines",TextIO.read().from("gs://hadoop_source_files/employee.txt"));

    PCollection<TableRow> rows = read.apply(ParDo.of(new DoFn<String,TableRow>()
      @ProcessElement
      public void processElement(ProcessContext c) 
        String[] data = c.element().split(",");

        c.output(new TableRow().set("id", data[0]).set("name", data[1]).set("designation", data[2]).set("joindate", data[3]));
      
    ));


    rows.apply(BigQueryIO.writeTableRows().to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() 
      public String getDate(String value) 
        return "project:dataset.DataFlow_Test$"+value;
      

      @Override
      public TableDestination apply(ValueInSingleWindow<TableRow> value) 
        TableRow row = value.getValue();
        String date = getDate(row.get("joindate").toString());
        String tableSpec = date;
        String tableDescription = "";
        return new TableDestination(tableSpec, tableDescription);
      
    ).withFormatFunction(new SerializableFunction<TableRow, TableRow>() 
      @Override
      public TableRow apply(TableRow input) 
        // TODO Auto-generated method stub
        return input;
      
    ).withSchema(schema)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

    p.run();
  

在上面的程序运行时出现以下错误:线程“主”org.apache.beam.sdk.Pipeline$PipelineExecutionException 中的异常:java.lang.IllegalArgumentException:表引用不在 [project_id]:[dataset_id] 中。 [table_id] 格式:引起:java.lang.IllegalArgumentException:表引用不是 [project_id]:[dataset_id].[table_id] 格式。如果有任何建议,请告诉我

【问题讨论】:

欢迎来到 Stack Overflow!请仅将 cmets 用于实际的 cmets。您应该 edit 您的原始问题以添加更多信息。 【参考方案1】:

Beam 目前不支持日期分区表。有关跟踪此功能的问题,请参阅 BEAM-2390。

【讨论】:

【参考方案2】:

我可以使用以下代码根据数据中存在的日期将数据加载到分区表中:

       rows.apply(BigQueryIO.writeTableRows().to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() 
         @Override
         public TableDestination apply(ValueInSingleWindow<TableRow> value) 
           TableRow row = value.getValue();
           TableReference reference = new TableReference();
           reference.setProjectId("");
           reference.setDatasetId("");

           reference.setTableId("tabelname$" + row.get("datefield").toString());
           return new TableDestination(reference, null);
         
       ).withFormatFunction(new SerializableFunction<TableRow, TableRow>() 
         @Override
         public TableRow apply(TableRow input) 
            LOG.info("format function:"+input.toString());
           return input;
         
       ).withSchema(schema)
           .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
           .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

【讨论】:

以上是关于使用 DataFlow 作业加载分区表的主要内容,如果未能解决你的问题,请参考以下文章

使用 write_truncate 通过 Google Dataflow/Beam 将数据加载到 Biqquery 分区表中

通过 Google Cloud Dataflow 创建/写入 Parititoned BigQuery 表

通过 Google Cloud Dataflow 创建/写入 Parititoned BigQuery 表

加载到 Hive 分区 Parquet 表时内存不足

Sqoop 增量加载到分区的配置单元表中

我想使用 sqoop 导入作业将数据 sqoop 到 hive 列分区表中。我们应该怎么做?