Akka & Java - 遇到死信

Posted

技术标签:

【中文标题】Akka & Java - 遇到死信【英文标题】:Akka & Java - dead letters encountered 【发布时间】:2017-05-10 14:54:41 【问题描述】:

我正在尝试创建一个 Akka Actors 系统,其中每个演员都可以创建几个新的子演员。 例如:演员 A 读取一个 200K 行的输入文件,每行将把该文本行分配给演员 B。根据某些业务逻辑,演员 B 将创建演员 C,依此类推。

在调度 Actor B 时,我遇到了不一致的行为,有时我收到以下错误:

Message [java.lang.String] without sender to Actor[akka://My-Akka/user/$a/$b#304462457] was not delivered. [1] dead letters encountered. This logging can be turned off

错误可以在读取(并调度 100 行或 1000 行)后开始。每次运行都不一样。

课程从一些大师班开始:

ActorSystem system = ActorSystem.create("My-Akka");
ActorRef actorA= system.actorOf(Props.create(ActorA.class));    
actorA.tell("some text", ActorRef.noSender());

在 Actor A 中,我正在创建一个路由器:

    Router router;

    List<Routee> routees = new ArrayList<Routee>();
    for (int i = 0; i < 5; i++) 
        ActorRef r = getContext().actorOf(Props.create(ActorB.class));
        getContext().watch(r);
        routees.add(new ActorRefRoutee(r));
    
    router = new Router(new RoundRobinRoutingLogic(), routees);

在 Actor A 的 createReceive 函数中:

    @Override
public Receive createReceive() 
    return receiveBuilder()
            .match(String.class, message -> 
                        ....
                        String line;
                        while ((line = br.readLine()) != null) 
                            router.route(line, getSender());
                        
                    
                

            )
            .match(Terminated.class, message -> 
                router = router.removeRoutee(message.actor());
                ActorRef r = getContext().actorOf(Props.create(ActorB.class));
                getContext().watch(r);
                router = router.addRoutee(new ActorRefRoutee(r));
            )
            .build();

还有 Actor B 的 createReceive 函数:

    @Override
public Receive createReceive() 
    return receiveBuilder()
            .match(String.class, s -> 
                System.out.println("got message " + s);
            )
            .matchAny(o -> logger.info("received unknown message"))
            .build();

我想我在这里遗漏了一些东西。为什么看起来每次演员 A 都能够在到达死信之前向演员 B 发送不同数量的消息。 是不是因为主程序在所有演员都完成工作之前就结束了?如果是这种情况,需要改变什么?

谢谢

【问题讨论】:

【参考方案1】:

似乎问题是由于在 Junit 上下文中运行程序导致在特定时间关闭系统并杀死参与者。

一旦我在常规模式下运行程序,问题就消失了。

【讨论】:

以上是关于Akka & Java - 遇到死信的主要内容,如果未能解决你的问题,请参考以下文章

Akka Scala 演员中的死信

在线程“ main”中获取Akka流代码时出错,异常java.lang.NoClassDefFoundError:scala / Function1 $ class

SpringBoot整合RabbitMQ实现死信队列

使用Flink时遇到的坑

利用Akka并行执行SparkSQL任务

如何理解 AKKA 中使用的这种 CCAS 锁定机制?