线程池实现文件复制操作

Posted 曦系

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池实现文件复制操作相关的知识,希望对你有一定的参考价值。

项目:使用线程池实现大目录拷贝

       创建一个线程的综合资源,让需要执行的任务挂载在线程池中。如果线程池有空闲的线程,就可以安排线程去执行任务,如果线程池没有空闲的线程,就安排任务等待,直到有线程空闲出来。

 

线程池步骤:

  1. 初始化线程池资源
  2. 向线程池中加入线程
  3. 向线程池中添加任务               ---如果有空闲的线程,空闲线程可以自动获取任务并运行
  4. 销毁线程池。

源码分享:thread_pool.c

/*
    设计思路:
        1、预处理板块:构建任务结点结构体、线程池结构体,文件路径结构体
        2、void handler(void *arg)	//解除死锁,防止死锁
        3、void *routine(void *arg)//提取调用任务链表的任务结点
        4、bool pool_init(thread_pool *pool, unsigned int threads_number)//初始化线程池
        5、int add_thread(thread_pool *pool, unsigned additional_threads)//添加线程
        6、bool add_task(thread_pool *pool, void *(*task)(void *arg) , void *arg)  //添加任务
        7、void *copyfile(void * arg)  //复制文件函数
        8、int copydir( file_path *dofile,thread_pool *pool) //复制目录函数
        9、bool destroy_pool(thread_pool *pool)	 //摧毁线程池

        main()
        {
            if-判断输入命令是否规范规范
            初始化线程池
            创建对象文件,存储源文件和复制文件路径
            判断文件属性,执行拷贝操作    
            销毁线程池

        }




*/




#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <strings.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <dirent.h>
#include <stdbool.h>
#include <string.h>
#include <errno.h>

//********************预处理板块**************************
#define MAX_WAITING_TASKS	1000  //等待任务最大数目
#define MAX_ACTIVE_THREADS	200	  //最大线程数

//任务结构体
typedef struct task
{
    void *(*task)(void *arg);//定义任务函数指针以及参数
	void *arg;			 	 //函数参数(指针传参)
	struct task *next;       //结点内当然有指向下一结点的指针
    /*
         任务链表的结点内包含三个元素:
            1、函数指针,指向任务函数(属于data)
            2、函数所对应的参数,这里用指针传参
            3、结点内指向下一结点的指针 next
     */
}task;

typedef struct thread_pool
{
    /* data */
    pthread_mutex_t lock;		//互斥锁
	pthread_cond_t  cond;		//条件变量  跟互斥锁是搭配使用
	
    task *task_list;		    //一个任务节点
	pthread_t *tids;			//线程号指针变量
 
	unsigned waiting_tasks;		//等待任务
	unsigned active_threads;	//执行线程
	
	bool shutdown;				//一个线程池销毁开关
}thread_pool;

typedef struct file_path
{
    /* 用于存储文件路径的结构体 */
    char source_file[4096];  //字符数组,用于存储源文件路径
    char copy_file[4096];    //字符数组,用于存放复制后的文件路径
}file_path;


//********************预处理板块**************************


//解除死锁,防止死锁     handler函数会被routine调用
void handler(void *arg)	
{
    pthread_mutex_unlock((pthread_mutex_t *)arg);
}

//提取调用任务链表的任务结点
void *routine(void *arg)    //这里的arg是task结构体的一项参数
{
    thread_pool *pool = (thread_pool *)arg;    //
    task *p;      //定义一个任务结点

    while (1)
    {
        /* code */
        pthread_cleanup_push(handler, (void *)&pool->lock);
        //调用hand函数,防止死锁

        //做一个判断,当没有任务,并且pool没有销毁时,执行休眠操作
        if (pool->waiting_tasks == 0 && !pool->shutdown)
        {
            pthread_cond_wait(&pool->cond, &pool->lock); //休眠
        }

        //判断是否需要要关闭线程
        if(pool->waiting_tasks == 0 && pool->shutdown == true)
		{
			pthread_mutex_unlock(&pool->lock);	//先执行解锁操作
			pthread_exit(NULL);	//退出线程
		}
         
		p = pool->task_list->next;			//从线程池中任务链表取出(复制)一个任务结点给P
		pool->task_list->next = p->next;	//然后被取出的任务被其他任务覆盖
		pool->waiting_tasks--;				//线程池中等待的任务减少一项

        pthread_mutex_unlock(&pool->lock);	//一次任务取出成功,解除互斥锁资源,方便下一次调用任务结点使用
		pthread_cleanup_pop(0);             //
 
//注意!

		pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); //强制性的阻塞 任何进程都不能取消,我想老老实实把这个线程完成,其他的都先排队
		(p->task)(p->arg);		
					
									//他运行的是add_task( pool, copfile, tmpfile);	
									//传过来的参数 
									//p->task 等于copyfile
									//p->arg 等于  tmpfile
		
		pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);//关闭强制阻塞
		
		free(p->arg);							//释放在检索目录时 在内存开辟的空间
		free(p);								//释放掉完成任务的节点
	}
 
	pthread_exit(NULL);

 //注意!没有这一段,copyfile无法执行


}

//初始化线程池
bool pool_init(thread_pool *pool, unsigned int threads_number)
{
/*  线程池结构体元素
    pthread_mutex_t lock;		//互斥锁
	pthread_cond_t  cond;		//条件变量  跟互斥锁是搭配使用
	
    task *task_list;		    //一个任务节点
	pthread_t *tids;			//线程号指针变量
 
	unsigned waiting_tasks;		//等待任务   任务数量
	unsigned active_threads;	//执行线程   线程数量
	
	bool shutdown;				//一个线程池销毁开关
*/

    pthread_mutex_init(&pool->lock,NULL);   //线程池互斥锁初始化,这是个初始化函数
    pthread_cond_init(&pool->cond,NULL);   //线程池条件变量初始化,这是个初始化函数


//先分配内存
    pool->shutdown = false;  //线程池销毁为假,即不销毁(刚申请一个线程池,初始化肯定不能销毁呀!)
    pool->task_list = malloc(sizeof(task));  //线程池中的任务链表初始化分配内存空间
    pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);//给进程分配空间

    if (pool->task_list == NULL || pool->tids == NULL)
    {
        perror("任务链表 或 线程分配 内存失败!");
        return false;
    }

//在把数量清零
    pool->task_list->next = NULL; //只有一个节点,还未连城任务链表
    pool->waiting_tasks = 0;   //暂时没有等待中的任务,要等后面添加任务结点
    pool->active_threads = threads_number;   //thread_number是参数,是传入的线程数量,赋值过来

    int i;
    for (i = 0; i < pool->active_threads; i++)
    {
        if(pthread_create(&((pool->tids)[i]), NULL,routine, (void *)pool) != 0)  
		//返回值为0表示创建成功,返回不为0表示创建失败
		{
			perror("线程创建失败");//根据传入的线程数,逐一创建线程,存入tid[]
			return false;
		}
    }
    return true;
}




//添加线程
int add_thread(thread_pool *pool, unsigned additional_threads)		
{
    if(additional_threads == 0)     //传入添加的线程数
    {
	    return 0;         //没有添加线程,啥都干不了,只能退出
    }
	unsigned total_threads = pool->active_threads + additional_threads;
    //计算先从总数 = 线程池中的线程 + 新添加的线程

	int i, actual_increment = 0;
	for(i = pool->active_threads;i < total_threads && i < MAX_ACTIVE_THREADS;i++)
	{
		if(pthread_create(&((pool->tids)[i]),NULL, routine, (void *)pool) != 0)
		{
			perror("添加线程失败!");
			if(actual_increment == 0)
            {
				return -1;
            }
			break;  //创建失败的话,直接break,不再执行后面的++
		}//创建失败

		actual_increment++; //实际上添加成功的线程数
	}
 
	pool->active_threads += actual_increment;
	return actual_increment;

}


//添加任务
bool add_task(thread_pool *pool, void *(*task)(void *arg) , void *arg)
{
	
    // pool->task_list = malloc(sizeof(task));  //之前初始化的是线程池中的链表结点内存
	struct task *new_task = malloc(sizeof(task));  // 任务结点new_task分配内存

	if(new_task == NULL)
	{
		perror("任务结点分配内存失败!");
		return false;
	}

    //初始化结点参数
	new_task->task = task;   
	new_task->arg = arg;
	new_task->next = NULL;
	
	pthread_mutex_lock(&pool->lock);   //上锁
	
	if(pool->waiting_tasks >= MAX_WAITING_TASKS)  //判断任务是否超出上限,必要的操作
	{
		pthread_mutex_unlock(&pool->lock);
 
		fprintf(stderr, "任务过多!\\n");
		free(new_task);  //释放任务空间
		return false;
	}
	
	struct task *tmp = pool->task_list;
	while(tmp->next != NULL)  //从任务结点取到不为空,就是取到任务
    {
        tmp = tmp->next;  //把这个任务取出来,拿到tmp上
    }

	tmp->next = new_task;
	pool->waiting_tasks++;
 
 
	pthread_mutex_unlock(&pool->lock);   //释放锁资源
	pthread_cond_signal(&pool->cond);	//唤醒一个休眠的线程

	return true;
}


//复制文件函数
void *copyfile(void * arg)
{

/*
    把要打开的文件路径放入source_file变量下(路径)
    打开源文件--------可读   判断打开是否成功
    创建目标文件,即复制文件    判断打开是否结束
    利用read函数和write函数,实现文件读写
    关闭打开的两个文件
*/

	file_path *dofile = (file_path *)arg; //arg是文件参数
	struct stat file_stat; //这个结构体来自#include <sys/stat.h>	

	stat(dofile->source_file, &file_stat);
    //通过文件名 获取文件的路径把他存放到结构体的sourc_file里面
	int source_fd,copy_fd;					
	source_fd = open(dofile->source_file,O_RDONLY);//打开源路径,只读模式

	if(source_fd == -1 )
	{
		printf("打开文件 %s\\n 失败!\\n",dofile->source_file);
        //打开文件失败
		return NULL;
	}
	
	copy_fd = open(dofile->copy_file,O_CREAT | O_TRUNC | O_RDWR,file_stat.st_mode);
	
	if( copy_fd == -1)
	{
		printf("打开文件 %s 失败!\\n",dofile->copy_file);
		return NULL;
	}
 
	int nread;
	char buf[100];
	while((nread = read(source_fd,buf,100)) > 0)  //读取源文件的内容
	{
		if( write(copy_fd,buf,nread) == -1)	  //把读到的全部写进目标文件
		{
			break;
		}
	}
	printf("读取文件: %s\\n",dofile->source_file);
	printf("写入文件: %s\\n",dofile->copy_file);

	close(source_fd);  //关闭源文件
	close(copy_fd);    //关闭复制文件
	return NULL;

}

