apache Beam bigtable 可迭代突变

Posted

技术标签:

【中文标题】apache Beam bigtable 可迭代突变【英文标题】:apache beam bigtable Iterable mutation 【发布时间】:2017-11-27 15:09:39 【问题描述】:

我正在将我的谷歌数据流 java 1.9 迁移到梁 2.0,并且我正在尝试使用 BigtableIO.Write

    ....
.apply("", BigtableIO.write()
                .withBigtableOptions(bigtableOptions)
                .withTableId("twoSecondVitals"));

在 BigtableIO 之前的 ParDo 中,我正在努力尝试制作 Iterable。

          try
        Mutation mutation = Mutation.parseFrom(new ObjectMapper().writeValueAsBytes(v));
        Mutation mu[] = mutation;
        Iterable<Mutation> imu = Arrays.asList(mu);
        log.severe("imu");
        c.output(KV.of(ByteString.copyFromUtf8(rowKey+"_"+v.getEpoch()), imu));
      catch (Exception e)
        log.severe(rowKey+"_"+v.getEpoch()+" error:"+e.getMessage());
      

上面的代码抛出如下异常 InvalidProtocolBufferException:协议消息结束组标记与预期标记不匹配

v 是一个对象列表(Vitals.class)。 hbase api 使用 Put 方法来创建突变。如何创建可与 BigtableIO 接收器一起使用的 BigTable 突变?

【问题讨论】:

想通了。 您可以将您的解决方案添加为答案,以便清楚您是如何解决的 - 并在将来帮助其他人。 【参考方案1】:

通过查看 sdk 的测试,我能够找到答案。

            Iterable<Mutation> mutations =
                ImmutableList.of(Mutation.newBuilder()
                .setSetCell(
                        Mutation.SetCell.newBuilder()
                        .setValue(ByteString.copyFrom(new ObjectMapper().writeValueAsBytes(v)))
                        .setFamilyName("vitals")
                ).build());

【讨论】:

以上是关于apache Beam bigtable 可迭代突变的主要内容,如果未能解决你的问题,请参考以下文章

Bigtable IO 连接器是不是有 Apache Beam DynamicDestinations?

Apache Beam CloudBigtableIO 读/写错误处理

数据流和 Bigtable 依赖冲突

Apache Beam -- 简介

Apache Beam 剖析

从 Apache Beam 管道收集输出并将其显示到控制台