Java总结——通过Callable接口实现多线程,生产者消费者问题,多线下载(复制)文件

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java总结——通过Callable接口实现多线程,生产者消费者问题,多线下载(复制)文件相关的知识,希望对你有一定的参考价值。

一.通过Callable接口实现多线程

1.Callable接口介绍:

1java.util.concurrent.Callable是一个泛型接口,只有一个call()方法

2call()方法抛出异常Exception异常,且返回一个指定的泛型类对象

2.Callable接口实现多线程的应用场景

1)当父线程想要获取子线程的运行结果时

3.使用Callable接口实现多线程的步骤

1)第一步:创建Callable子类的实例化对象

2)第二步:创建FutureTask对象,并将Callable对象传入FutureTask的构造方法中

(注意:FutureTask实现了Runnable接口和Future接口)

 (3)第三步:实例化Thread对象,并在构造方法中传入FurureTask对象

 (4)第四步:启动线程

1(利用Callable接口实现线程):

利用Callable接口创建子线程类:

package call;

 

import java.util.concurrent.Callable;

/*

 * 实现Callable接口创建子线程,指明范型为返回的数据类型

 * */

public class CallDemo implements Callable<String> {

 

        @Override

        public String call() throws Exception {

            String th_name = Thread.currentThread().getName();

            System.out.println(th_name + "遭遇大规模敌军突袭...");

            System.out.println(th_name + "迅速变换阵型...");

            System.out.println(th_name + "极速攻杀敌军...");

            return "敌军损失惨重,我军大获全胜";

        }

 

}

实线程类:

package call;

 

import java.util.concurrent.ExecutionException;

import java.util.concurrent.FutureTask;

 

public class TestCallable {

 

    public static void main(String[] args) {

            CallDemo cl = new CallDemo();// 实例化Callable子类对象

            FutureTask<String> ft = new FutureTask<String>(cl);// 实例化FutureTask对象,并将Callable子类对象传入FutureTask的构造方法中

            new Thread(ft, "李存孝部队——>").start();// 启动线程

            Thread.currentThread().setName("李存勖部队——>");// 设置父线程名

    try {

            System.out.println(Thread.currentThread().getName() + "休整5000ms");

            Thread.sleep(5000);

    } catch (InterruptedException e) {

            e.printStackTrace();

    }

       System.out.println(Thread.currentThread().getName() + "休整完毕..");

    try {

            String str = ft.get();// 利用FutureTask对象调用get()方法获取子线程的返回值

            System.out.println(Thread.currentThread().getName() + "获取友军消息"

            + str);

    } catch (InterruptedException | ExecutionException e) {

            e.printStackTrace();

        }

    }

 

}

运行结果:

    李存勖部队——>休整5000ms

    李存孝部队——>遭遇大规模敌军突袭...

    李存孝部队——>迅速变换阵型...

    李存孝部队——>极速攻杀敌军...

    李存勖部队——>休整完毕..

    李存勖部队——>获取友军消息敌军损失惨重,我军大获全胜

2匿名类部类实现Callable接口创建子线程):

匿名类部类实现Callable接口创建子线程类并实现:

package call;

 

import java.util.concurrent.Callable;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.FutureTask;

//匿名类部类实现Callable接口创建子线程

public class AnonyCallable {

 

    public static void main(String[] args) {

    Callable<String> cl = new Callable<String>() {

     

    @Override

    public String call() throws Exception {

            System.out.println(Thread.currentThread().getName() + "正在行军~~~");

            System.out.println(Thread.currentThread().getName() + "遭遇敌军~~~");

            System.out.println(Thread.currentThread().getName() + "奋勇杀敌!!!!");

            return "战斗胜利,俘虏敌军50000人";

            }

 

};

        FutureTask<String> ft = new FutureTask(cl);

        new Thread(ft, "李存孝部队").start();

try {

        Thread.currentThread().setName("李存勖部队");

        Thread.sleep(3000);

        System.out.println(Thread.currentThread().getName() + "休整3000ms");

} catch (InterruptedException e) {

    e.printStackTrace();

}

    System.out.println(Thread.currentThread().getName() + "整顿完毕,等待友军消息...");

try {

            String str = ft.get();

            System.out.println("李存勖部队得知友军消息为:" + str);

} catch (InterruptedException | ExecutionException e) {

    e.printStackTrace();

    }

}

 

}

运行结果:

        李存孝部队正在行军~~~

        李存孝部队遭遇敌军~~~

        李存孝部队奋勇杀敌!!!!

        李存勖部队休整3000ms

        李存勖部队整顿完毕,等待友军消息...

        李存勖部队得知友军消息为:战斗胜利,俘虏敌军50000人