//复制目录函数
int copydir( file_path *dofile,thread_pool *pool)
{
	struct stat file_stat;				
	stat(dofile->source_file,&file_stat); //获取文件的属性
	mkdir(dofile->copy_file,file_stat.st_mode); //以源目录的类型和目录来创建一个目录
 
	DIR *sourcedir = opendir(dofile->source_file); //打开源目录
	struct dirent *dp;
	
	
	
	while( (dp = readdir(sourcedir))!=NULL )    //获取文件夹内文件的信息
	{
		
	
		if(dp->d_name[0] == \'.\') //如果文件为. 或者 .. 则跳过
		{						 
			continue;
		}
		//对本目录下的所有文件进行拷贝操作
		file_path *tmpfile = malloc(sizeof(file_path)); //为文件结构体开辟内存空间
		
		memset(tmpfile,0,sizeof(file_path));				//对内存清零 
		
		sprintf(tmpfile->source_file,"%s/%s",dofile->source_file,dp->d_name);//拼凑源文件路径
		sprintf(tmpfile->copy_file,"%s/%s",dofile->copy_file,dp->d_name);//拼凑目标文件路径
 
		struct stat tmpstat;
		stat(tmpfile->source_file,&tmpstat);	     			
 
		if(S_ISREG(tmpstat.st_mode))						//如果为普通文件,则拷贝
		{
			printf("tmpfile->source_file = %s\\n",tmpfile->source_file);
			printf("tmpfile->copy_file = %s\\n", tmpfile->copy_file);
			printf("\\n");
			add_task( pool, copyfile, tmpfile);			//把复制的任务丢到任务链表
		}
		else if(S_ISDIR(tmpstat.st_mode))//如果为目录,则递归
		{
			copydir(tmpfile,pool);
		}
 
	}
	return 0;
}


//摧毁线程池
bool destroy_pool(thread_pool *pool)	
{
	pool->shutdown = true;
	pthread_cond_broadcast(&pool->cond);
	
	sleep(1);//等待空闲线程响应完毕

	int i;
	for(i=0; i<pool->active_threads; i++)
	{
		errno = pthread_join(pool->tids[i], NULL);
		if(errno != 0)
		{
			printf("copy tids[%d] error: %s\\n",i, strerror(errno));
		}
		else
			printf("[%u] is copyed\\n", (unsigned)pool->tids[i]);
		
	}
 
	free(pool->task_list);
	free(pool->tids);
	free(pool);
 
	return true;
}











//函数1:初始化线程池   pool_init(pool,100);


int main(int argc, char const *argv[])
{

    //第一步:   if-判断输入命令是否规范规范
    if(argc != 3)
    {
        printf("你的命令格式错误!\\n");
        printf("请按右侧格式run: ./%s 源文件路径  生成文件路径 \\n",argv[0]);
        return -1; //命令异常退出
    }


    //第二步:   初始化线程池
    thread_pool *pool = malloc(sizeof(thread_pool));//定义线程池的poor指针,初始化内存空间
    /*调用线程初始化函数------pool_init()*/
    pool_init(pool,100);          //调用函数初始化线程,分配内存(初始化线程函数会调用routine)


    //第三步:   创建对象文件,存储源文件和复制文件路径
    file_path dofile;     //创建一个路径实例dofile,存储源文件和复制文件的路径
    strcpy(dofile.source_file,argv[1]);   //strcpy是函数   #include <string.h>
    strcpy(dofile.copy_file,argv[2]);    // char *strcpy(char *dest, const char *src);

    struct stat copestat;//这个结构体来自#include <sys/stat.h>
    stat(dofile.source_file,&copestat);  //stat是函数,将dofile.source_file的信息,获取到&copestat
    /*
        #include<sys/stat.h>
        int stat(const char *restrict pathname,struct stat *restrict buf);
        int fstat(int fields,struct stat *buf);
        int lstat(const char *restrict pathname,struct stat *restrict buf);
    */


    //第四步:   判断文件属性,执行拷贝操作    
    if(S_ISREG(copestat.st_mode))//如果为普通文件,则拷贝
	{
		copyfile(&dofile);  //直接拷贝,不用添加到任务链表
		
	}
	else if(S_ISDIR(copestat.st_mode))//如果为目录,则递归
	{
		copydir(&dofile,pool);
	}


    //第五步:   销毁线程池
    destroy_pool(pool);    //销毁pool   destroy破坏
    return 0;
}


/*


    详细设计步骤:
        1、预处理板块:构建
				任务结点结构体、
				线程池结构体,
				文件路径结构体
        
        2、void handler(void *arg)	//解除死锁,防止死锁
				pthread_mutex_unlock((pthread_mutex_t *)arg);
				
        3、void *routine(void *arg)//提取调用任务链表的任务结点  被pool_init和add_thread调用过
				传入pool  定义一个任务结点p(task结构体的实例)
				while循环
				{
					首先调用线程清理函数,清理掉残留的线程,解放锁资源
					当没有任务情况下,线程执行休眠操作还是直接销毁(由shutdown决定)
					当有任务情况下,提取任务结点(拿出来给p,任务结点数量减少),任务取出后,解放对应锁资源
					
					
					
				}
				
        
        4、bool pool_init(thread_pool *pool, unsigned int threads_number)//初始化线程池
				(传入pool)
				加锁,条件变量
				给pool的各成员变量初始化分配内存
						判断是否成功
				pool各成员变量初始化赋值为空
				for循环逐一创建线程:(返回为0表示创建成功,不为0表示创建失败)
		(注意:thread:线程标识符     attr:线程属性设置      start_routine:线程函数起始地址   arg:传递参数)
				
        5、int add_thread(thread_pool *pool, unsigned additional_threads)//添加线程
				创建线程,增加线程数,计算可用线程
        
        6、bool add_task(thread_pool *pool, void *(*task)(void *arg) , void *arg)  		//添加任务
				先定于一个new_task指针,申请内存,初始化结构体成员变量
							内存申请成功?
				给互斥锁资源
				判断等待任务结点是否超过上限?
				定义一个tmp结点指针指向任务链表		
				把任务结点拿出来,组织起来,形成等待任务链表
				任务链表串完了,解除互斥锁哦并且唤醒一个进程
				
        7、void *copyfile(void * arg)  //复制文件函数
				把要打开的文件路径放入source_file变量下(路径)
				打开源文件--------可读   判断打开是否成功
				创建目标文件,即复制文件    判断打开是否结束
				利用read函数和write函数,实现文件读写
				关闭打开的两个文件
        
        8、int copydir( file_path *dofile,thread_pool *pool) //复制目录函数
				普通文件	调用add_task函数复制
				目录文件	mkdir创建目录(tmpfile内的文件夹命名)  如果是目录不停递归
				
        10、bool destroy_pool(thread_pool *pool)	 //摧毁线程池

        main()
        {
            第一步:if-判断输入命令是否规范规范(三个字符串命令:./%s 源文件路径  生成文件路径)
            第二步:初始化线程池
					  thread_pool *pool = malloc(sizeof(thread_pool));
					  //定义线程池的poor指针,初始化内存空间
					  
					  //调用线程初始化函数------pool_init()
					  pool_init(pool,100);          
					  //调用函数初始化线程,分配内存(初始化线程函数会调用routine)
							

            第三步:创建对象文件,存储源文件和复制文件路径
						创建一个dofile,存储源文件和复制文件路径
								(dofile是file_path结构体的一个实例)dofile.source 源文件路径
						(copeatat:提取文件队列,作为判断文件是普通文件还是目录文件的参数)
						
            第四步:判断文件属性,执行拷贝操作
						使用st_mode判断文件类型
								执行copyfile和copydir操作(复制文件和目录)
            第五步:销毁线程池
						直接调用destroy_pool函数销毁线程池
        }

*/

29多线程(线程池定时器)将一个文件复制多次拆分文件并合并多线程复制文件

线程池

程序启动一个新线程成本是比较高的,因为它涉及到与操作系统交互。而使用线程池可以很好的提高性能,尤其是

当程序中要创建大量生存期很短的线程时,更应该考虑使用线程池。

线程池里的每一个线程代码结束后,并不会死亡,而是再次回到线程池中成为空闲状态,等待下一个对象来使用。

在JDK 5之前,我们必须手动实现自己的线程池,从JDK 5开始,Java内置支持线程池。

package org.westos.demo;

import java.util.concurrent.*;

/**
 * @author lwj
 * @date 2020/6/7 8:36
 */
public class MyTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        /*
        线程池:容器,存有一定数量线程对象的容器
        JDK5新增了一个Executors工厂类来产生线程池,有如下几个方法
		public static ExecutorService newCachedThreadPool():			根据任务的数量来创建线程对应的线程个数
		public static ExecutorService newFixedThreadPool(int nThreads):	固定初始化几个线程
		public static ExecutorService newSingleThreadExecutor():			初始化一个线程的线程池
        这些方法的返回值是ExecutorService对象,该对象表示一个线程池,可以执行Runnable对象或者Callable对象代表的线程。它提供了如下方法
            Future<?> submit(Runnable task)
            <T> Future<T> submit(Callable<T> task)
        使用步骤:
            创建线程池对象
            创建Runnable实例
            提交Runnable实例
            关闭线程池
         */
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("线程执行了这个任务");
            }
        });
        //根据任务数量创建相应数量的线程对象
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("提交第二个任务");
            }
        });
        Future<Integer> submit = executorService.submit(new MyCallable());
        System.out.println(submit.get());

        executorService.shutdown();
        //关闭线程池
    }
}

