JAVA - Apache BEAM - GCP: GroupByKey works fine with Direct Runner but fails with Dataflow runner

Posted: 2020-06-23 15:59:29

【Problem Description】: I tested my code with the Dataflow runner and it returns this error:
> Error message from worker: java.lang.RuntimeException:
> org.apache.beam.sdk.util.UserCodeException:
> com.fasterxml.jackson.core.JsonParseException: Unrecognized token
> 'WindowReiterable[ ]
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:114)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:411)
> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:380)
> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:305)
> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:748) Caused by:
> org.apache.beam.sdk.util.UserCodeException:
> com.fasterxml.jackson.core.JsonParseException: Unrecognized token
> 'WindowReiterable': was expecting ('true', 'false' or 'null') at
> [Source: (String)"WindowReiterable []
Note that I used the same code with the Direct Runner and it worked just fine. Has anyone run into this issue? If so, could you tell me how to fix it? Or should I replace GroupByKey with another function...?

The code is as follows:
PCollection<KV<String, Iterable<String>>> KVElements = pipeline
    .apply("Reads the input fixed-width file", TextIO
        .read()
        .from(options.getPolledFile()))
    .apply("Converts to KV elements", ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String element = c.element();
            String[] columns = ("key;col1;col2;col3").split(";");
            String[] values = element.split(";");
            ObjectNode rowToJson = jsonParser.createObjectNode();
            for (int i = 0; i < columns.length; i++) {
                rowToJson.put(columns[i], values[i].trim());
            }
            c.output(KV.of(rowToJson.get("key").asText(), rowToJson.toString()));
        }
    }));
PCollection<KV<String, Iterable<String>>> joinedCollection = KVElements.apply(GroupByKey.create());

PCollection<String> joined = (PCollection<String>) joinedCollection
    .apply("Converts to json string", ParDo.of(new DoFn<KV<String, Iterable<String>>, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) throws IOException {
            KV<String, Iterable<String>> element = c.element();
            JsonNode parsed = jsonParser.readTree(String.valueOf(element.getValue()));
            final ObjectMapper mapper = new ObjectMapper();
            ObjectNode KVJson = mapper.createObjectNode();
            String value = null;
            for (int i = 0; i < parsed.size(); i++) {
                KVJson.put("col1", parsed.get(i).get("col1"));
                KVJson.put("col2", parsed.get(i).get("col2"));
                KVJson.put("col3", parsed.get(i).get("col3"));
                c.output(KVJson.toString());
            }
        }
    }));
Apache Beam version: 2.17.0
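For context, the String handed to Jackson here is String.valueOf(element.getValue()), i.e. the toString() of the Iterable produced by GroupByKey, and on the Dataflow worker that representation starts with "WindowReiterable [...]" (the exact text in the error), which is not JSON. Below is a minimal sketch, assuming each grouped value is the per-row JSON string built in the first ParDo and that jsonParser is the same Jackson ObjectMapper used above, of parsing the grouped values one element at a time instead:

    // Sketch only (not the original post's code): iterate the grouped Iterable<String>
    // and parse each JSON string individually, instead of stringifying the whole Iterable.
    // Assumed imports: org.apache.beam.sdk.transforms.*, org.apache.beam.sdk.values.*,
    // com.fasterxml.jackson.databind.*, com.fasterxml.jackson.databind.node.ObjectNode.
    PCollection<String> joined = joinedCollection.apply("Converts to json string",
        ParDo.of(new DoFn<KV<String, Iterable<String>>, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) throws IOException {
                for (String row : c.element().getValue()) {
                    JsonNode parsed = jsonParser.readTree(row);   // one valid JSON object per row
                    ObjectNode out = jsonParser.createObjectNode();
                    out.set("col1", parsed.get("col1"));
                    out.set("col2", parsed.get("col2"));
                    out.set("col3", parsed.get("col3"));
                    c.output(out.toString());
                }
            }
        }));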
【Comments】:

Can you post your full error log? Maybe the real error is a different one. Also, could you share which Apache Beam you are using and the command used to execute the pipeline?

@ebeltrans I am using Google Cloud Dataflow.

Can you post your code and the Apache Beam version?

@sirandy I have added the code and the version.

【Answer 1】: It looks like the definition of the ParDo is not correct. In the code snippet
"Converts to KV elements, ParDo.of(new DoFn<String, String>
应更改以匹配作为输出生成的 KV 结果,如下所示
"Converts to KV elements, ParDo.of(new DoFn<String, KV<String, Iterable<String>>>
【Discussion】: