pubSubToBq: getting an attribute from the message
【Title】: pubSubToBq getAttribute from message 【Posted】: 2021-06-11 07:51:29 【Question】: I want to extract an attribute from a Pub/Sub message and use it as part of the BQ destination table name. This is how I process each Pub/Sub message:
static class PubsubMessageToFailsafeElementFn
    extends DoFn<PubsubMessage, FailsafeElement<PubsubMessage, String>> {
  @ProcessElement
  public void processElement(ProcessContext context) {
    PubsubMessage message = context.element();
    assert message != null;
    // Pair the original message with its payload decoded as UTF-8.
    context.output(
        FailsafeElement.of(message, new String(message.getPayload(), StandardCharsets.UTF_8)));
  }
}
This is how I convert all messages to TableRow for insertion into BQ:
static class PubsubMessageToTableRow
    extends PTransform<PCollection<PubsubMessage>, PCollectionTuple> {

  private final Options options;

  PubsubMessageToTableRow(Options options) {
    this.options = options;
  }

  @Override
  public PCollectionTuple expand(PCollection<PubsubMessage> input) {
    PCollectionTuple jsonToTableRowOut =
        input
            // Map the incoming messages into FailsafeElements so we can recover from failures
            // across multiple transforms.
            .apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()))
            .apply(
                "JsonToTableRow",
                FailsafeJsonToTableRow.<PubsubMessage>newBuilder()
                    .setSuccessTag(TRANSFORM_OUT)
                    .setFailureTag(TRANSFORM_DEADLETTER_OUT)
                    .build());

    // Re-wrap the PCollections so we can return a single PCollectionTuple
    return PCollectionTuple.of(TRANSFORM_OUT, jsonToTableRowOut.get(TRANSFORM_OUT))
        .and(TRANSFORM_DEADLETTER_OUT, jsonToTableRowOut.get(TRANSFORM_DEADLETTER_OUT));
  }
}
In the last step, I upload all the TableRows to BQ:
PCollectionTuple convertedTableRows =
    messages
        /*
         * Step #2: Transform the PubsubMessages into TableRows
         */
        .apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));

WriteResult writeResult =
    convertedTableRows.get(TRANSFORM_OUT)
        .apply(
            "WriteSuccessfulRecords",
            BigQueryIO.writeTableRows()
                .withoutValidation()
                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withExtendedErrorInfo()
                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                .to(**SHOULD BE MESSAGE ATTRIBUTE HERE**));
【Comments】:
【Answer 1】: You can pass a DynamicDestinations to the to() method of BigQueryIO.
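To make the answer concrete, here is a minimal sketch of the part that does not depend on Beam: turning a message's attribute map into a table spec string of the form project:dataset.table. The class TableSpecs, the method tableSpecFor, and all of its parameters (attribute key, table prefix, fallback table) are hypothetical names chosen for illustration; they are not part of the Dataflow template or the Beam API. The sanitizing rule (letters, digits, and underscores only) is a conservative assumption, not a statement of BigQuery's full naming rules.

```java
import java.util.Map;

/** Hypothetical helper: builds a BigQuery table spec from Pub/Sub message attributes. */
final class TableSpecs {
  private TableSpecs() {}

  /**
   * Returns "project:dataset.prefix_<attribute value>", or
   * "project:dataset.<fallbackTable>" when the attribute is missing or empty.
   */
  static String tableSpecFor(
      Map<String, String> attributes,
      String attributeKey,
      String project,
      String dataset,
      String tablePrefix,
      String fallbackTable) {
    String value = attributes == null ? null : attributes.get(attributeKey);
    String table;
    if (value == null || value.isEmpty()) {
      // No usable attribute: route the row to a known fallback table.
      table = fallbackTable;
    } else {
      // Assumption: keep only letters, digits and underscores in the table name.
      table = tablePrefix + "_" + value.replaceAll("[^A-Za-z0-9_]", "_");
    }
    return project + ":" + dataset + "." + table;
  }
}
```

In a pipeline, a string like this could be wrapped in a TableDestination and returned from DynamicDestinations.getTable, or from the SerializableFunction<ValueInSingleWindow<T>, TableDestination> overload of to(). Two caveats apply to the pipeline in the question. First, attributes are only available if the messages are read with PubsubIO.readMessagesWithAttributes() and accessed through PubsubMessage.getAttribute or getAttributeMap. Second, by the time the elements reach BigQueryIO.writeTableRows() they are TableRows, so the attribute is no longer reachable unless it is copied into the row earlier or the write is done on the PubsubMessage itself with a format function. Since the question uses CREATE_NEVER, every table the helper can name must already exist.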
【Comments】: