flink - An error occurred creating the temp table
【Posted】: 2016-10-01 09:56:40
【Question】:
Team, I am getting the following error in Flink.
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:92)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:376)
at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896)
at org.apache.flink.api.java.DataSet.count(DataSet.java:394)
at org.apache.flink.graph.Graph.numberOfVertices(Graph.java:1256)
at com.phoenix.etl.job.AdJob.constructFlinkPlan(AdJob.java:330)
at com.phoenix.etl.job.AdJob.execute(AdJob.java:97)
at com.phoenix.etl.job.BatchJob.run(BatchJob.java:38)
at com.phoenix.etl.job.AdJob.main(AdJob.java:90)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: An error occurred creating the temp table.
at org.apache.flink.runtime.operators.TempBarrier.getIterator(TempBarrier.java:98)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
at org.apache.flink.runtime.operators.JoinWithSolutionSetFirstDriver.run(JoinWithSolutionSetFirstDriver.java:155)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
at org.apache.flink.runtime.iterative.task.IterationIntermediateTask.run(IterationIntermediateTask.java:92)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Cannot instantiate tuple.
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.createInstance(TupleSerializer.java:70)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.createInstance(TupleSerializer.java:30)
at org.apache.flink.runtime.operators.TempBarrier$TempWritingThread.run(TempBarrier.java:176)
Caused by: java.lang.NullPointerException
at java.util.Objects.requireNonNull(Objects.java:203)
at org.apache.flink.types.Either$Right.<init>(Either.java:148)
at org.apache.flink.types.Either.Right(Either.java:46)
at org.apache.flink.api.java.typeutils.runtime.EitherSerializer.createInstance(EitherSerializer.java:73)
at org.apache.flink.api.java.typeutils.runtime.EitherSerializer.createInstance(EitherSerializer.java:37)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.createInstance(TupleSerializer.java:64)
... 2 more
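Flink version - 1.1.0
Could you please shed some light on this? I get this error while running a vertex-centric graph traversal.
The vertices are a DataSet of Tuple2 and the edges are a DataSet of Tuple3.
For both vertices and edges, I can print the values using collect.
Regards, Sajeev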
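【Comments】:
If you post some code snippets, it is much easier to figure out what you are doing and what is causing the problem. Without code, this is pretty much wild guessing.
I was able to track down the problem. When I used the code below, it threw the error above. public void compute(Vertex
It may be too late for you, but to help others who hit the same problem with Flink 1.2: I just found this post because I ran into the same issue, and your comments helped me.
It seems you cannot use an interface as the message type. With compute(Vertex<String, VertexData> vertex, MessageIterator<Collection<MessageData>> messages)
or compute(Vertex<String, VertexData> vertex, MessageIterator<List<MessageData>> messages)
the same error occurs.
But it works with compute(Vertex<String, VertexData> vertex, MessageIterator<LinkedList<MessageData>> messages)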
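The innermost frames of the trace (EitherSerializer.createInstance calling Either.Right, which fails in Objects.requireNonNull) suggest that no instance could be created for the message type, which would fit the observation that interface types fail while LinkedList works. The following is a plain-Java sketch of that mechanism only, not Flink's code: the class InstantiationSketch and the methods createInstance and right are hypothetical stand-ins, and the assumption that the serializer returns null for a non-instantiable type is inferred from the trace rather than confirmed by the thread.

```java
import java.lang.reflect.Modifier;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;

class InstantiationSketch {

    // Hypothetical stand-in for a serializer's createInstance():
    // assumed to return null when the type cannot be instantiated.
    static <T> T createInstance(Class<T> type) {
        if (type.isInterface() || Modifier.isAbstract(type.getModifiers())) {
            return null; // interfaces and abstract classes have no constructor to call
        }
        try {
            return type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            return null; // no accessible no-arg constructor
        }
    }

    // Hypothetical stand-in for a wrapper that rejects null values,
    // mirroring the Objects.requireNonNull frame in the stack trace.
    static <T> T right(T value) {
        return Objects.requireNonNull(value);
    }

    public static void main(String[] args) {
        // A concrete class can be instantiated, so wrapping succeeds.
        System.out.println(right(createInstance(LinkedList.class))); // prints []

        // Interface types yield null, so wrapping throws NullPointerException.
        for (Class<?> type : new Class<?>[] {List.class, Collection.class}) {
            try {
                right(createInstance(type));
            } catch (NullPointerException e) {
                System.out.println(type.getSimpleName() + " -> NullPointerException");
            }
        }
    }
}
```

If this reading is right, declaring the message type as a concrete class with a no-argument constructor (as in the LinkedList variant above) sidesteps the failure, at the cost of tying the compute signature to one collection implementation.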