class MyCallable implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        return 100;
    }
}
package org.westos.demo2;

import java.util.concurrent.*;

/**
 * @author lwj
 * @date 2020/6/7 8:36
 */
public class MyTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        /*
        线程池:容器,存有一定数量线程对象的容器
        JDK5新增了一个Executors工厂类来产生线程池,有如下几个方法
		public static ExecutorService newCachedThreadPool():			根据任务的数量来创建线程对应的线程个数
		public static ExecutorService newFixedThreadPool(int nThreads):	固定初始化几个线程
		public static ExecutorService newSingleThreadExecutor():			初始化一个线程的线程池
        这些方法的返回值是ExecutorService对象,该对象表示一个线程池,可以执行Runnable对象或者Callable对象代表的线程。它提供了如下方法
            Future<?> submit(Runnable task)
            <T> Future<T> submit(Callable<T> task)
        使用步骤:
            创建线程池对象
            创建Runnable实例
            提交Runnable实例
            关闭线程池
         */
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName());
            }
        });
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName());
            }
        });
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName());
            }
        });
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName());
            }
        });

        executorService.shutdown();
    }
}
package org.westos.demo3;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author lwj
 * @date 2020/6/7 8:36
 */
public class MyTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        /*
        线程池:容器,存有一定数量线程对象的容器
        JDK5新增了一个Executors工厂类来产生线程池,有如下几个方法
		public static ExecutorService newCachedThreadPool():			根据任务的数量来创建线程对应的线程个数
		public static ExecutorService newFixedThreadPool(int nThreads):	固定初始化几个线程
		public static ExecutorService newSingleThreadExecutor():			初始化一个线程的线程池
        这些方法的返回值是ExecutorService对象,该对象表示一个线程池,可以执行Runnable对象或者Callable对象代表的线程。它提供了如下方法
            Future<?> submit(Runnable task)
            <T> Future<T> submit(Callable<T> task)
        使用步骤:
            创建线程池对象
            创建Runnable实例
            提交Runnable实例
            关闭线程池
         */
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName());
            }
        });
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName());
            }
        });

        executorService.shutdown();
    }
}

