flink: Interrupted while waiting for data to be acknowledged by pipeline
Posted: 2021-06-23 06:23:45

Question: I am doing a POC of Flink CDC + Iceberg. I followed this Debezium tutorial to send CDC events to Kafka: https://debezium.io/documentation/reference/1.4/tutorial.html. My Flink job runs fine and writes insert data to the Hive table. But as soon as I issue an update/delete query against the MySQL table, the Flink job starts failing with the error below. I have also attached the output of the retract stream.
Update query: UPDATE customers SET first_name='Anne Marie' WHERE id=1004;
1> (true,1001,Sally,Thomas,sally.thomas@acme.com)
1> (true,1002,George,Bailey,gbailey@foobar.com)
1> (true,1003,Edward,Walker,ed@walker.com)
1> (true,1004,Anne,Kretchmar,annek@noanswer.org)
1> (true,1005,Sarah,Thompson,kitt@acme.com)
1> (false,1004,Anne,Kretchmar,annek@noanswer.org)
1> (true,1004,Anne Marie,Kretchmar,annek@noanswer.org)
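The retract-stream output above follows Flink's changelog semantics: an UPDATE on the source table arrives as a retraction `(false, oldRow)` immediately followed by an addition `(true, newRow)`. Replaying those messages against a map keyed by `id` rebuilds the current table state. A minimal plain-Java sketch of this (the `Msg` record and `replay` helper are illustrative, not Flink classes):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RetractReplay {
    // A retract message: 'add' mirrors the Boolean flag in Flink's Tuple2<Boolean, Row>.
    record Msg(boolean add, long id, String firstName) {}

    // Replay messages in order: true = upsert the row, false = retract (remove) it.
    static Map<Long, String> replay(List<Msg> msgs) {
        Map<Long, String> table = new HashMap<>();
        for (Msg m : msgs) {
            if (m.add()) {
                table.put(m.id(), m.firstName());
            } else {
                table.remove(m.id());
            }
        }
        return table;
    }

    public static void main(String[] args) {
        // The same sequence the job printed: five inserts, then the update to id 1004
        // arrives as a retraction of the old row plus an addition of the new one.
        List<Msg> msgs = List.of(
                new Msg(true, 1001, "Sally"),
                new Msg(true, 1002, "George"),
                new Msg(true, 1003, "Edward"),
                new Msg(true, 1004, "Anne"),
                new Msg(true, 1005, "Sarah"),
                new Msg(false, 1004, "Anne"),
                new Msg(true, 1004, "Anne Marie"));
        System.out.println(replay(msgs).get(1004L)); // Anne Marie
    }
}
```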
Error stack trace:
15:27:42.163 [Source: TableSourceScan(table=[[default_catalog, default_database, topic_customers]], fields=[id, first_name, last_name, email]) -> SinkConversionToTuple2 -> (Map -> Map -> IcebergStreamWriter, Sink: Print to Std. Out) (3/4)] ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
at org.apache.hadoop.hdfs.DataStreamer.waitForAckedSeqno(DataStreamer.java:886) ~[hadoop-hdfs-client-2.10.1.jar:?]
at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:749) ~[hadoop-hdfs-client-2.10.1.jar:?]
at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:859) ~[hadoop-hdfs-client-2.10.1.jar:?]
at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:818) ~[hadoop-hdfs-client-2.10.1.jar:?]
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) ~[hadoop-common-2.10.1.jar:?]
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106) ~[hadoop-common-2.10.1.jar:?]
at org.apache.iceberg.shaded.org.apache.parquet.hadoop.util.HadoopPositionOutputStream.close(HadoopPositionOutputStream.java:64) ~[iceberg-flink-runtime-0.11.0.jar:?]
Here is my code; topic_customers is the Kafka dynamic table listening to the CDC events:
Table out = tEnv.sqlQuery("select * from topic_customers");
DataStream<Tuple2<Boolean, Row>> dsRow = tEnv.toRetractStream(out, Row.class);
DataStream<Row> dsRow2 = dsRow.map((MapFunction<Tuple2<Boolean, Row>, Row>) x -> x.f1);
TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader, tableIdentifier);
FlinkSink.forRow(dsRow2, TableSchema.builder()
            .field("id", DataTypes.BIGINT())
            .field("first_name", DataTypes.STRING())
            .field("last_name", DataTypes.STRING())
            .field("email", DataTypes.STRING())
            .build())
        .tableLoader(tableLoader)
        //.overwrite(true)
        .equalityFieldColumns(Collections.singletonList("id"))
        .build();
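One thing worth double-checking in the snippet: `dsRow.map(x -> x.f1)` discards the explicit add/retract flag `f0` and relies on each `Row` carrying its change type internally (its `RowKind`). If the Boolean flag were the only change marker, a retraction would become indistinguishable from an insert after the map, and every CDC update would land in an append-only table as extra rows. A plain-Java illustration of that information loss (illustrative types, not Flink classes):

```java
import java.util.List;
import java.util.stream.Collectors;

public class FlagLoss {
    record Msg(boolean add, long id, String firstName) {}

    // Mirrors dsRow.map(x -> x.f1): keep only the payload, drop the add/retract flag.
    static List<Long> appendOnlyIds(List<Msg> msgs) {
        return msgs.stream().map(Msg::id).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Msg> update = List.of(
                new Msg(true, 1004, "Anne"),         // initial insert
                new Msg(false, 1004, "Anne"),        // retraction of the old row
                new Msg(true, 1004, "Anne Marie"));  // addition of the new row
        // Without the flag, all three messages look like inserts:
        // id 1004 is appended three times instead of ending up as one row.
        System.out.println(appendOnlyIds(update)); // [1004, 1004, 1004]
    }
}
```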
Answer 1: I solved this by migrating to the Iceberg v2 spec. You can refer to this PR: https://github.com/apache/iceberg/pull/2410
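The v2 migration fits the symptom: when `equalityFieldColumns` is set, the Flink sink emits equality delete files for retractions, and row-level deletes are only defined in Iceberg format version 2; v1 tables are effectively append-only. One common way to pin the format version is through the `format-version` table property at table creation. A hedged sketch in Flink SQL (the catalog, database, and table names are placeholders; exact DDL support depends on the Iceberg/Flink versions in use):

```sql
-- Hypothetical DDL: create the sink table as a v2 table so that
-- equality deletes produced by CDC updates/deletes are supported.
CREATE TABLE hive_catalog.db.customers (
  id BIGINT,
  first_name STRING,
  last_name STRING,
  email STRING
) WITH (
  'format-version' = '2'
);
```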