Accessing TableRow columns in BigQuery Apache Beam

Posted: 2018-04-17 16:49:24

I'm trying to:
1. Read JSON events from Cloud Pub/Sub.
2. Load the events from Cloud Pub/Sub into BigQuery every 15 minutes, using file loads to save the cost of streaming inserts.
3. The destination varies by the "user_id" and "campaign_id" fields of the JSON event: "user_id" becomes the dataset name and "campaign_id" becomes the table name. The partition name comes from the event timestamp.
4. The schema is the same for all tables.

I'm new to Java and Beam. I think my code mostly does what I intend, and I just need a little help here: I can't access the "campaign_id" and "user_id" fields of the JSON message, so my events aren't being routed to the correct tables.
```java
package ...;

import com.google.api.services.bigquery.model.TableSchema;
import javafx.scene.control.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Duration;
import org.joda.time.Instant;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;

import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method.FILE_LOADS;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition.WRITE_APPEND;

public class ClickLogConsumer {

    private static final int BATCH_INTERVAL_SECS = 15 * 60;
    private static final String PROJECT = "pure-app";

    public static PTransform<PCollection<String>, PCollection<com.google.api.services.bigquery.model.TableRow>> jsonToTableRow() {
        return new JsonToTableRow();
    }

    private static class JsonToTableRow
            extends PTransform<PCollection<String>, PCollection<com.google.api.services.bigquery.model.TableRow>> {

        @Override
        public PCollection<com.google.api.services.bigquery.model.TableRow> expand(PCollection<String> stringPCollection) {
            return stringPCollection.apply("JsonToTableRow", MapElements.<String, com.google.api.services.bigquery.model.TableRow>via(
                new SimpleFunction<String, com.google.api.services.bigquery.model.TableRow>() {
                    @Override
                    public com.google.api.services.bigquery.model.TableRow apply(String json) {
                        try {
                            InputStream inputStream = new ByteArrayInputStream(
                                json.getBytes(StandardCharsets.UTF_8.name()));
                            // OUTER is used here to prevent an EOF exception
                            return TableRowJsonCoder.of().decode(inputStream, Coder.Context.OUTER);
                        } catch (IOException e) {
                            throw new RuntimeException("Unable to parse input", e);
                        }
                    }
                }));
        }
    }

    public static void main(String[] args) throws Exception {
        Pipeline pipeline = Pipeline.create(options);
        pipeline
            .apply(PubsubIO.readStrings().withTimestampAttribute("timestamp").fromTopic("projects/pureapp-199410/topics/clicks"))
            .apply(jsonToTableRow())
            .apply("WriteToBQ",
                BigQueryIO.writeTableRows()
                    .withMethod(FILE_LOADS)
                    .withWriteDisposition(WRITE_APPEND)
                    .withCreateDisposition(CREATE_IF_NEEDED)
                    .withTriggeringFrequency(Duration.standardSeconds(BATCH_INTERVAL_SECS))
                    .withoutValidation()
                    .to(new DynamicDestinations<TableRow, String>() {
                        @Override
                        public String getDestination(ValueInSingleWindow<TableRow> element) {
                            String tableName = "campaign_id"; // JSON message in Pub/Sub has a "campaign_id" field, how do I access it here?
                            String datasetName = "user_id"; // JSON message in Pub/Sub has a "user_id" field, how do I access it here?
                            Instant eventTimestamp = element.getTimestamp();
                            String partition = new SimpleDateFormat("yyyyMMdd").format(eventTimestamp);
                            return String.format("%s:%s.%s$%s", PROJECT, datasetName, tableName, partition);
                        }

                        @Override
                        public TableDestination getTable(String table) {
                            return new TableDestination(table, null);
                        }

                        @Override
                        public TableSchema getSchema(String destination) {
                            return getTableSchema();
                        }
                    }));
        pipeline.run();
    }
}
```
I arrived at the code above after reading:
1. https://medium.com/myheritage-engineering/kafka-to-bigquery-load-a-guide-for-streaming-billions-of-daily-events-cbbf31f4b737
2. https://shinesolutions.com/2017/12/05/fun-with-serializable-functions-and-dynamic-destinations-in-cloud-dataflow/
3. https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.html
4. BigQueryIO - Write performance with streaming and FILE_LOADS
5. Inserting into BigQuery via load jobs (not streaming)
UPDATE:
```java
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method.FILE_LOADS;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition.WRITE_APPEND;

public class ClickLogConsumer {

    private static final int BATCH_INTERVAL_SECS = 15 * 60;
    private static final String PROJECT = "pure-app";

    public static PTransform<PCollection<String>, PCollection<TableRow>> jsonToTableRow() {
        return new JsonToTableRow();
    }

    private static class JsonToTableRow
            extends PTransform<PCollection<String>, PCollection<TableRow>> {

        @Override
        public PCollection<TableRow> expand(PCollection<String> stringPCollection) {
            return stringPCollection.apply("JsonToTableRow", MapElements.<String, com.google.api.services.bigquery.model.TableRow>via(
                new SimpleFunction<String, TableRow>() {
                    @Override
                    public TableRow apply(String json) {
                        try {
                            InputStream inputStream = new ByteArrayInputStream(
                                json.getBytes(StandardCharsets.UTF_8.name()));
                            // OUTER is used here to prevent an EOF exception
                            return TableRowJsonCoder.of().decode(inputStream, Coder.Context.OUTER);
                        } catch (IOException e) {
                            throw new RuntimeException("Unable to parse input", e);
                        }
                    }
                }));
        }
    }

    public static void main(String[] args) throws Exception {
        Pipeline pipeline = Pipeline.create(options);
        pipeline
            .apply(PubsubIO.readStrings().withTimestampAttribute("timestamp").fromTopic("projects/pureapp-199410/topics/clicks"))
            .apply(jsonToTableRow())
            .apply(BigQueryIO.write()
                .withTriggeringFrequency(Duration.standardSeconds(BATCH_INTERVAL_SECS))
                .withMethod(FILE_LOADS)
                .withWriteDisposition(WRITE_APPEND)
                .withCreateDisposition(CREATE_IF_NEEDED)
                .withSchema(new TableSchema().setFields(
                    ImmutableList.of(
                        new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"),
                        new TableFieldSchema().setName("exchange").setType("STRING"))))
                .to((row) -> {
                    String datasetName = row.getValue().get("user_id").toString();
                    String tableName = row.getValue().get("campaign_id").toString();
                    return new TableDestination(String.format("%s:%s.%s", PROJECT, datasetName, tableName), "Some destination");
                })
                .withTimePartitioning(new TimePartitioning().setField("timestamp")));
        pipeline.run();
    }
}
```
Answer 1:

How about: String tableName = element.getValue().get("campaign_id").toString(), and likewise for the dataset name.
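For reference, here is a minimal sketch of how that lookup could slot into the DynamicDestinations from the question. The class name PerUserCampaignDestinations is mine, the TableRow import is deliberately the BigQuery one (the question imports javafx.scene.control.TableRow, which shadows it), and the schema fields are copied from the question's updated code:

```java
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.values.ValueInSingleWindow;

// Routes each row to <project>:<user_id>.<campaign_id>, reading both fields
// off the TableRow that JsonToTableRow decoded from the Pub/Sub message.
class PerUserCampaignDestinations extends DynamicDestinations<TableRow, String> {

    private final String project;

    PerUserCampaignDestinations(String project) {
        this.project = project;
    }

    @Override
    public String getDestination(ValueInSingleWindow<TableRow> element) {
        TableRow row = element.getValue();
        // get() returns the JSON field as decoded by TableRowJsonCoder;
        // both fields are assumed to be present on every event.
        String dataset = row.get("user_id").toString();
        String table = row.get("campaign_id").toString();
        return String.format("%s:%s.%s", project, dataset, table);
    }

    @Override
    public TableDestination getTable(String tableSpec) {
        return new TableDestination(tableSpec, null);
    }

    @Override
    public TableSchema getSchema(String tableSpec) {
        // Point 4 in the question: the schema is the same for every table.
        return new TableSchema().setFields(ImmutableList.of(
            new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"),
            new TableFieldSchema().setName("exchange").setType("STRING")));
    }
}
```

It would be passed as .to(new PerUserCampaignDestinations(PROJECT)) in place of the anonymous class in the question's first version.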
Also, for inserting into time-partitioned tables, I strongly recommend using BigQuery's column-based partitioning rather than a partition decorator in the table name. See "Loading historical data into time-partitioned BigQuery tables" in the javadoc - you'll need a timestamp column. (Note that the javadoc has a typo: "time" vs "timestamp".)
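A hedged sketch of that column-based approach, replacing the $yyyyMMdd decorator built in the question's getDestination; the "timestamp" field name comes from the question's schema, and the DAY granularity is an assumption:

```java
import com.google.api.services.bigquery.model.TimePartitioning;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;

class PartitionedDestinations {
    // Builds a TableDestination carrying a daily TimePartitioning on the given
    // column, so BigQuery routes each row to a partition by the field's value
    // rather than by a $YYYYMMDD decorator in the table name.
    static TableDestination withDailyPartitioning(String tableSpec, String column) {
        return new TableDestination(
            tableSpec,
            null, // no table description
            new TimePartitioning().setType("DAY").setField(column));
    }
}
```

Returning such a TableDestination from getTable, instead of new TableDestination(table, null), should let CREATE_IF_NEEDED create each table already partitioned on that column.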