pubSubToBq 从消息中获取属性

Posted

技术标签:

【中文标题】pubSubToBq 从消息中获取属性【英文标题】:pubSubToBq getAttribute from message 【发布时间】:2021-06-11 07:51:29 【问题描述】:

我想从 pubsub 消息中提取属性并将其用作 BQ 目标表名称的一部分。 这是我处理每个 pubsub 消息:

  static class PubsubMessageToFailsafeElementFn
      extends DoFn<PubsubMessage, FailsafeElement<PubsubMessage, String>> 
    @ProcessElement
    public void processElement(ProcessContext context) 
      PubsubMessage message = context.element();
      assert message != null;
      context.output(
          FailsafeElement.of(message, new String(message.getPayload(), StandardCharsets.UTF_8)));
    
  

这是我将所有消息转换为 TableRow 以插入 BQ:

  static class PubsubMessageToTableRow
      extends PTransform<PCollection<PubsubMessage>, PCollectionTuple> 

    private final Options options;

    PubsubMessageToTableRow(Options options) 
      this.options = options;
    

    @Override
    public PCollectionTuple expand(PCollection<PubsubMessage> input) 

      PCollectionTuple jsonToTableRowOut =
          input
              // Map the incoming messages into FailsafeElements so we can recover from failures
              // across multiple transforms.

              .apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()))
              .apply(
                      "JsonToTableRow",
                      FailsafeJsonToTableRow.<PubsubMessage>newBuilder()
                              .setSuccessTag(TRANSFORM_OUT)
                              .setFailureTag(TRANSFORM_DEADLETTER_OUT)
                              .build());

      // Re-wrap the PCollections so we can return a single PCollectionTuple
      return PCollectionTuple.of(TRANSFORM_OUT, jsonToTableRowOut.get(TRANSFORM_OUT))
              .and(TRANSFORM_DEADLETTER_OUT, jsonToTableRowOut.get(TRANSFORM_DEADLETTER_OUT));

最后一个过程,我将所有的 TableRow 上传到 BQ:

    PCollectionTuple convertedTableRows =
        messages
            /*
             * Step #2: Transform the PubsubMessages into TableRows
             */
            .apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));

    WriteResult writeResult =
        convertedTableRows.get(TRANSFORM_OUT)
            .apply(
                "WriteSuccessfulRecords",
                BigQueryIO.writeTableRows()
                    .withoutValidation()
                    .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                    .withExtendedErrorInfo()
                    .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                    .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                    .to(**SHOULD BE MESSAGE ATTRIBUTE HERE**));

【问题讨论】:

【参考方案1】:

您可以在 BigQueryIO to 字段中使用 DynamicDestinations。

【讨论】:

以上是关于pubSubToBq 从消息中获取属性的主要内容,如果未能解决你的问题,请参考以下文章

如何从Typescript中以#或@开头的属性获取JSON数据

从 Visual Studio 属性窗口中删除方法

在流分析中获取消息对象属性

突然从实体框架中获取异常

消息:试图在 Codeigniter 3 中获取非对象的属性

如何解决消息:试图在 Codeigniter 中获取非对象错误的属性?