二.生产者——消费者问题

  1. 生产者线程不断生产,消费者线程不断取走生产者生产的产品

  2. Object中的几个方法支持:

    (1)wait():线程等待,当前线程进入调用对象的线程——等待池

    (2)Notify():唤醒一个等待线程

    (3)notifyAll():唤醒全部的等到线程

    注意:以上三个方法都必须在同步机制中调用

    3(生产者消费者问题(一对一)):

    早餐基础类:

    package one2one.producer;

    // 早餐基础类

    public class Breakfast {

       private String food;   // 吃的

       private String drink;  // 喝的

       private boolean flag=false;

       

       public synchronized void makeBreakfast(String food,String drink){

       if(flag){

       try {

       wait();   // 生产者线程进入同步对象维护的“线程等待池”

       } catch (InterruptedException e) {

       e.printStackTrace();

       }

       }

       this.food=food;

       try {

       Thread.sleep(1000);   // 休眠,但不释放“锁”

       } catch (InterruptedException e) {

       e.printStackTrace();

       }

       this.drink=drink;

       flag=true;

       notify();

       }

       

       public synchronized void eatBreakfast(){

       if(!flag){

       try {

       wait();   // 消费者线程进入同步对象维护的“线程等待池”,而且当前线程释放"锁"

       } catch (InterruptedException e) {

       e.printStackTrace();

       }

       }

       try {

       Thread.sleep(1000);

       } catch (InterruptedException e) {

       e.printStackTrace();

       }

       System.out.println(this.food+"=============>"+this.drink);

       flag=false;

       notify();

       }

    }

    生产者线程类:

    package one2one.producer;

    // 生产者线程

    public class Producer implements Runnable{

        private Breakfast bf;

        public Producer(Breakfast bf){

         this.bf=bf;

        }

    @Override

    public void run() {

            for (int i = 1; i <=7; i++) {

    if(i%2==0){

    this.bf.makeBreakfast("bread","milk");

    }else{

    this.bf.makeBreakfast("馒头","稀饭");

    }

    }  

    }

     

    }

    消费者线程类:

    package one2one.producer;

    // 消费者线程

    public class Consumer implements Runnable{

        private Breakfast bf;

        public Consumer(Breakfast bf){

         this.bf=bf;

        }

    @Override

    public void run() {

    for (int i = 1; i <=7; i++) {

    System.out.println("星期"+i+"早餐种类:food======>drink");

    this.bf.eatBreakfast();

    }

    }

     

    }

    测试类:

    package one2one.producer;

     

    public class Test {

     

    public static void main(String[] args) {

    Breakfast bf=new Breakfast();

    new Thread(new Producer(bf)).start();   // 启动生产者线程

    new Thread(new Consumer(bf)).start();   // 启动消费者线程

     

    }

     

    }

    运行结果:

    星期1早餐种类:food======>drink

    馒头=============>稀饭

    星期2早餐种类:food======>drink

    bread=============>milk

    星期3早餐种类:food======>drink

    馒头=============>稀饭

    星期4早餐种类:food======>drink

    bread=============>milk

    星期5早餐种类:food======>drink

    馒头=============>稀饭

    星期6早餐种类:food======>drink

    bread=============>milk

    星期7早餐种类:food======>drink

    馒头=============>稀饭

  3. (生产者消费者问题many2many

    生产消费基础类:

    package manytomany.product;

     

    public class Product {

    private int count = 0;// 商品数量

    private int MAX = 10;// 最大库存

     

    // 生产商品

    public synchronized void makeProduct() {

    String thread_name = Thread.currentThread().getName();// 获取生产者线程名

    if (count > MAX) {

    System.out.println("货物已满" + thread_name + "停止生产...");

    try {

    notifyAll(); // 唤醒所有的消费者线程

    wait(); // 生产者线程停止生产

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    } else {

    try {

    Thread.sleep(1000);

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    count++; // 生产者线程生产商品

    System.out.println(thread_name + "生产了产品,目前商品总量:" + count);

    notifyAll();// 唤醒所有消费者线程,模拟消费

    }

    }

     

    // 消费商品

    public synchronized void buyProduct() {

    String thread_name = Thread.currentThread().getName();// 获取线程名

    if (count <= 0) {

    System.out.println("已无货," + thread_name + "停住消费...");

    try {

    notifyAll();// 唤醒生产者线程 生产商品

    wait();// 消费者线程休眠,停止消费

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    } else {

    try {

    Thread.sleep(1000);

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    count--;// 开始 消费商品

    System.out.println(thread_name + "消费了一个商品,目前商品数量为:" + count);

    }

    }

     

    }

     

    生产者线程:

    package manytomany.product;

     

    public class Producer implements Runnable {

    private Product product; // 获取Product对象

     

    public Producer(Product product) {

    this.product = product;

    }

     

    @Override

    public void run() {

    while (true) {

    product.makeProduct();// 调用生产方法

    }

    }

     

    }

    消费者线程:

    package manytomany.product;

     

    public class Consumer implements Runnable{

    private Product product;

     

    public Consumer(Product product) {

    this.product = product;

    }

     

    @Override

    public void run() {

    while(true){

    product.buyProduct();//调用消费方法

    }

     

    }

     

     

    }

    测试类:

    package manytomany.product;

     

    public class Test {

     

    public static void main(String[] args) {

    Product pro = new Product();

    new Thread(new Producer(pro), "生产者1号——>").start();

    new Thread(new Producer(pro), "生产者2号——>").start();

    new Thread(new Producer(pro), "生产者3号——>").start();

    new Thread(new Producer(pro), "生产者4号——>").start();

    new Thread(new Consumer(pro), "消费者A——>").start();

    new Thread(new Consumer(pro), "消费者B——>").start();

    new Thread(new Consumer(pro), "消费者C——>").start();

    new Thread(new Consumer(pro), "消费者D——>").start();

    new Thread(new Consumer(pro), "消费者E——>").start();

     

    }

     

    }

    运行结果(截取部分):

    消费者E——>消费了一个商品,目前商品数量为:7

    消费者E——>消费了一个商品,目前商品数量为:6

    消费者E——>消费了一个商品,目前商品数量为:5

    消费者A——>消费了一个商品,目前商品数量为:4

    消费者A——>消费了一个商品,目前商品数量为:3

    消费者A——>消费了一个商品,目前商品数量为:2

    消费者A——>消费了一个商品,目前商品数量为:1

    消费者A——>消费了一个商品,目前商品数量为:0

    已无货,消费者A——>停住消费...

    已无货,消费者C——>停住消费...

    已无货,消费者D——>停住消费...

    已无货,消费者B——>停住消费...

    生产者1号——>生产了产品,目前商品总量:1

    生产者1号——>生产了产品,目前商品总量:2

    生产者1号——>生产了产品,目前商品总量:3

    生产者1号——>生产了产品,目前商品总量:4

    生产者1号——>生产了产品,目前商品总量:5

     

     

     

  4. 多线程下载(复制)文件

    1.使用RandomAccessFileInputStreamskip(long n)方法使每个线程负责文件的每一部分读写。

    (开启6个线程断点下载(复制)电影).

    下载复制线程:

    package download;

     

    import java.io.*;

     

    public class DownloadRunnable implements Runnable{

     

    private File srcFile;   // 源文件路径

    private long startPos;   // 每个线程的开始下载位置

    private long partTask;   // 每个线程的下载任务

    private RandomAccessFile raf;  // 用来写入

    public DownloadRunnable(File srcFile,long startPos,long partTask,RandomAccessFile raf){

    this.srcFile=srcFile;

    this.startPos=startPos;

    this.partTask=partTask;

    this.raf=raf;

    }

     

    @Override

    public void run() {

    System.out.println(Thread.currentThread().getName()+"准备从第"+startPos+"个字节开始读...");

    InputStream input=null;

    try {

    input=new FileInputStream(srcFile);

    input.skip(startPos);  // 跳过输入流的startPos个字节

    byte[] b=new byte[1024*1024*10];

    int len=0;

    int count=0;   // 用来记录已经读写的字节数

    while((len=input.read(b))!=-1 && count<partTask){

    raf.write(b, 0, len);

    count+=len;

    }

    System.out.println(Thread.currentThread().getName()+"已经写入了"+count+"个字节");

    } catch (Exception e) {

    e.printStackTrace();

    }finally{

    try {

    input.close();

    raf.close();

    } catch (IOException e) {

    e.printStackTrace();

    }

    }

    }

     

    }

    开启线程类:

    package dowload;

     

    import java.io.*;

     

    public class TestDown {

     

    public static void main(String[] args) {

    File srcFile = new File("e:" + File.separator + "哈利波特" + File.separator

    + "哈利波特与死亡圣器上.mkv");

    long pathTask = srcFile.length() / 6;

    for (int i = 0; i < 6; i++) {

    RandomAccessFile raf = null;

    try {

    raf = new RandomAccessFile("d:" + File.separator + "Movies.mkv", "rw");

    long startPos = pathTask * i;

    raf.seek(startPos);

    new Thread(

    new DownloadRunnable(srcFile, startPos, pathTask, raf),

    "第" + (i + 1) + "条下载线程——>").start();

    } catch (Exception e) {

    e.printStackTrace();

    }

    }

    }

     

    }

     

    【本次总结完毕】

     

     

     

     

     

     

     

     

     

     

     

     

     

     

 

以上是关于Java总结——通过Callable接口实现多线程,生产者消费者问题,多线下载(复制)文件的主要内容,如果未能解决你的问题,请参考以下文章

Callable,阻塞队列,线程池问题

Java多线程之Callable接口的实现

java 多线程-实现Callable接口

java多线程 -- 创建线程的第三者方式 实现Callable接口

JAVA多线程实现和应用总结

Java多线程之Callable接口的实现