Java代码竞争条件多线程?

Posted

技术标签:

【中文标题】Java代码竞争条件多线程?【英文标题】:Java code race condition multi-threaded? 【发布时间】:2017-01-13 14:43:32 【问题描述】:

我有一些代码,我想知道在多线程环境中我是否会丢失数据...

示例代码如下:

public class TestingJavaThreading 
    private final Map<String, Set<String>> data = Maps.newConcurrentMap();
    private final HttpClient client;
    private final AsyncDataProvider provider;
    private final String baseUrl;

    // This method is called first...
    public void init(String code) 
        // We initialise the set to ensure it doesn't throw a null pointer exception or something weird...
        data.put(code, Sets.newConcurrentHashSet());

        // We tell the provider we're interested in data...
        provider.subscribeToDataFrom(code);

        // This HTTP call may take long time, and we can't afford losing data, that's why we subscribed beforehand in the previous line...
        List<String> elements = client.request(baseUrl + code);

        // We add all of the new elements, meanwhile some elements may have been added by "onMessageFromProvider"
        data.get(code).addAll(elements);

        data.get(code)
            .stream()
            .map( /* some transformations here, whatever... */)
            .forEach(e -> System.out.println(e));

        // Now we've printed the merged data from "onMessageFromProvider" + the HTTP call
        // We remove the element from the map, so now we only receive data from "onMessageFromProvider"
        data.remove(code); 
    

    public void onMessageFromProvider(String code, String element) 
        final Set<String> newSet = data.computeIfPresent(code, (k, v) -> 
          v.add(element);
          return v;
        );

        if (newSet == null) 
            // Do something else...
        
    

基本上,被调用的初始方法是init。步骤如下:

    初始化 CHM 以保证它包含数据 我们有一个提供程序可以实时向我们提供有关该元素的信息,但它不提供过去的数据。当数据来自提供者时,它调用方法“onMessageFromProvider” 为了获取项目之前的数据,我们需要进行单独的 HTTP 调用,然后将来自“onMessageFromProvider”的数据与 HTTP 调用的结果合并。完成后,我们可以完全依赖“onMessageFromProvider”所做的任何事情 获得 HTTP 调用的结果后,我们将其与来自“onMessageFromProvider”的数据合并,同时应用转换,并打印生成的合并集 现在我们删除了映射键,因此我们可以完全依赖“onMessageFromProvider”所做的任何事情

这是否会导致在第 (3) 步运行时可能丢失数据?我该如何解决?为了尽可能少地依赖synchronished,我应该在哪里放置更多代码?

所以恢复,我的目标是永远不会丢失数据,我想确保我的算法能 100% 保证这一点。

很抱歉这篇冗长的帖子,希望它有意义。

更新

根据输入,我正在使用真实示例更新代码,目前如下所示:

public class Main 

  public static void main(String[] args) throws InterruptedException 
    new Main().init("X");
  

  public void init(String code) throws InterruptedException 
    subscribeToDataFrom(code);

    CompletableFuture
        .supplyAsync(getDataFromHttpRequest());
  

  private Supplier<Set<String>> getDataFromHttpRequest() 
    return () -> 
      Set<String> resultsToReturn = Sets.newHashSet();
      try 
        resultsToReturn.add("B");
        resultsToReturn.add("C");
        resultsToReturn.add("D");
        resultsToReturn.add("E");
        resultsToReturn.add("F");
        Thread.sleep(1000); // Simulate it is a slow request...
       catch (Exception ex) 

      return resultsToReturn;
    ;
  

  private void subscribeToDataFrom(String code) 
    Runnable r = () -> 
      while (true) 
        onMessageFromProvider(code, UUID.randomUUID().toString());
      
    ;

    new Thread(r).start();
    new Thread(r).start();
    new Thread(r).start();
    new Thread(r).start();
    new Thread(r).start();
  

  public void onMessageFromProvider(String code, String element) 
    // Here how do I create the completable future for usage in the previous CompletableFuture????

    final Set<String> newSet = data.computeIfPresent(code, (k, v) -> 
      v.add(element);
      return v;
    );

    if (newSet == null) 
      System.out.println("Ok, now I can do something different with: " + element);
    
  

【问题讨论】:

【参考方案1】:

CompletableFuture 类,具有组合和执行不同任务的方法

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html

为非异步方法的依赖完成提供的操作可以由完成当前 CompletableFuture 的线程或完成方法的任何其他调用者执行。 所有没有显式 Executor 参数的异步方法都使用 ForkJoinPool.commonPool() 执行(除非它不支持至少两个并行级别,在这种情况下,将创建一个新线程来运行每个任务)。为了简化监控、调试和跟踪,所有生成的异步任务都是标记接口 CompletableFuture.AsynchronousCompletionTask 的实例。 所有 CompletionStage 方法都是独立于其他公共方法实现的,因此一个方法的行为不会受到子类中其他方法的覆盖的影响。

例如:

  CompletableFuture<String> httpTask = CompletableFuture.supplyAsync(
                                 () -> new HttpTask(httpClient));

  Set<String> result = httpTask.thenApply(elements -> //onMessageProvider method       
                   // maybe you can create a Callable class with the logic)

          .thenApply(mergedElements -> //remove code).get();
          //or try another method


  class HttpTask extends Callable<List<String>>

    private HttpClient client;

    public HttpTask(HttpClient client)
        this.client = client;
    

    @Override
    public List<String> call() throws Exception 
      return client.httpCall(...);
    
   

【讨论】:

嘿!看起来和听起来都很棒,但不确定如何在我的代码中使用它,以及它有什么帮助?你是说我应该创建一个private final CompletableFuture 并为它提供东西吗?但是我仍然需要data,并且需要对其进行同步? 您介意添加一个与我更相关的示例,以便我理解吗?顺便说一句,现在阅读更多关于CompletableFuture 感谢您的编辑,但我仍然不明白这将如何避免或帮助我的比赛条件(如果有的话)? 使用 completableFuture,forkjoin 负责管理任务,所以你不会有竞争条件 好的,我将重写我的示例并展示给您。如果你告诉我它有效,那么我会坚持下去:P

以上是关于Java代码竞争条件多线程?的主要内容,如果未能解决你的问题,请参考以下文章

Java多线程中的竞争条件锁以及同步的概念

java 如何编写多线程的代码

编写高效的Java代码:常用的优化技巧之并发编程技巧

编写高效的Java代码:常用的优化技巧之并发编程技巧

Java并发编程笔记 并发概览

Java多线程 synchronized 锁方法和块使用详解 锁竞争本质原理 只与锁对象有关与位置无关