定时器与定时任务

package org.westos.demo4;

import java.util.Timer;
import java.util.TimerTask;

/**
 * 定时器
 * @author lwj
 * @date 2020/6/7 9:28
 */
public class MyTest {
    public static void main(String[] args) {
        Timer timer = new Timer();
        //TimerTask抽象类
        MyTimerTask myTimerTask = new MyTimerTask(timer);
        timer.schedule(myTimerTask, 3000);
        //等3s后执行该定时任务

        // 取消定时器
        // timer.cancel();
    }
}

class MyTimerTask extends TimerTask {
    public Timer timer;

    public MyTimerTask(Timer timer) {
        this.timer = timer;
    }

    @Override
    public void run() {
        System.out.println("时间到了");
        timer.cancel();
    }
}
package org.westos.demo4;

import java.util.Timer;
import java.util.TimerTask;

/**
 * @author lwj
 * @date 2020/6/7 9:36
 */
public class MyTest2 {
    public static void main(String[] args) {
        Timer timer = new Timer();
        MyTimerTask2 myTimerTask2 = new MyTimerTask2();
        timer.schedule(myTimerTask2, 3000, 1000);
        //等3s后执行此任务,以后间隔1s重复执行定时任务
    }
}

class MyTimerTask2 extends TimerTask {

