Using CoGroupByKey with custom type ends up in a Coder error
[Posted]: 2018-03-07 07:45:44
[Question]: I want to join two PCollections (each from a different input), following the steps in the "Joining with CoGroupByKey" section here: https://cloud.google.com/dataflow/model/group-by-key
In my case, I want to join GeoIP "block" information with "location" information. So I defined Block and Location as custom classes and wrote the following:
final TupleTag<Block> t1 = new TupleTag<Block>();
final TupleTag<Location> t2 = new TupleTag<Location>();
PCollection<KV<Long, CoGbkResult>> coGbkResultColl = KeyedPCollectionTuple.of(t1, kvGeoNameIDBlock)
.and(t2, kvGeoNameIDLocation).apply(CoGroupByKey.<Long>create());
The keys are of type Long. I thought that was all I needed, but when I run mvn compile, it outputs the following error:
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.4.0:java (default-cli) on project xxxx: An exception occured while executing the Java class. null: InvocationTargetException: Unable to return a default Coder for Extract GeoNameID-Block KV/ParMultiDo(ExtractGeoNameIDBlock).out0 [PCollection]. Correct one of the following root causes:
[ERROR] No Coder has been manually specified; you may do so using .setCoder().
[ERROR] Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<java.lang.Long, com.xxx.platform.geoip2.Block>: Unable to provide a Coder for com.xxx.platform.geoip2.Block.
[ERROR] Building a Coder using a registered CoderProvider failed.
[ERROR] See suppressed exceptions for detailed failures.
[ERROR] Using the default output Coder from the producing PTransform failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<java.lang.Long, com.xxx.platform.geoip2.Block>: Unable to provide a Coder for com.xxx.platform.geoip2.Block.
The DoFn that raises the error is ExtractGeoNameIDBlock, which simply creates a key-value pair of its key (the one to join on) and the Block itself.
// ExtractGeoNameIDBlock creates a KV collection while reading from the block CSV
static class ExtractGeoNameIDBlock extends DoFn<String, KV<Long, Block>> {
  private static final long serialVersionUID = 1L;

  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    String line = c.element();
    if (!line.startsWith("network,")) { // exclude the header line
      Block b = new Block();
      b.loadFromCsvLine(line);
      if (b.getGeonameId() != null) { // only emit records that have a join key
        c.output(KV.of(b.getGeonameId(), b));
      }
    }
  }
}
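loadFromCsvLine simply parses the CSV line, converts each field to its corresponding type, and assigns it to the matching private field.
So it looks like I need to set a coder for my custom class to make this work. I found a document about coders, but I am still not sure how to implement my own: https://cloud.google.com/dataflow/model/data-encoding
Is there a real example I can follow to create a custom coder for my custom class?
[Update 2017-09-26 13:02] I added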
CoderRegistry cr = p.getCoderRegistry();
cr.registerCoderForClass(Block.class, AvroCoder.of(Block.class));
and then got this error:
java.lang.NullPointerException: in com.xxx.platform.geoip2.Block in long null of long in field representedCountryGeonameId of com.xxx.platform.geoip2.Block
[Update 2017-09-26 14:05] I changed the implementation like this:
@DefaultCoder(AvroCoder.class)
public class Block {
  private static final Logger LOG = LoggerFactory.getLogger(Block.class);
  @Nullable
  public String network;
  @Nullable
  public Long registeredCountryGeonameId;
  :
  :
}
(@Nullable is set on all fields)
But I still get this error:
(22eeaf3dfb26f8cc): java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null Long
at com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:191)
at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
at org.apache.beam.sdk.transforms.join.CoGroupByKey$ConstructUnionTableFn.processElement(CoGroupByKey.java:185)
Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null Long
at org.apache.beam.sdk.coders.VarLongCoder.encode(VarLongCoder.java:51)
at org.apache.beam.sdk.coders.VarLongCoder.encode(VarLongCoder.java:35)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:135)
at com.google.cloud.dataflow.worker.ShuffleSink$ShuffleSinkWriter.encodeToChunk(ShuffleSink.java:320)
at com.google.cloud.dataflow.worker.ShuffleSink$ShuffleSinkWriter.add(ShuffleSink.java:216)
at com.google.cloud.dataflow.worker.ShuffleSink$ShuffleSinkWriter.add(ShuffleSink.java:178)
at com.google.cloud.dataflow.worker.util.common.worker.WriteOperation.process(WriteOperation.java:80)
at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
at com.google.cloud.dataflow.worker.ReifyTimestampAndWindowsParDoFnFactory$ReifyTimestampAndWindowsParDoFn.processElement(ReifyTimestampAndWindowsParDoFnFactory.java:68)
at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
at com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:183)
at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
at org.apache.beam.sdk.transforms.join.CoGroupByKey$ConstructUnionTableFn.processElement(CoGroupByKey.java:185)
at org.apache.beam.sdk.transforms.join.CoGroupByKey$ConstructUnionTableFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
at com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:233)
at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
at com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:183)
at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
at com.bandainamcoent.platform.GeoIpPopulateTable$ExtractGeoNameIDBlock.processElement(GeoIpPopulateTable.java:79)
at com.bandainamcoent.platform.GeoIpPopulateTable$ExtractGeoNameIDBlock$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
at com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:233)
at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:187)
at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:148)
at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:68)
at com.google.cloud.dataflow.worker.DataflowWorker.executeWork(DataflowWorker.java:336)
at com.google.cloud.dataflow.worker.DataflowWorker.doWork(DataflowWorker.java:294)
at com.google.cloud.dataflow.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244)
at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:135)
at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:115)
at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:102)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
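Thanks.
[Comments]:
Can you share more of the pipeline? The error indicates that the problem is in the ParDo named "ExtractGeoNameIDBlock", not in CoGroupByKey.
Thanks @BenChambers, I added the code. But I think that whenever I use a custom class in a DoFn, I need to add a Coder for it anyway, because at some point in the pipeline the data may be written out to a file and therefore has to be encoded and decoded. Is my understanding correct?
[Answer 1]: Your custom class Block does not appear to have a coder specified. You can create your own Coder, or use one of the general-purpose ones such as AvroCoder. You should also register it with the CoderRegistry so that the pipeline knows how to encode Blocks.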
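For the "create your own Coder" option, the following is a minimal sketch of a null-tolerant binary encoding, written against the JDK only. BlockRecord and BlockRecordCodec are hypothetical names, and BlockRecord is a cut-down stand-in for Block with just the two fields shown in the question. The assumption is that in Beam the encode and decode logic would live in the encode and decode methods of a CustomCoder<Block> subclass; this is an illustration, not the asker's implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical, simplified stand-in for the Block class in the question.
class BlockRecord {
  final String network;                  // may be null
  final Long registeredCountryGeonameId; // may be null

  BlockRecord(String network, Long registeredCountryGeonameId) {
    this.network = network;
    this.registeredCountryGeonameId = registeredCountryGeonameId;
  }
}

// Hypothetical helper: in Beam, encode/decode would be the bodies of the
// corresponding methods in a CustomCoder<Block> subclass (assumption).
class BlockRecordCodec {
  // Writes each nullable field as a presence flag followed by the value.
  static void encode(BlockRecord value, OutputStream out) throws IOException {
    DataOutputStream data = new DataOutputStream(out);
    data.writeBoolean(value.network != null);
    if (value.network != null) {
      data.writeUTF(value.network);
    }
    data.writeBoolean(value.registeredCountryGeonameId != null);
    if (value.registeredCountryGeonameId != null) {
      data.writeLong(value.registeredCountryGeonameId);
    }
    data.flush();
  }

  // Reads the fields back in the same order they were written.
  static BlockRecord decode(InputStream in) throws IOException {
    DataInputStream data = new DataInputStream(in);
    String network = data.readBoolean() ? data.readUTF() : null;
    Long geonameId = data.readBoolean() ? Long.valueOf(data.readLong()) : null;
    return new BlockRecord(network, geonameId);
  }

  // Encodes and immediately decodes a record, to check the two are inverses.
  static BlockRecord roundTrip(BlockRecord value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      encode(value, bytes);
      return decode(new ByteArrayInputStream(bytes.toByteArray()));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

The presence flag before each field is what lets a null value survive the round trip; a coder that writes the value unconditionally will fail on null input, as the "cannot encode a null Long" error in the question shows for VarLongCoder.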
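[Comments]:
Thanks for your post! I updated my question. After I specified AvroCoder, it throws a NullPointerException for some fields that can be NULL. Is there a way to explicitly tell AvroCoder that certain fields can be NULL?
Ah, maybe this is related: ***.com/a/33443609/2543803 Let me try this first.
Hi, I was able to run my pipeline successfully with AvroCoder! Thanks a lot for your help :-)
[Answer 2]: I finally got it working with AvroCoder plus the @Nullable annotation, as shown in the 2017-09-26 14:05 update in my question.
The last error I saw occurred simply because my data actually contained a null value that I had not expected. After handling the null values in my Java code, everything worked.
I think this answer to another question is very useful for this problem: https://***.com/a/32342403/2543803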
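The null handling mentioned in this answer is not shown in the thread. One way it could look is sketched below using only the JDK. NullSafeCsv, parseNullableLong, and extractKeys are hypothetical names, and the column layout in the usage note is an assumption rather than the real GeoIP schema. The idea is to map empty CSV fields to null explicitly and to drop records whose join key is null before they reach a coder that cannot encode null.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating null-safe CSV parsing.
class NullSafeCsv {
  // Returns null for a missing or empty field instead of throwing.
  // A non-numeric field still raises NumberFormatException.
  static Long parseNullableLong(String[] fields, int index) {
    if (index >= fields.length) {
      return null;
    }
    String raw = fields[index].trim();
    return raw.isEmpty() ? null : Long.valueOf(raw);
  }

  // Collects the key column of every data line, skipping the header and
  // any line whose key is null (mirrors the null check in the DoFn).
  static List<Long> extractKeys(List<String> lines, int keyColumn) {
    List<Long> keys = new ArrayList<>();
    for (String line : lines) {
      if (line.startsWith("network,")) {
        continue; // header line
      }
      // A limit of -1 keeps trailing empty fields in the split result.
      Long key = parseNullableLong(line.split(",", -1), keyColumn);
      if (key != null) {
        keys.add(key);
      }
    }
    return keys;
  }
}
```

For example, with the assumed two-column input lines "network,geoname_id", "1.0.0.0/24,2077456", and "1.0.1.0/24,", calling extractKeys with keyColumn 1 keeps only the key from the second line, because the third line has an empty key field.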