Apache Flink TaskExecutor shutdown

Preface: This article was compiled by the editors of 小常识网 (cha138.com) and mainly covers the Apache Flink TaskExecutor shutdown issue. We hope you find it useful.

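I'm new to Apache Flink, so I'm currently running some experiments. I'm reading a topic from Kafka and printing it to the console. After roughly 100k+ Kafka messages have been printed, an exception is thrown. The log output is below.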

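I'm using a custom class that extends AbstractDeserializationSchema to deserialize the Kafka record values. I even tried adding some exception handling inside it, but it is never triggered.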
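For reference, the core of such a schema is its `deserialize(byte[])` method. The sketch below is plain Java with no Flink dependency so it can run on its own; the class name `LenientUtf8Decoder` is hypothetical, and in a real job this logic would sit inside a subclass of `AbstractDeserializationSchema<String>`. It decodes strictly as UTF-8 and returns `null` for undecodable input; the Flink Kafka connector documentation describes returning `null` from `deserialize` as the way to skip a corrupted record instead of failing the job. Note that the exception in the log below is raised from `TaskExecutor.postStop`, not from any deserialization code, which would be consistent with the handler in the schema never firing.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: the body that a custom AbstractDeserializationSchema<String>
// could delegate to from its deserialize(byte[]) method.
final class LenientUtf8Decoder {

    private LenientUtf8Decoder() {}

    /** Returns the UTF-8 text of a record value, or null if it cannot be decoded. */
    static String deserialize(byte[] message) {
        if (message == null) {
            // Kafka tombstone records carry a null value.
            return null;
        }
        // A fresh decoder per call, because CharsetDecoder is not thread-safe.
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPORT)
                .onUnmappableCharacter(CodingErrorAction.REPORT);
        try {
            return decoder.decode(ByteBuffer.wrap(message)).toString();
        } catch (CharacterCodingException e) {
            // Malformed bytes: signal "skip this record" instead of throwing.
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(deserialize("hello".getBytes(StandardCharsets.UTF_8)));
        // 0xC3 starts a two-byte sequence, but 0x28 is not a continuation byte.
        System.out.println(deserialize(new byte[] {(byte) 0xC3, (byte) 0x28}));
    }
}
```

Returning `null` rather than throwing keeps a single bad record from failing the job; throwing is the better choice when corrupted data should stop processing.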

The code I use to consume from Kafka is very simple:

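import java.util.Arrays;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Main {

  private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);
  private static final int FLINK_PARALLELISM = 1;

  public static void main(String[] args) {
    LOGGER.info("Starting Flink Kafka Consumer");

    try {
        // Kafka consumer settings. Kafka accepts a List for bootstrap.servers,
        // but Properties.getProperty() would return null for this non-String value.
        Properties props = new Properties();
        props.put("bootstrap.servers", Arrays.asList(
                "localhost:9092"
        ));
        props.put("group.id", "test_flink");

        // Single source subtask reading the topic
        StreamExecutionEnvironment flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        flinkEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        flinkEnv.setParallelism(FLINK_PARALLELISM);

        // Read topic "test" as UTF-8 strings and print every record to stdout
        FlinkKafkaConsumer011<String> kafkaConsumer = new FlinkKafkaConsumer011<>("test", new SimpleStringSchema(), props);
        DataStream<String> kafkaStream = flinkEnv.addSource(kafkaConsumer);

        kafkaStream.print();

        // Blocks until the job terminates; run from an IDE, this starts a local MiniCluster
        flinkEnv.execute("Flink Test");
    } catch (Exception e) {
        // Pass the exception itself so the full stack trace (with causes) is logged
        LOGGER.error("Exception thrown", e);
    }

  }
}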
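One detail in the setup is worth checking independently of the shutdown: passing `bootstrap.servers` as a `List` through `props.put(...)`. Kafka's own config parser accepts a list, which matches the `bootstrap.servers = [localhost:9092]` line in the log, but `java.util.Properties.getProperty` returns `null` for any value that is not a `String`, so any code that reads the setting back that way will not see it. The plain-Java sketch below (the class name `PropertiesCheck` is made up for illustration) shows the difference; `setProperty` with a comma-separated string is the conventional form.

```java
import java.util.Arrays;
import java.util.Properties;

// Hypothetical demo of how java.util.Properties treats non-String values.
final class PropertiesCheck {

    /** The value is stored as a List, as with props.put(..., Arrays.asList(...)). */
    static Properties withListValue() {
        Properties props = new Properties();
        props.put("bootstrap.servers", Arrays.asList("localhost:9092"));
        return props;
    }

    /** Conventional form: a comma-separated String set via setProperty. */
    static Properties withStringValue() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        return props;
    }

    public static void main(String[] args) {
        // getProperty ignores non-String values, so this prints "null".
        System.out.println(withListValue().getProperty("bootstrap.servers"));
        // The raw entry is still present when read with get(): prints "[localhost:9092]".
        System.out.println(withListValue().get("bootstrap.servers"));
        // Prints "localhost:9092".
        System.out.println(withStringValue().getProperty("bootstrap.servers"));
    }
}
```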

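Even after the exception, output from the topic is still appended to the file. The Kafka topic is up and running, but the MiniCluster stops whether or not anything is being published. I can't pinpoint the problem.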

Can anyone point me in the right direction? Thanks.

INFO [Source: Custom Source -> Sink: Print to Std. Out (1/1)] o.a.f.s.c.k.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading the following 1 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='test', partition=0}]
INFO [Kafka 0.10 Fetcher for Source: Custom Source -> Sink: Print to Std. Out (1/1)] o.a.k.c.c.ConsumerConfig - ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = test_flink
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

INFO [Kafka 0.10 Fetcher for Source: Custom Source -> Sink: Print to Std. Out (1/1)] o.a.k.c.u.AppInfoParser - Kafka version : 0.11.0.2
INFO [Kafka 0.10 Fetcher for Source: Custom Source -> Sink: Print to Std. Out (1/1)] o.a.k.c.u.AppInfoParser - Kafka commitId : 73be1e1168f91ee2
INFO [Kafka 0.10 Fetcher for Source: Custom Source -> Sink: Print to Std. Out (1/1)] o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator localhost:9092 (id: 2147483647 rack: null) for group test_flink.
INFO [main] o.a.f.r.m.MiniCluster - Shutting down Flink Mini Cluster
INFO [main] o.a.f.r.d.DispatcherRestEndpoint - Shutting down rest endpoint.
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.t.TaskExecutor - Stopping TaskExecutor akka://flink/user/taskmanager_0.
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.s.TaskExecutorLocalStateStoresManager - Shutting down TaskExecutorLocalStateStoresManager.
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Sink: Print to Std. Out (1/1) (eb1f62611a047c5da09d8fa6f4e49084) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:515) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.terminate(ActorCell.scala:374) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.run(Mailbox.scala:224) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234) ~[akka-actor_2.11-2.4.20.jar:na]
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.12.jar:na]
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.e.ExecutionGraph - Job Flink Test (670e9073fbab507c41a26b5641a265eb) switched from state RUNNING to FAILING.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:515) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.terminate(ActorCell.scala:374) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.run(Mailbox.scala:224) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234) ~[akka-actor_2.11-2.4.20.jar:na]
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.12.jar:na]
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.i.d.i.IOManager - I/O manager removed spill file directory /tmp/flink-io-763eca47-ab9c-4985-aa30-c7ac21442635
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.i.n.NetworkEnvironment - Shutting down the network environment and its components.
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.e.ExecutionGraph - Try to restart or fail the job Flink Test (670e9073fbab507c41a26b5641a265eb) if no longer possible.
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.e.ExecutionGraph - Job Flink Test (670e9073fbab507c41a26b5641a265eb) switched from state FAILING to FAILED.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:515) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.terminate(ActorCell.scala:374) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.run(Mailbox.scala:224) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234) ~[akka-actor_2.11-2.4.20.jar:na]
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.12.jar:na]
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.e.ExecutionGraph - Could not restart the job Flink Test (670e9073fbab507c41a26b5641a265eb) because the restart strategy prevented it.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:515) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.terminate(ActorCell.scala:374) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.run(Mailbox.scala:224) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234) ~[akka-actor_2.11-2.4.20.jar:na]
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.12.jar:na]
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.c.CheckpointCoordinator - Stopping checkpoint coordinator for job 670e9073fbab507c41a26b5641a265eb.
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.c.StandaloneCompletedCheckpointStore - Shutting down
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.t.JobLeaderService - Stop job leader service.
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.t.TaskExecutor - Stopped TaskExecutor akka://flink/user/taskmanager_0.
INFO [ForkJoinPool.commonPool-worker-9] o.a.f.r.d.DispatcherRestEndpoint - Removing cache directory /tmp/flink-web-ui
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.r.s.SlotManager - Closing the SlotManager.
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.r.s.SlotManager - Suspending the SlotManager.
INFO [ForkJoinPool.commonPool-worker-9] o.a.f.r.d.DispatcherRestEndpoint - Shut down complete.
INFO [flink-akka.actor.default-dispatcher-3] o.a.f.r.j.JobMaster - Close ResourceManager connection 3a6541e469014e5685a7510403385dcb: ResourceManager leader changed to new address null.
INFO [PermanentBlobCache shutdown hook] o.a.f.r.b.PermanentBlobCache - Shutting down BLOB cache
INFO [TransientBlobCache shutdown hook] o.a.f.r.b.TransientBlobCache - Shutting down BLOB cache
INFO [BlobServer shutdown hook] o.a.f.r.b.BlobServer - Stopped BLOB server at 0.0.0.0:46065
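Read in order, the log suggests the task failure is a symptom rather than the cause: the `main` thread logs `Shutting down Flink Mini Cluster` before the TaskExecutor stops and before the task switches from RUNNING to FAILED with `The TaskExecutor is shutting down`. The plain-Java sketch below pulls that ordering out of a log in this `LEVEL [thread] logger - message` layout. The class and method names are hypothetical, and the regular expression assumes exactly the layout shown above: one event per line, with stack-trace and config lines skipped.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for ordering events in a log laid out as
// "LEVEL [thread] logger - message".
final class LogTimeline {

    static final class Event {
        final int lineNo;      // 0-based index into the input lines
        final String level;
        final String thread;
        final String logger;
        final String message;

        Event(int lineNo, String level, String thread, String logger, String message) {
            this.lineNo = lineNo;
            this.level = level;
            this.thread = thread;
            this.logger = logger;
            this.message = message;
        }
    }

    // Indented lines ("    at ...", config values) and wrapped fragments do not match.
    private static final Pattern LINE =
            Pattern.compile("^(\\w+) \\[([^\\]]+)\\] (\\S+) - (.*)$");

    /** Returns the first event whose message contains {@code needle}, if any. */
    static Optional<Event> first(List<String> lines, String needle) {
        for (int i = 0; i < lines.size(); i++) {
            Matcher m = LINE.matcher(lines.get(i));
            if (m.matches() && m.group(4).contains(needle)) {
                return Optional.of(new Event(i, m.group(1), m.group(2), m.group(3), m.group(4)));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList(
            "INFO [main] o.a.f.r.m.MiniCluster - Shutting down Flink Mini Cluster",
            "INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.t.TaskExecutor - Stopping TaskExecutor akka://flink/user/taskmanager_0.",
            "INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Sink: Print to Std. Out (1/1) (eb1f62611a047c5da09d8fa6f4e49084) switched from RUNNING to FAILED.");
        Event shutdown = first(log, "Shutting down Flink Mini Cluster").get();
        Event failed = first(log, "switched from RUNNING to FAILED").get();
        System.out.println("shutdown by [" + shutdown.thread + "] at line " + shutdown.lineNo
                + ", task failed at line " + failed.lineNo);
    }
}
```

Feeding it the full log would show which thread initiated the shutdown and whether that happened before the first FAILED transition.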
Answer

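I have the same problem with version 1.9.0, and it seems that versions after 1.4.1 all run into it when the job is executed directly in IntelliJ IDEA. Of course, it doesn't happen to everyone. Did you manage to solve it?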