    @Override
    public void run() {
        System.out.println("时间到了");
    }
}
package org.westos.demo4;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;

/**
 * @author lwj
 * @date 2020/6/7 9:38
 */
public class MyTest3 {
    public static void main(String[] args) throws ParseException {
        Timer timer = new Timer();
        MyTimerTask3 myTimerTask3 = new MyTimerTask3();
        //myTimerTask2.cancel(); 取消定时任务

        String str = "2020-06-07 09:49:00";
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date parse = sdf.parse(str);

        timer.schedule(myTimerTask3, parse);
        //指定某个时间执行定时任务
    }
}

class MyTimerTask3 extends TimerTask {

    @Override
    public void run() {
        System.out.println("时间到了");
    }
}

指定时间删除文件夹

package org.westos.demo4;

import java.io.File;
import java.util.Timer;
import java.util.TimerTask;

/**
 * 定时删除一个文件夹
 * @author lwj
 * @date 2020/6/7 9:49
 */
public class MyDemo {
    public static void main(String[] args) {
        Timer timer = new Timer();
        //定时器

        File source = new File("E:\\新建文件夹");
        TimerTaskDemo timerTaskDemo = new TimerTaskDemo(source, timer);

        timer.schedule(timerTaskDemo, 5000);
        //延迟5s后执行定时任务,删除文件夹
    }
}

class TimerTaskDemo extends TimerTask {
    public File directory;
    public Timer timer;

    public TimerTaskDemo(File directory, Timer timer) {
        this.directory = directory;
        this.timer = timer;
    }

    @Override
    public void run() {
        //删除文件夹并关闭定时器
        deleteDir(directory);
        timer.cancel();
        //取消定时器
    }

    public void deleteDir(File file) {
        if (file.isDirectory()) {
            File[] files = file.listFiles();
            for (File file1 : files) {
                deleteDir(file1);
            }
        }
        file.delete();
        //删除文件或者空文件夹
    }
}

练习题

1、把一个文件复制多次

package org.westos.demo5;

import java.io.*;

/**
 * @author lwj
 * @date 2020/6/7 15:11
 */
public class MyTest {
    public static void main(String[] args) throws IOException {
        File source = new File("C:\\Users\\shawn\\Music\\MV\\G.E.M. 邓紫棋-我说了算(多芬秀发合作单曲)(蓝光).mp4");
        RandomAccessFile rw = new RandomAccessFile(source, "rw");
        byte[] bytes = new byte[1024 * 8];
        int len = 0;
        for (int i = 0; i < 3; i++) {
            FileOutputStream fos = new FileOutputStream(new File("E:\\music", "G.E.M" + i + ".mp4"));
            while ((len = rw.read(bytes)) != -1) {
                fos.write(bytes, 0, len);
                fos.flush();
            }
            fos.close();
            //每复制一个文件,将随机读取流的指针重置为0
            rw.seek(0);
        }
        rw.close();
    }
}

2、拆分文件并合并

package org.westos.demo5;

import java.io.*;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.Vector;

/**
 * @author lwj
 * @date 2020/6/7 15:24
 */
public class MyTest2 {
    public static void main(String[] args) throws IOException {
        split();
        merge();
    }

