无限期地等待可能永远不会到达的消息
Posted
技术标签:
【中文标题】无限期地等待可能永远不会到达的消息【英文标题】:Waiting indefinitely for a message that may never arrive 【发布时间】:2015-12-09 21:48:43 【问题描述】:我有一个 Java 类型的 Actor,负责对可能暂时不可用的外部资源进行过滤/重试逻辑。 Actor的字段和常用方法有:
public class MyActorImpl implements MyActor
private static final long MINWAIT = 50;
private static final long MAXWAIT = 1000;
private static final long DEFAULTWAIT = 0;
private static final double BACKOFFMULTIPLIER = 1.5;
private long updateWait(long currentWait)
return Math.min(Math.max((long) (currentWait * BACKOFFMULTIPLIER), MINWAIT), MAXWAIT);
// mutable
private long opWait = DEFAULTWAIT;
private final Queue<OpInput> opBuffer = new ArrayDeque<>();
// called from external actor
public void operation(OpInput opInput)
operation(opInput, DEFAULTWAIT);
// called internally
public void operation(OpInput opInput, long currentWait);
actor 有几个操作,它们或多或少都具有相同的重试/缓冲逻辑;每个操作都有自己的[op]Wait
和[op]Buffer
字段。
-
父actor调用
void operation(OpInput opInput)
上述方法调用void operation(OpInput opInput, long currentWait)
,使用DEFAULTWAIT
作为第二个参数
如果currentWait
参数不等于opWait
,则输入存储在opBuffer
,否则将输入发送到外部资源。
如果外部资源返回成功,则将opWait
设置为DEFAULTWAIT
,并通过operation(opInput)
方法将opBuffer
的内容发回。如果外部资源(或者更可能是网络)返回错误,我会更新 opWait = updateWait(opWait)
并使用 opWait
毫秒的延迟在 actor 系统调度程序上调度 operation(opInput, opWait)
。
即我正在使用actor系统调度程序来实现指数退避;我正在使用currentWait
参数来标识我正在重试的消息,并正在缓冲其他消息,直到外部资源成功处理了主要消息。
问题是,如果预定的operation(opInput, currentWait)
消息丢失,那么我将永远缓冲消息,因为currentWait == opWait
防护将失败所有其他消息。我可以使用spring-retry 之类的东西来实现指数退避,但我看不到合并操作的重试循环的方法,这意味着我可以在每个重试循环中使用一个线程(而使用演员系统的调度程序不会对系统造成更大的压力)。
我正在寻找一种更容错的方法来在参与者和外部资源之间的接口上实现缓冲和指数退避,而不必为任务分配太多资源。
【问题讨论】:
【参考方案1】:如果我对您的理解正确,如果唯一的问题是丢失预定的消息,您为什么不对特定消息使用Reliable Proxy Pattern 之类的东西,然后如果它失败了opWait = DEFAULTWAIT;
我得到了关于你的代码的一些东西,当你说public void operation(OpInput opInput)
被外部调用时,我不明白你的意思。你的意思是这个方法是在与网络交互,它使用了有时不可用的资源?
如果可以的话,我可以提出一个替代方案。据我了解,您的主要问题是您的资源有时不可用,因此您使用某种等待逻辑实现了某种队列/缓冲区,以便消息一旦再次可用就会被处理,不幸的是涉及一些可能丢失并导致无限等待的消息。我认为您可以使用带有超时的 Futures 来实现您想要的。如果未来在一定时间内没有完成,则重试,最多重试 3 次。您甚至可以根据服务器负载以及完成消息所需的时间来调整此时间。希望对您有所帮助。
【讨论】:
“外部调用”是指该方法被另一个参与者调用,而“内部调用”方法仅通过self.operation(...)
调用。并感谢您使用未来而不是预定消息的建议,这大大简化了代码以上是关于无限期地等待可能永远不会到达的消息的主要内容,如果未能解决你的问题,请参考以下文章