Infinispan CacheEntryCreated 没有在重新平衡时触发 primaryOnly = true?

Posted

技术标签:

【中文标题】Infinispan CacheEntryCreated 没有在重新平衡时触发 primaryOnly = true?【英文标题】:Infinispan CacheEntryCreated not firing on Rebalance for primaryOnly=true? 【发布时间】:2022-01-07 19:25:09 【问题描述】:

我们最近开始使用在嵌入式模式下运行的 Infinispan,我们目前使用的是 13.0.0 版本。我们想要的是一个集群存储,它可以保证在任何给定时间只有一个进程正在使用给定的缓存键。即使新节点加入我们的集群或旧节点离开,我们也希望这一保证成立。

我们认为可以做到这一点的方法之一是使用@Listener(primaryOnly = true, observation = Listener.Observation.POST) 设置缓存侦听器,以激活我们想要运行的代码。这种技术适用于我们缓存中第一次创建键。问题是,一旦将密钥存储为主要位置的节点离开集群,新的主要所有者节点似乎没有触发任何事件,通知它现在已经从另一个节点“接管”了这个密钥。

我的问题是,是否有任何方法可以知道某个节点已成为密钥的主要所有者,因为之前的主要所有者节点已离开集群?

为了说明问题,我做了以下项目:https://github.com/radiosphere/infinispan-test。您可以克隆它并签出标签 ***-1 。完成此操作后,请执行以下操作:

    打开三个终端窗口并运行./start-server.sh 8080./start-server.sh 8081./start-server.sh 8082。 运行./set-key.sh 8080 a 1 现在您将在其中一个终端窗口中看到Entry created 终止获得Entry created 日志的进程。 现在我本来希望在另一个窗口中看到一个事件,但我没有看到任何新事件。

【问题讨论】:

【参考方案1】:

CacheEntryCreatedEvents 仅在将密钥插入缓存时触发。将键从一个节点移动到另一个节点不会触发任何事件。

在任何情况下,节点首先接收条目作为备份所有者,然后才成为密钥的主要所有者,因此节点成为密钥的主要所有者在语义上与“创建”非常不同。

如果你听TopologyChangeEvents 并遍历本地节点之前不是主要所有者但现在是主要所有者的所有条目,你可以获得你想要的:

@Listener(sync = false)
public class TopologyUpdateListener 
    @TopologyChanged
    public void onTopologyChange(TopologyChangedEvent<?, ?> e) throws InterruptedException 
       if (!e.isPre()) 
          Cache<?, ?> cache = e.getCache();
          Address localAddress = cache.getAdvancedCache().getRpcManager().getAddress();
          Set<Integer> oldPrimarySegments =
                e.getReadConsistentHashAtStart().getPrimarySegmentsForOwner(localAddress);
          Set<Integer> newPrimarySegments =
                e.getReadConsistentHashAtEnd().getPrimarySegmentsForOwner(localAddress);
          IntSet diff = IntSets.mutableCopyFrom(newPrimarySegments);
          diff.removeAll(oldPrimarySegments);
          cache.getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL)
               .entrySet().stream()
               .filterKeySegments(diff)
               .forEach(entry ->  ... );
       
    

同步侦听器在内部 Infinispan 线程上运行,因此它们永远不应阻塞。 stream() 方法确实会阻塞,因此为了简单起见,我将侦听器设为异步,但使用您自己的执行程序以获得更好的控制几乎总是一个好主意。

您也可以将 stream().filterSegments(diff) 替换为 localPublisher(diff) 并在不阻塞的情况下处理条目,但 CacheCollection.localPublisher() 方法是实验性的,我们计划在 14.0 中将其替换为其他内容。

【讨论】:

感谢您的快速答复。我尝试做一个实现,在大多数情况下它都有效。我发现,当一个新节点加入集群时,现有节点会按预期获得拓扑更改事件,但新节点在第一次加入时似乎没有收到任何事件。这当然会给迁移到新加入者的潜在密钥带来问题。示例可以在带有标签 ***-2 的 repo github.com/radiosphere/infinispan-test 中看到。尝试使用./start-server 8081,使用./create-session 8081 a..d 添加一些会话,然后使用./start-server 8082 另一个问题是我猜一个人会有潜在的竞争条件。假设节点 A 将密钥迁移到 B。我们无法保证节点 B 不会比 A 更快地行动并在节点 B 上关闭之前启动其会话。这只是暂时的重叠,但仍然违规(如果我的假设是正确的)。 确实,当一个新节点加入时,你只能在缓存加入集群后添加一个监听器,此时它很可能已经是某些键的主要所有者。所以你必须在启动缓存后手动调用onTopologyChange(),只有“旧主段”是空集,“新主段”来自cache.getAdvancedCache().getDistributionManager().getCacheTopology() 但是你是对的,有一个竞争条件——事实上我只是在我的例子中添加了withFlags(CACHE_MODE_LOCAL),因为在测试期间,当我启动一个新服务器时,一些键在@987654335 期间移动了@,并且 Infinispan 尝试在新的主要所有者上执行消费者(这不起作用,因为消费者不可序列化)。 恐怕你不能保证旧的主节点在新的主节点启动之前就停止监听,你需要额外的同步(例如,当一个节点成为主节点时,它会发送一个集群执行器任务到旧主节点,并且仅在旧主节点确认它已停止或可能使用集群锁后才启动会话。

以上是关于Infinispan CacheEntryCreated 没有在重新平衡时触发 primaryOnly = true?的主要内容,如果未能解决你的问题,请参考以下文章

Infinispan - 没有删除缓存的选项吗?

使用 Infinispan 和 Wildfly 配置 Hibernate Search

Infinispan/hibernate 2 级缓存更新不是事务性的?

防止Infinispan集群化

Infinispan - ISPN000476:等待请求响应超时

infinispan~介绍