    private static void merge() throws IOException {
        File source = new File(".");
        System.out.println(source.getAbsolutePath());
        File[] files = source.listFiles(new FileFilter() {
            @Override
            public boolean accept(File pathname) {
                return pathname.isFile() && pathname.length() <= 1024 * 1024 && pathname.getName().endsWith(".mp3");
            }
        });
        Vector<FileInputStream> vector = new Vector<>();
        for (File file : files) {
            vector.addElement(new FileInputStream(file));
        }
        Enumeration<FileInputStream> elements = vector.elements();
        SequenceInputStream sis = new SequenceInputStream(elements);

        byte[] bytes = new byte[1024];
        int len = 0;
        FileOutputStream fos = new FileOutputStream(new File("百事.mp3"));
        while ((len = sis.read(bytes)) != -1) {
            fos.write(bytes, 0, len);
            fos.flush();
        }
        fos.close();
        sis.close();
    }

    private static void split() throws IOException {
        //把一份文件拆分为多份,每份1M,然后合并
        File source = new File("C:\\Users\\shawn\\Music\\G.E.M. 邓紫棋 _ 王嘉尔 - 热爱就一起.mp3");

        FileInputStream fis = new FileInputStream(source);
        byte[] bytes = new byte[1024 * 1024];
        int len = 0;
        int index = 0;
        while ((len = fis.read(bytes)) != -1) {
            FileOutputStream fos = new FileOutputStream(new File(index++ + ".mp3"));
            fos.write(bytes, 0, len);
            fos.close();
        }
        fis.close();
    }
}

3、多线程复制文件

package org.westos.demo5;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;

/**
 * @author lwj
 * @date 2020/6/7 15:49
 */
public class MyDemo {
    public static void main(String[] args) throws FileNotFoundException {
        //多线程复制同一份文件
        File source = new File("C:\\Users\\shawn\\Music\\G.E.M. 邓紫棋 _ 王嘉尔 - 热爱就一起.mp3");
        File destFile = new File(".\\邓紫棋-热爱就一起.mp3");
        long length = source.length();
        //文件的总大小
        long threadNums = 3;
        long average = length / threadNums;
        for (int i = 0; i < 3; i++) {
            long start = i * (average + 1);
            long end = (i + 1) * average;
            new CopyFileThread(start, end, source, destFile).start();
        }
        //如果3个线程不能均分,那么再添加一个线程
        if (length % threadNums != 0) {
            long start = threadNums * average + 1;
            new CopyFileThread(start, length, source, destFile).start();
        }
    }
}


class CopyFileThread extends Thread {
    public long start;
    public long end;
    public RandomAccessFile in;
    public RandomAccessFile out;

    public CopyFileThread(long start, long end, File srcFile, File destFile) throws FileNotFoundException {
        this.start = start;
        this.end = end;
        this.in = new RandomAccessFile(srcFile, "rw");
        this.out = new RandomAccessFile(destFile, "rw");
    }

    @Override
    public void run() {
        try {
            in.seek(start);
            out.seek(start);
            //设置读写起始指针
            byte[] bytes = new byte[1024 * 8];
            int len = 0;
            long pointer = in.getFilePointer();
            while (pointer <= end && (len = in.read(bytes)) != -1) {
                //最后一次有可能只剩余1KB,然而最后一次还是会读取8KB,然后pointer > end退出while
                out.write(bytes, 0, len);
                pointer += len;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

以上是关于线程池实现文件复制操作的主要内容,如果未能解决你的问题,请参考以下文章

29多线程(线程池定时器)将一个文件复制多次拆分文件并合并多线程复制文件

newCacheThreadPool()newFixedThreadPool()newScheduledThreadPool()newSingleThreadExecutor()自定义线程池(代码片段

线程池-实现一个取消选项

线程池与并行度

Motan在服务provider端用于处理request的线程池

线程同步&线程池