ConcurrentHashMap:按条件删除

Posted

技术标签:

【中文标题】ConcurrentHashMap:按条件删除【英文标题】:ConcurrentHashMap: remove on condition 【发布时间】:2019-04-23 16:36:21 【问题描述】:

我有一个 ConcurrentHashMap 用作内存存储(或缓存,你可能会说)

我想要实现的是:同时检查一个项目是否“准备好”,如果是,则将其从地图中删除(+将其返回给调用者)。没有直接的方法可以让我做到这一点。

我想出的唯一解决方案是有一个ItemContainer,它将包含项目和元数据(isReady 字段)。在每次访问时,我都必须应用 mergecompute 操作。基本上在每次访问/检查时替换对象的容器。

问题:

    我的解决方案看起来合理吗? 有没有实现类似功能的优秀库?

我按要求添加了“样板”代码:

public class Main 

    public static void main(String[] args) 
        Storage storage = new Storage();
        storage.put("1", new Record("s", 100));
        storage.put("2", new Record("s", 4));
        storage.removeIf("1", Main::isReady);
    

    public static boolean isReady(Record record) 
        return record.i > 42;
    

    public static class Record 

        public Record(String s, Integer i) 
            this.s = s;
            this.i = i;
        

        String s;
        Integer i;
    

    public static class Storage 
        ConcurrentHashMap<String, Record> storage = new ConcurrentHashMap<>();

        public void put(String key, Record record) 
            storage.put(key, record);
        

        public Record removeIf(String key, Function<Record, Boolean> condition) 
            return null; // TODO: implement
        
    

其他解决方案(需要权衡):

    始终检查remove(),然后将merge() 返回地图。 使用具有一些合理的项目疏散策略(即 LRU)的缓存并仅检查已疏散的项目。

基于@ernest_k 解决方案:

public Record removeIf(String key, Predicate<Record> condition) 
    AtomicReference<Record> existing = new AtomicReference<>();

    this.storage.computeIfPresent(key, (k, v) -> 
        boolean conditionSatisfied = condition.test(v);

        if (conditionSatisfied) 
            existing.set(v);
            return null;
         else 
            existing.set(null);
            return v;
        
    );

    return existing.get();

【问题讨论】:

描述并没有给我们你当前代码的想法。请张贴代码。 @ernest_k,我真的相信描述足够详细,可以回答我的问题 + 我还没有真正的工作代码。您希望我详细说明某些事情吗? 我可以准备一些代码 “同时检查一个项目是否准备就绪” - 项目Map的键还是值? item 是否必须在它准备好 后尽快从Map 中删除?如果没有,您能否有一些任务在整个地图上进行同步、迭代并删除所有准备就绪items 也许你可以看看 jcache javadoc,见static.javadoc.io/javax.cache/cache-api/1.0.0/javax/cache/… - (你可以使用 registerCacheEntryListener 将监听器添加到缓存中),例如。 Ehcache 是一个 jcache 实现 【参考方案1】:

ConcurrentHashMap 已经为您提供了computeIfPresent 的原子性保证。

如果指定键的值存在,则尝试计算给定键及其当前映射值的新映射。整个方法调用以原子方式执行。在计算过程中,其他线程对该映射的某些尝试更新操作可能会被阻塞,因此计算应该简短而简单,并且不得尝试更新此映射的任何其他映射。

所以你可以使用它:

public Record removeIf(String key, Predicate<Record> condition) 

    AtomicReference<Record> existing = new AtomicReference<>();

    this.storage.computeIfPresent(key, (k, v) -> 
        existing.set(v);
        return condition.test(v) ? null : v;
    );

    return existing.get();

请注意,我使用了Predicate&lt;Record&gt;,因为它应该优于Function&lt;Record, Boolean&gt;

这里将当前值存储在AtomicReference 中的原因是为了确保返回的值与测试谓词所针对的值相同(否则可能会出现竞争条件)。

【讨论】:

null 在一种情况下返回:key 尚未映射到 storagecomputeIfAbsent 根本不会运行)。如果key已经在storage中,则无论是否满足条件都返回当前值(这部分很容易改变)。 @Delark 你说得对,每个线程都有自己的existing 对象。但这并不能使他们免受潜在的竞争条件的影响。如果existing.set(v)condition.test(v) 以非原子方式调用(@​​987654336@ 后跟put/replace),使用相同键运行的两个线程可能返回不一致的结果(因为第二个线程可能运行@ 987654339@ 在第一个更改了storage 中的映射之后)。 AtomicReference 是一种解决方法,仅用于修改 lambda 表达式中的局部变量,仅此而已。 你是对的,我的错误假设(我误读了你的句子)是因为潜在的竞争条件,你正在使用 AtomicReference 持有者,实际上你的论点是“(不阅读)当前映射的值在同一个块中”是潜在竞争条件的罪魁祸首,这是我完全同意的一个论点,尽管您使用 AtomicReference 是为了防止竞争条件,但实际上它比初始化通用数组更容易包装(甚至可能更便宜(?)...但我不确定)然后我们将结果分配给索引 0。 ... 我认为同样有效的是使用简单的通用持有者,它甚至不需要是 volatile 和同步的,因为读取是以原子方式执行的(在putIfPresent()) 强制延迟下一行 (return existing.get();) 直到事务完成...也许该值应该是 volatile 的,但写入不需要同步,因为它(持有者)受制于地图的内锁。有趣的是,Effective Java - item 79 明确... @Delark 感谢您的评论。我没有给AtomicReference 多想。我想有更简单或更优化的选择。如果您确定其中一个(例如单例数组或其他持有者),当然欢迎您编辑答案。【参考方案2】:

使用AtomicReference 的answer 本质上是对“为什么lambdas 中使用的变量需要有效地最终”问题的一种变通方法。对于这个问题,这是一个不错的解决方法,但是AtomicReference 会有一点开销。另一种选择是使用数组引用,尽管我知道这是不受欢迎的(但是,我不明白为什么在这种情况下它会产生意想不到的副作用?)

唉,这是另一个。我认为一个循规蹈矩的解决方案是这样的:

public class MyMapClass<K,V>         
    private final ConcurrentMap<K,V> storage = new ConcurrentHashMap<>();
    
    public V removeIf(K key, final Predicate<V> condition) 
        BiFunctionForConditionalRemove bf = new BiFunctionForConditionalRemove(condition);
        storage.computeIfPresent(key, bf);
        return bf.existingValue;
    
        
    private class BiFunctionForConditionalRemove implements BiFunction<K, V, V> 
        private V existingValue;
        private final Predicate<V> condition;

        public BiFunctionForConditionalRemove(Predicate<V> condition) 
            this.condition = condition;
        

        @Override
        public V apply(K existingKey, V existingValue) 
            this.existingValue = existingValue;
            return condition.test(existingValue) ? null : existingValue;
        
    

这肯定要冗长得多,但我认为这比每次调用都使用临时的AtomicReference 快一点。 (只是一个疯狂的说法,我没有测量它)。假设为每次调用使用一次性的 BiFunction 实例比为每次调用使用一次性的 AtomicReference 更快。事实上,与使用 Lambda 相比,我认为使用 BiFunction 的专用版本不会增加开销。 JVM 也一样。

旁注:我不喜欢使用 using-computeIfPresent-trick 进行条件删除的一件事是,我无法在 ConcurrentHashMap 的源代码中找到 JDK 开发人员已经预料到这种情况的证据,尽管它会起作用的。如果您的大多数用例没有达到条件(意味着没有删除),那么ConcurrentHashMap 仍然会在地图中执行(不必要的)替换操作。它似乎没有检查newVal == oldVal 是否会盲目地继续操作。这意味着即使在不需要的地方,您也会在地图上锁定争用。我希望我在这方面是错的。如果没有,我研究过的 JDK8 肯定还有改进的空间。

【讨论】:

以上是关于ConcurrentHashMap:按条件删除的主要内容,如果未能解决你的问题,请参考以下文章

Java集合---ConcurrentHashMap原理分析

R语言怎么按条件删除某些行?

Java从ConcurrentHashMap中删除特定项

按条件删除行[重复]

按where条件删除组中的记录?

python 按条件删除行