高并发多线程安全之信号量线程组守护线程线程栅栏等的分析

Posted 踩踩踩从踩

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了高并发多线程安全之信号量线程组守护线程线程栅栏等的分析相关的知识,希望对你有一定的参考价值。

高并发多线程安全之原子性问题、CAS机制及问题解决方案

多线程编程 之java内存模型(JMM)与可见性问题

前言

前两篇文章主要分析的是 线程安全中java内存模型;以及常见的线程安全三大问题:原子性、可见性、有序性问题

这篇文章主要做信号量、线程组、守护线程、线程栅栏等的分析,如何应用在实际开发过程中。

线程组

ThreadGroup的提出是为了方便线程的管理,通过它可以批量设定一组线程的属性,比如
setDaemon,设置未处理异常的处理方法,设置统一的安全策略等等;也可以通过线程组方便
的获得线程的一些信息, 尽量不要使用,会带来线程安全问题
线程组的好处:在于多个线程在一个组时,方便批量的操作,处理异常,设置守护线程。
主要是为了建立关系的,在开发过程中用到的很少,基本也不使用 

启动线程的几种方式

启动线程最终只有一种方式  :new thread().start();

Runnable只是Thread要执行的逻辑
无论是runnable.run 等  都是 thread进行创建的Callable<T>
FutureTask也是Thread要执行的逻辑,只是封装了获取结果的功能

Runnable

class Task implements Runnable{
    @Override
    public void run() {
        System.out.println("run 方法开始执行。。。");
        LockSupport.parkNanos(1000* 1000 * 1000 * 5L);
    }
}

 public static void main(String args[]){
        Runnable cmd = new Task();
        cmd.run();
        System.out.println("5s 后才输出消息");
    }
当前线程是阻塞住的,一定要thread开启新的线程

Callable方式

默认jdk中thread只会接受runnable作为任务,而callable怎么传进去,要把他封装程futuretask传进去
class CallTask implements Callable<String> {

    @Override
    public String call() throws Exception {
        LockSupport.parkNanos(1000 * 1000 *1000 * 5L);
        return "Kody";
    }
}

	CallTask cTask = new CallTask();
		FutureTask<String> ft = new FutureTask<String>(cTask);

		new Thread(ft).start();

		try {
			System.out.println("进入get方法,阻塞等待结果。。。");
			String result = ft.get();

			System.out.println(result);
		} catch (Exception e) {
			e.printStackTrace();
		}

deamon线程

定义

是指在程序运行的时候在后台提供一种通用服务的线程, 进程结束时,会杀死所有守护线程。


用户线程

android中的service后台服务,但android不一样是,内存不足会根据优先级杀死service后台服务
非守护线程就是用户线程线。

结束

没有非守护线程还在运行时,进程结束

注意

  • thread.setDaemon(true)必须在thread.start()之前设置,否则会跑出一个IllegalThreadStateException异常。你不能把正在运行的常规线程设置为守护线程。
  • 在Daemon线程中产生的新线程也是Daemon的。
  • 守护线程应该永远不去访问固有资源,如文件、数据库,因为它会在任何时候甚至
    在一个操作的中间发生中断。

CountDownLatch

countDownLatch是在java1.5被引入,认为是倒计时开关;

概念

CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为0时,表示所有的线程都已经完成一些任务,然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。

应用场景

一般经典场景例如
  • 多线程去爬取网页地区酒店数据,有一千三百多个地区,则需要同时等待这些地区任务在多线程下爬取成功,才能结束任务。
  • 启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
  static AtomicLong num = new AtomicLong(0);

    public static void main(String args[]) throws InterruptedException {

        CountDownLatch ctl = new CountDownLatch(11);

        for (int i=0; i< 10; i++){
            new Thread(){
                @Override
                public void run() {
                    for (int j=0; j< 10000000; j++){
                        num.getAndIncrement();
                    }
                    ctl.countDown();
                }
            }.start();
        }

        ctl.await();     //设置开关,设置门栓
        System.out.println(num.get());
    }

不足点

CountDownLatch是一次性的,计算器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用。

FutureTask

FutureTask做了什么操作得到返回值,可以写个小demo去理解一下;

首先FutureTask是可以直接放到thread中,那肯定是实现了ruannble接口的

package org.cao.learn;

import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.LockSupport;

public class MyFutureTask<T> implements Runnable {

	private volatile boolean isEnding = false;

	private T result;
	private Callable<T> callble;

	LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue();

	public MyFutureTask(Callable<T> callble) {
		super();
		this.callble = callble;
	}

	@Override
	public void run() {
		try {
			result = callble.call();
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally {
			isEnding = true;
		}

		while (true) {
			Thread waiter = waiters.poll();
			if (waiter == null) {
				break;
			}
			LockSupport.unpark(waiter);
		}

	}

	public T get() {
		if (!isEnding) {
			waiters.offer(Thread.currentThread());
		}

		while (!isEnding) {
			LockSupport.park();
		}
		return result;
	}

}

这是自己实现的一个futuretask,和jdk提供的futuretask还是很大区别的,jdk提供的futuretask

维护了下面的状态,

 private volatile int state;

private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

只会运行一次,futuretask运行的时候,维护的这个状态

        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;

并将线程封装为WaitNode 对象 操作

  private volatile WaitNode waiters;

Semaphore

Semaphore是一个计数信号量,常用于限制可以访问某些资源(物理或逻辑的)线程数目。是一种用来控制并发量的共享锁。

应用场景

常用作那些资源有明确访问数量限制的场景,常用于限流 。

数据库连接池,同时进行连接的线程有数量限制,连接不能超过一定的数量

代码实例

package org.cao.learn;

import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.LockSupport;

public class MySemaphore {
	public static void main(String[] args) {
		Semaphore semaphore = new Semaphore(6);
		for (int i = 0; i < 1000; i++) {
			new Thread() {
				@Override
				public void run() {

					try {
						semaphore.acquire();
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}

					LockSupport.parkNanos(10000 * 1000 * 1000);
					System.out.println("query.....");

					semaphore.release();
				};
			}.start();
		}
	}
}

由此得到的结果是,并发控制,只允许6个线程同时操作数据。

CyclicBarrier

循环栅栏,可以循环利用的屏障。 
例如:在进行某个活动前需要等待人全部都齐了才开始。

代码例子

 for (int i=0; i< 100; i++){
            new Thread(){
                @Override
                public void run() {
                    try {
                        //LockSupport.parkNanos(1000 * 1000 * 1000 * 10L);
                        barrier.await();    //
                        System.out.println("上到摩天轮...");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            }.start();
            LockSupport.parkNanos(1000 * 1000 * 1000L);
        }

以上是关于高并发多线程安全之信号量线程组守护线程线程栅栏等的分析的主要内容,如果未能解决你的问题,请参考以下文章

Thread多线程速查手册

并发编程之多线程

Java——多线程高并发系列之JUC三大辅助类(CountDownLatchCyclicBarrierSemaphore)

Java——多线程高并发系列之JUC三大辅助类(CountDownLatchCyclicBarrierSemaphore)

并发编程目录

Python并发编程之多线程