使用 DataFlow 作业加载分区表
Posted
技术标签:
【中文标题】使用 DataFlow 作业加载分区表【英文标题】:Partitioned table loading using DataFlow job 【发布时间】:2017-07-25 12:10:23 【问题描述】:我想读取文件并需要根据文件字段中存在的日期值将其写入 BigQuery 分区表。例如如果文件包含 2 个日期 7 月 25 日和 26 日,则 DataFlow 应根据文件中存在的数据将该数据写入 2 个分区。
public class StarterPipeline
private static final Logger LOG =
LoggerFactory.getLogger(StarterPipeline.class);
public static void main(String[] args)
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setProject("");
options.setTempLocation("gs://stage_location/");
Pipeline p = Pipeline.create(options);
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("id").setType("STRING"));
fields.add(new TableFieldSchema().setName("name").setType("STRING"));
fields.add(new TableFieldSchema().setName("designation").setType("STRING"));
fields.add(new TableFieldSchema().setName("joindate").setType("STRING"));
TableSchema schema = new TableSchema().setFields(fields);
PCollection<String> read = p.apply("Read Lines",TextIO.read().from("gs://hadoop_source_files/employee.txt"));
PCollection<TableRow> rows = read.apply(ParDo.of(new DoFn<String,TableRow>()
@ProcessElement
public void processElement(ProcessContext c)
String[] data = c.element().split(",");
c.output(new TableRow().set("id", data[0]).set("name", data[1]).set("designation", data[2]).set("joindate", data[3]));
));
rows.apply(BigQueryIO.writeTableRows().to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>()
public String getDate(String value)
return "project:dataset.DataFlow_Test$"+value;
@Override
public TableDestination apply(ValueInSingleWindow<TableRow> value)
TableRow row = value.getValue();
String date = getDate(row.get("joindate").toString());
String tableSpec = date;
String tableDescription = "";
return new TableDestination(tableSpec, tableDescription);
).withFormatFunction(new SerializableFunction<TableRow, TableRow>()
@Override
public TableRow apply(TableRow input)
// TODO Auto-generated method stub
return input;
).withSchema(schema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
p.run();
在上面的程序运行时出现以下错误:线程“主”org.apache.beam.sdk.Pipeline$PipelineExecutionException 中的异常:java.lang.IllegalArgumentException:表引用不在 [project_id]:[dataset_id] 中。 [table_id] 格式:引起:java.lang.IllegalArgumentException:表引用不是 [project_id]:[dataset_id].[table_id] 格式。如果有任何建议,请告诉我
【问题讨论】:
欢迎来到 Stack Overflow!请仅将 cmets 用于实际的 cmets。您应该 edit 您的原始问题以添加更多信息。 【参考方案1】:Beam 目前不支持日期分区表。有关跟踪此功能的问题,请参阅 BEAM-2390。
【讨论】:
【参考方案2】:我可以使用以下代码根据数据中存在的日期将数据加载到分区表中:
rows.apply(BigQueryIO.writeTableRows().to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>()
@Override
public TableDestination apply(ValueInSingleWindow<TableRow> value)
TableRow row = value.getValue();
TableReference reference = new TableReference();
reference.setProjectId("");
reference.setDatasetId("");
reference.setTableId("tabelname$" + row.get("datefield").toString());
return new TableDestination(reference, null);
).withFormatFunction(new SerializableFunction<TableRow, TableRow>()
@Override
public TableRow apply(TableRow input)
LOG.info("format function:"+input.toString());
return input;
).withSchema(schema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
【讨论】:
以上是关于使用 DataFlow 作业加载分区表的主要内容,如果未能解决你的问题,请参考以下文章
使用 write_truncate 通过 Google Dataflow/Beam 将数据加载到 Biqquery 分区表中
通过 Google Cloud Dataflow 创建/写入 Parititoned BigQuery 表