线程同步

Posted myarticles

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程同步相关的知识,希望对你有一定的参考价值。

join()

线程加塞,它的作用是能够阻塞当前线程,等待执行了join()方法的线程执行完毕,再继续执行当前线程。

public class JoinDemo {
    public void a(Thread joinThread) {
        System.out.println("a线程执行。。。");
        joinThread.start();
        try {
            joinThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("a线程执行完毕。。。");
    }

    public void b() {
        System.out.println("加塞线程执行。。。");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("加塞线程执行完毕。。。");
    }

    public static void main(String[] args) {
        JoinDemo joinDemo = new JoinDemo();
        Thread joinThread = new Thread(joinDemo::b);
        new Thread(() -> joinDemo.a(joinThread)).start();
    }
}

join()方法源码

// 假设在主线程中执行了t.join()方法(t是一个线程对象)
// 1. 需要弄明白一件事:谁拿了这个锁? 主线程拿了这个锁
// 2. 这个锁是谁的? 是t线程的(t是一个对象,t对象的锁)
// 3. 在这个方法中,当前的对象是t
public final synchronized void join(long millis) throws InterruptedException {
        long base = System.currentTimeMillis();   // 获取当前的时间
        long now = 0;

        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }

        if (millis == 0) {
            while (isAlive()) {   // 相当于this.isAlive(),它检测的是当前对象t线程是否处于活动状态
                wait(0);   // 执行wait(0),这个wait()是让拿到锁的线程等待,也就是主线程
            }
        } else {
            while (isAlive()) {  // 同样检测t线程是否活动
                long delay = millis - now;   // 延时多久
                if (delay <= 0) {
                    break;
                }
                wait(delay);  // 等待延时的秒数
                now = System.currentTimeMillis() - base;
            }
        }
    }

java并发工具类

本节主要讲的是juc包下的一些工具类

CountDownLatch

CountDownLatch是用于使一个线程等待其他线程各自执行完毕后再执行。

使用实例:

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(10);   // 设置一个长度为10的计数器

        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    Thread.sleep((long) Math.random() * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread() + " 执行完毕");
                countDownLatch.countDown();    // 每执行完一个线程 计数器减一
            }).start();
        }
        countDownLatch.await();   // 在这里阻塞等待其他线程执行完毕
        System.out.println("主线程执行完毕");
    }
}

CyclicBarrier

这个类的作用是让一组线程互相等待,直到达到某个公共的点,才能继续往下执行。

import java.util.Random;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        // 第二个参数是一个Runnable接口,用于所有线程到达后执行,交给最后一个到达的线程执行
        CyclicBarrier cyclicBarrier = new CyclicBarrier(10, () -> System.out.println("好,人到齐了,现在开始开会"));

        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    Thread.sleep(new Random().nextInt(4000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println(Thread.currentThread() + "到达了。。。");
                // 在这里执行cyclicBarrier的等待,如果这里不能执行,程序就会一直卡住
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

Semaphore

Semaphore实现的东西很简单,就是控制线程的并发数量,也就是同一时间只允许指定个数的线程运行

import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(10);  // 设置同一时间只允许10个线程执行
        while (true) {
            new Thread(() -> {
                // 拿到许可
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 执行代码
                System.out.println(Thread.currentThread() + " 执行了。。。");
                // 释放许可
                semaphore.release();
            }).start();
        }
    }
}

Exchanger

用于两个线程之间交换数据, 只能是两个线程,他不支持更多的线程之间互换数据。

import java.util.concurrent.Exchanger;

public class ExchangerDemo {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();

        new Thread(() -> {
            System.out.println("a开始抓取数据。。。");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String res = "123456";
            // 将数据交换
            try {
                // 将自己的数据交换出去,并且拿到另一个线程交换过来的数据
                String value = exchanger.exchange(res);
                System.out.println("a比对结果:" + value.equals(res));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            System.out.println("b开始抓取数据。。。");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String res = "12345";
            // 将数据交换
            try {
                String value = exchanger.exchange(res);
                System.out.println("b比对结果:" + value.equals(res));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

在上面的代码中,第一个线程采集数据用了1秒,第二个线程采集数据用了2秒,那么按理说,在执行exchange()方法交换数据的时候只有第二个线程才能拿到数据,因为第一个线程执行exchange()方法的时候第二个线程还在采集数据,理应拿不到数据。可是结果是它们都拿到了数据,那么说明在执行exchange()方法后会进行等待,一直到两个线程都到达同一个点了才会交换数据。

以上是关于线程同步的主要内容,如果未能解决你的问题,请参考以下文章

起底多线程同步锁(iOS)

多线程编程

第十次总结 线程的异步和同步

详解C++多线程

进程线程同步异步

配置 kafka 同步刷盘