线程池实现文件复制操作
Posted 曦系
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池实现文件复制操作相关的知识,希望对你有一定的参考价值。
项目:使用线程池实现大目录拷贝
创建一个线程的综合资源,让需要执行的任务挂载在线程池中。如果线程池有空闲的线程,就可以安排线程去执行任务,如果线程池没有空闲的线程,就安排任务等待,直到有线程空闲出来。
线程池步骤:
- 初始化线程池资源
- 向线程池中加入线程
- 向线程池中添加任务 ---如果有空闲的线程,空闲线程可以自动获取任务并运行
- 销毁线程池。
源码分享:thread_pool.c
/*
设计思路:
1、预处理板块:构建任务结点结构体、线程池结构体,文件路径结构体
2、void handler(void *arg) //解除死锁,防止死锁
3、void *routine(void *arg)//提取调用任务链表的任务结点
4、bool pool_init(thread_pool *pool, unsigned int threads_number)//初始化线程池
5、int add_thread(thread_pool *pool, unsigned additional_threads)//添加线程
6、bool add_task(thread_pool *pool, void *(*task)(void *arg) , void *arg) //添加任务
7、void *copyfile(void * arg) //复制文件函数
8、int copydir( file_path *dofile,thread_pool *pool) //复制目录函数
9、bool destroy_pool(thread_pool *pool) //摧毁线程池
main()
{
if-判断输入命令是否规范规范
初始化线程池
创建对象文件,存储源文件和复制文件路径
判断文件属性,执行拷贝操作
销毁线程池
}
*/
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <strings.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <dirent.h>
#include <stdbool.h>
#include <string.h>
#include <errno.h>
//********************预处理板块**************************
#define MAX_WAITING_TASKS 1000 //等待任务最大数目
#define MAX_ACTIVE_THREADS 200 //最大线程数
//任务结构体
typedef struct task
{
void *(*task)(void *arg);//定义任务函数指针以及参数
void *arg; //函数参数(指针传参)
struct task *next; //结点内当然有指向下一结点的指针
/*
任务链表的结点内包含三个元素:
1、函数指针,指向任务函数(属于data)
2、函数所对应的参数,这里用指针传参
3、结点内指向下一结点的指针 next
*/
}task;
typedef struct thread_pool
{
/* data */
pthread_mutex_t lock; //互斥锁
pthread_cond_t cond; //条件变量 跟互斥锁是搭配使用
task *task_list; //一个任务节点
pthread_t *tids; //线程号指针变量
unsigned waiting_tasks; //等待任务
unsigned active_threads; //执行线程
bool shutdown; //一个线程池销毁开关
}thread_pool;
typedef struct file_path
{
/* 用于存储文件路径的结构体 */
char source_file[4096]; //字符数组,用于存储源文件路径
char copy_file[4096]; //字符数组,用于存放复制后的文件路径
}file_path;
//********************预处理板块**************************
//解除死锁,防止死锁 handler函数会被routine调用
void handler(void *arg)
{
pthread_mutex_unlock((pthread_mutex_t *)arg);
}
//提取调用任务链表的任务结点
void *routine(void *arg) //这里的arg是task结构体的一项参数
{
thread_pool *pool = (thread_pool *)arg; //
task *p; //定义一个任务结点
while (1)
{
/* code */
pthread_cleanup_push(handler, (void *)&pool->lock);
//调用hand函数,防止死锁
//做一个判断,当没有任务,并且pool没有销毁时,执行休眠操作
if (pool->waiting_tasks == 0 && !pool->shutdown)
{
pthread_cond_wait(&pool->cond, &pool->lock); //休眠
}
//判断是否需要要关闭线程
if(pool->waiting_tasks == 0 && pool->shutdown == true)
{
pthread_mutex_unlock(&pool->lock); //先执行解锁操作
pthread_exit(NULL); //退出线程
}
p = pool->task_list->next; //从线程池中任务链表取出(复制)一个任务结点给P
pool->task_list->next = p->next; //然后被取出的任务被其他任务覆盖
pool->waiting_tasks--; //线程池中等待的任务减少一项
pthread_mutex_unlock(&pool->lock); //一次任务取出成功,解除互斥锁资源,方便下一次调用任务结点使用
pthread_cleanup_pop(0); //
//注意!
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); //强制性的阻塞 任何进程都不能取消,我想老老实实把这个线程完成,其他的都先排队
(p->task)(p->arg);
//他运行的是add_task( pool, copfile, tmpfile);
//传过来的参数
//p->task 等于copyfile
//p->arg 等于 tmpfile
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);//关闭强制阻塞
free(p->arg); //释放在检索目录时 在内存开辟的空间
free(p); //释放掉完成任务的节点
}
pthread_exit(NULL);
//注意!没有这一段,copyfile无法执行
}
//初始化线程池
bool pool_init(thread_pool *pool, unsigned int threads_number)
{
/* 线程池结构体元素
pthread_mutex_t lock; //互斥锁
pthread_cond_t cond; //条件变量 跟互斥锁是搭配使用
task *task_list; //一个任务节点
pthread_t *tids; //线程号指针变量
unsigned waiting_tasks; //等待任务 任务数量
unsigned active_threads; //执行线程 线程数量
bool shutdown; //一个线程池销毁开关
*/
pthread_mutex_init(&pool->lock,NULL); //线程池互斥锁初始化,这是个初始化函数
pthread_cond_init(&pool->cond,NULL); //线程池条件变量初始化,这是个初始化函数
//先分配内存
pool->shutdown = false; //线程池销毁为假,即不销毁(刚申请一个线程池,初始化肯定不能销毁呀!)
pool->task_list = malloc(sizeof(task)); //线程池中的任务链表初始化分配内存空间
pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);//给进程分配空间
if (pool->task_list == NULL || pool->tids == NULL)
{
perror("任务链表 或 线程分配 内存失败!");
return false;
}
//在把数量清零
pool->task_list->next = NULL; //只有一个节点,还未连城任务链表
pool->waiting_tasks = 0; //暂时没有等待中的任务,要等后面添加任务结点
pool->active_threads = threads_number; //thread_number是参数,是传入的线程数量,赋值过来
int i;
for (i = 0; i < pool->active_threads; i++)
{
if(pthread_create(&((pool->tids)[i]), NULL,routine, (void *)pool) != 0)
//返回值为0表示创建成功,返回不为0表示创建失败
{
perror("线程创建失败");//根据传入的线程数,逐一创建线程,存入tid[]
return false;
}
}
return true;
}
//添加线程
int add_thread(thread_pool *pool, unsigned additional_threads)
{
if(additional_threads == 0) //传入添加的线程数
{
return 0; //没有添加线程,啥都干不了,只能退出
}
unsigned total_threads = pool->active_threads + additional_threads;
//计算先从总数 = 线程池中的线程 + 新添加的线程
int i, actual_increment = 0;
for(i = pool->active_threads;i < total_threads && i < MAX_ACTIVE_THREADS;i++)
{
if(pthread_create(&((pool->tids)[i]),NULL, routine, (void *)pool) != 0)
{
perror("添加线程失败!");
if(actual_increment == 0)
{
return -1;
}
break; //创建失败的话,直接break,不再执行后面的++
}//创建失败
actual_increment++; //实际上添加成功的线程数
}
pool->active_threads += actual_increment;
return actual_increment;
}
//添加任务
bool add_task(thread_pool *pool, void *(*task)(void *arg) , void *arg)
{
// pool->task_list = malloc(sizeof(task)); //之前初始化的是线程池中的链表结点内存
struct task *new_task = malloc(sizeof(task)); // 任务结点new_task分配内存
if(new_task == NULL)
{
perror("任务结点分配内存失败!");
return false;
}
//初始化结点参数
new_task->task = task;
new_task->arg = arg;
new_task->next = NULL;
pthread_mutex_lock(&pool->lock); //上锁
if(pool->waiting_tasks >= MAX_WAITING_TASKS) //判断任务是否超出上限,必要的操作
{
pthread_mutex_unlock(&pool->lock);
fprintf(stderr, "任务过多!\\n");
free(new_task); //释放任务空间
return false;
}
struct task *tmp = pool->task_list;
while(tmp->next != NULL) //从任务结点取到不为空,就是取到任务
{
tmp = tmp->next; //把这个任务取出来,拿到tmp上
}
tmp->next = new_task;
pool->waiting_tasks++;
pthread_mutex_unlock(&pool->lock); //释放锁资源
pthread_cond_signal(&pool->cond); //唤醒一个休眠的线程
return true;
}
//复制文件函数
void *copyfile(void * arg)
{
/*
把要打开的文件路径放入source_file变量下(路径)
打开源文件--------可读 判断打开是否成功
创建目标文件,即复制文件 判断打开是否结束
利用read函数和write函数,实现文件读写
关闭打开的两个文件
*/
file_path *dofile = (file_path *)arg; //arg是文件参数
struct stat file_stat; //这个结构体来自#include <sys/stat.h>
stat(dofile->source_file, &file_stat);
//通过文件名 获取文件的路径把他存放到结构体的sourc_file里面
int source_fd,copy_fd;
source_fd = open(dofile->source_file,O_RDONLY);//打开源路径,只读模式
if(source_fd == -1 )
{
printf("打开文件 %s\\n 失败!\\n",dofile->source_file);
//打开文件失败
return NULL;
}
copy_fd = open(dofile->copy_file,O_CREAT | O_TRUNC | O_RDWR,file_stat.st_mode);
if( copy_fd == -1)
{
printf("打开文件 %s 失败!\\n",dofile->copy_file);
return NULL;
}
int nread;
char buf[100];
while((nread = read(source_fd,buf,100)) > 0) //读取源文件的内容
{
if( write(copy_fd,buf,nread) == -1) //把读到的全部写进目标文件
{
break;
}
}
printf("读取文件: %s\\n",dofile->source_file);
printf("写入文件: %s\\n",dofile->copy_file);
close(source_fd); //关闭源文件
close(copy_fd); //关闭复制文件
return NULL;
}
//复制目录函数
int copydir( file_path *dofile,thread_pool *pool)
{
struct stat file_stat;
stat(dofile->source_file,&file_stat); //获取文件的属性
mkdir(dofile->copy_file,file_stat.st_mode); //以源目录的类型和目录来创建一个目录
DIR *sourcedir = opendir(dofile->source_file); //打开源目录
struct dirent *dp;
while( (dp = readdir(sourcedir))!=NULL ) //获取文件夹内文件的信息
{
if(dp->d_name[0] == \'.\') //如果文件为. 或者 .. 则跳过
{
continue;
}
//对本目录下的所有文件进行拷贝操作
file_path *tmpfile = malloc(sizeof(file_path)); //为文件结构体开辟内存空间
memset(tmpfile,0,sizeof(file_path)); //对内存清零
sprintf(tmpfile->source_file,"%s/%s",dofile->source_file,dp->d_name);//拼凑源文件路径
sprintf(tmpfile->copy_file,"%s/%s",dofile->copy_file,dp->d_name);//拼凑目标文件路径
struct stat tmpstat;
stat(tmpfile->source_file,&tmpstat);
if(S_ISREG(tmpstat.st_mode)) //如果为普通文件,则拷贝
{
printf("tmpfile->source_file = %s\\n",tmpfile->source_file);
printf("tmpfile->copy_file = %s\\n", tmpfile->copy_file);
printf("\\n");
add_task( pool, copyfile, tmpfile); //把复制的任务丢到任务链表
}
else if(S_ISDIR(tmpstat.st_mode))//如果为目录,则递归
{
copydir(tmpfile,pool);
}
}
return 0;
}
//摧毁线程池
bool destroy_pool(thread_pool *pool)
{
pool->shutdown = true;
pthread_cond_broadcast(&pool->cond);
sleep(1);//等待空闲线程响应完毕
int i;
for(i=0; i<pool->active_threads; i++)
{
errno = pthread_join(pool->tids[i], NULL);
if(errno != 0)
{
printf("copy tids[%d] error: %s\\n",i, strerror(errno));
}
else
printf("[%u] is copyed\\n", (unsigned)pool->tids[i]);
}
free(pool->task_list);
free(pool->tids);
free(pool);
return true;
}
//函数1:初始化线程池 pool_init(pool,100);
int main(int argc, char const *argv[])
{
//第一步: if-判断输入命令是否规范规范
if(argc != 3)
{
printf("你的命令格式错误!\\n");
printf("请按右侧格式run: ./%s 源文件路径 生成文件路径 \\n",argv[0]);
return -1; //命令异常退出
}
//第二步: 初始化线程池
thread_pool *pool = malloc(sizeof(thread_pool));//定义线程池的poor指针,初始化内存空间
/*调用线程初始化函数------pool_init()*/
pool_init(pool,100); //调用函数初始化线程,分配内存(初始化线程函数会调用routine)
//第三步: 创建对象文件,存储源文件和复制文件路径
file_path dofile; //创建一个路径实例dofile,存储源文件和复制文件的路径
strcpy(dofile.source_file,argv[1]); //strcpy是函数 #include <string.h>
strcpy(dofile.copy_file,argv[2]); // char *strcpy(char *dest, const char *src);
struct stat copestat;//这个结构体来自#include <sys/stat.h>
stat(dofile.source_file,&copestat); //stat是函数,将dofile.source_file的信息,获取到&copestat
/*
#include<sys/stat.h>
int stat(const char *restrict pathname,struct stat *restrict buf);
int fstat(int fields,struct stat *buf);
int lstat(const char *restrict pathname,struct stat *restrict buf);
*/
//第四步: 判断文件属性,执行拷贝操作
if(S_ISREG(copestat.st_mode))//如果为普通文件,则拷贝
{
copyfile(&dofile); //直接拷贝,不用添加到任务链表
}
else if(S_ISDIR(copestat.st_mode))//如果为目录,则递归
{
copydir(&dofile,pool);
}
//第五步: 销毁线程池
destroy_pool(pool); //销毁pool destroy破坏
return 0;
}
/*
详细设计步骤:
1、预处理板块:构建
任务结点结构体、
线程池结构体,
文件路径结构体
2、void handler(void *arg) //解除死锁,防止死锁
pthread_mutex_unlock((pthread_mutex_t *)arg);
3、void *routine(void *arg)//提取调用任务链表的任务结点 被pool_init和add_thread调用过
传入pool 定义一个任务结点p(task结构体的实例)
while循环
{
首先调用线程清理函数,清理掉残留的线程,解放锁资源
当没有任务情况下,线程执行休眠操作还是直接销毁(由shutdown决定)
当有任务情况下,提取任务结点(拿出来给p,任务结点数量减少),任务取出后,解放对应锁资源
}
4、bool pool_init(thread_pool *pool, unsigned int threads_number)//初始化线程池
(传入pool)
加锁,条件变量
给pool的各成员变量初始化分配内存
判断是否成功
pool各成员变量初始化赋值为空
for循环逐一创建线程:(返回为0表示创建成功,不为0表示创建失败)
(注意:thread:线程标识符 attr:线程属性设置 start_routine:线程函数起始地址 arg:传递参数)
5、int add_thread(thread_pool *pool, unsigned additional_threads)//添加线程
创建线程,增加线程数,计算可用线程
6、bool add_task(thread_pool *pool, void *(*task)(void *arg) , void *arg) //添加任务
先定于一个new_task指针,申请内存,初始化结构体成员变量
内存申请成功?
给互斥锁资源
判断等待任务结点是否超过上限?
定义一个tmp结点指针指向任务链表
把任务结点拿出来,组织起来,形成等待任务链表
任务链表串完了,解除互斥锁哦并且唤醒一个进程
7、void *copyfile(void * arg) //复制文件函数
把要打开的文件路径放入source_file变量下(路径)
打开源文件--------可读 判断打开是否成功
创建目标文件,即复制文件 判断打开是否结束
利用read函数和write函数,实现文件读写
关闭打开的两个文件
8、int copydir( file_path *dofile,thread_pool *pool) //复制目录函数
普通文件 调用add_task函数复制
目录文件 mkdir创建目录(tmpfile内的文件夹命名) 如果是目录不停递归
10、bool destroy_pool(thread_pool *pool) //摧毁线程池
main()
{
第一步:if-判断输入命令是否规范规范(三个字符串命令:./%s 源文件路径 生成文件路径)
第二步:初始化线程池
thread_pool *pool = malloc(sizeof(thread_pool));
//定义线程池的poor指针,初始化内存空间
//调用线程初始化函数------pool_init()
pool_init(pool,100);
//调用函数初始化线程,分配内存(初始化线程函数会调用routine)
第三步:创建对象文件,存储源文件和复制文件路径
创建一个dofile,存储源文件和复制文件路径
(dofile是file_path结构体的一个实例)dofile.source 源文件路径
(copeatat:提取文件队列,作为判断文件是普通文件还是目录文件的参数)
第四步:判断文件属性,执行拷贝操作
使用st_mode判断文件类型
执行copyfile和copydir操作(复制文件和目录)
第五步:销毁线程池
直接调用destroy_pool函数销毁线程池
}
*/
29多线程(线程池定时器)将一个文件复制多次拆分文件并合并多线程复制文件
线程池
程序启动一个新线程成本是比较高的,因为它涉及到与操作系统交互。而使用线程池可以很好的提高性能,尤其是
当程序中要创建大量生存期很短的线程时,更应该考虑使用线程池。
线程池里的每一个线程代码结束后,并不会死亡,而是再次回到线程池中成为空闲状态,等待下一个对象来使用。
在JDK 5之前,我们必须手动实现自己的线程池,从JDK 5开始,Java内置支持线程池。
package org.westos.demo;
import java.util.concurrent.*;
/**
* @author lwj
* @date 2020/6/7 8:36
*/
public class MyTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
/*
线程池:容器,存有一定数量线程对象的容器
JDK5新增了一个Executors工厂类来产生线程池,有如下几个方法
public static ExecutorService newCachedThreadPool(): 根据任务的数量来创建线程对应的线程个数
public static ExecutorService newFixedThreadPool(int nThreads): 固定初始化几个线程
public static ExecutorService newSingleThreadExecutor(): 初始化一个线程的线程池
这些方法的返回值是ExecutorService对象,该对象表示一个线程池,可以执行Runnable对象或者Callable对象代表的线程。它提供了如下方法
Future<?> submit(Runnable task)
<T> Future<T> submit(Callable<T> task)
使用步骤:
创建线程池对象
创建Runnable实例
提交Runnable实例
关闭线程池
*/
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("线程执行了这个任务");
}
});
//根据任务数量创建相应数量的线程对象
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("提交第二个任务");
}
});
Future<Integer> submit = executorService.submit(new MyCallable());
System.out.println(submit.get());
executorService.shutdown();
//关闭线程池
}
}
class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
return 100;
}
}
package org.westos.demo2;
import java.util.concurrent.*;
/**
* @author lwj
* @date 2020/6/7 8:36
*/
public class MyTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
/*
线程池:容器,存有一定数量线程对象的容器
JDK5新增了一个Executors工厂类来产生线程池,有如下几个方法
public static ExecutorService newCachedThreadPool(): 根据任务的数量来创建线程对应的线程个数
public static ExecutorService newFixedThreadPool(int nThreads): 固定初始化几个线程
public static ExecutorService newSingleThreadExecutor(): 初始化一个线程的线程池
这些方法的返回值是ExecutorService对象,该对象表示一个线程池,可以执行Runnable对象或者Callable对象代表的线程。它提供了如下方法
Future<?> submit(Runnable task)
<T> Future<T> submit(Callable<T> task)
使用步骤:
创建线程池对象
创建Runnable实例
提交Runnable实例
关闭线程池
*/
ExecutorService executorService = Executors.newFixedThreadPool(5);
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
});
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
});
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
});
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
});
executorService.shutdown();
}
}
package org.westos.demo3;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author lwj
* @date 2020/6/7 8:36
*/
public class MyTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
/*
线程池:容器,存有一定数量线程对象的容器
JDK5新增了一个Executors工厂类来产生线程池,有如下几个方法
public static ExecutorService newCachedThreadPool(): 根据任务的数量来创建线程对应的线程个数
public static ExecutorService newFixedThreadPool(int nThreads): 固定初始化几个线程
public static ExecutorService newSingleThreadExecutor(): 初始化一个线程的线程池
这些方法的返回值是ExecutorService对象,该对象表示一个线程池,可以执行Runnable对象或者Callable对象代表的线程。它提供了如下方法
Future<?> submit(Runnable task)
<T> Future<T> submit(Callable<T> task)
使用步骤:
创建线程池对象
创建Runnable实例
提交Runnable实例
关闭线程池
*/
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
});
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
});
executorService.shutdown();
}
}
定时器与定时任务
package org.westos.demo4;
import java.util.Timer;
import java.util.TimerTask;
/**
* 定时器
* @author lwj
* @date 2020/6/7 9:28
*/
public class MyTest {
public static void main(String[] args) {
Timer timer = new Timer();
//TimerTask抽象类
MyTimerTask myTimerTask = new MyTimerTask(timer);
timer.schedule(myTimerTask, 3000);
//等3s后执行该定时任务
// 取消定时器
// timer.cancel();
}
}
class MyTimerTask extends TimerTask {
public Timer timer;
public MyTimerTask(Timer timer) {
this.timer = timer;
}
@Override
public void run() {
System.out.println("时间到了");
timer.cancel();
}
}
package org.westos.demo4;
import java.util.Timer;
import java.util.TimerTask;
/**
* @author lwj
* @date 2020/6/7 9:36
*/
public class MyTest2 {
public static void main(String[] args) {
Timer timer = new Timer();
MyTimerTask2 myTimerTask2 = new MyTimerTask2();
timer.schedule(myTimerTask2, 3000, 1000);
//等3s后执行此任务,以后间隔1s重复执行定时任务
}
}
class MyTimerTask2 extends TimerTask {
@Override
public void run() {
System.out.println("时间到了");
}
}
package org.westos.demo4;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
/**
* @author lwj
* @date 2020/6/7 9:38
*/
public class MyTest3 {
public static void main(String[] args) throws ParseException {
Timer timer = new Timer();
MyTimerTask3 myTimerTask3 = new MyTimerTask3();
//myTimerTask2.cancel(); 取消定时任务
String str = "2020-06-07 09:49:00";
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date parse = sdf.parse(str);
timer.schedule(myTimerTask3, parse);
//指定某个时间执行定时任务
}
}
class MyTimerTask3 extends TimerTask {
@Override
public void run() {
System.out.println("时间到了");
}
}
指定时间删除文件夹
package org.westos.demo4;
import java.io.File;
import java.util.Timer;
import java.util.TimerTask;
/**
* 定时删除一个文件夹
* @author lwj
* @date 2020/6/7 9:49
*/
public class MyDemo {
public static void main(String[] args) {
Timer timer = new Timer();
//定时器
File source = new File("E:\\新建文件夹");
TimerTaskDemo timerTaskDemo = new TimerTaskDemo(source, timer);
timer.schedule(timerTaskDemo, 5000);
//延迟5s后执行定时任务,删除文件夹
}
}
class TimerTaskDemo extends TimerTask {
public File directory;
public Timer timer;
public TimerTaskDemo(File directory, Timer timer) {
this.directory = directory;
this.timer = timer;
}
@Override
public void run() {
//删除文件夹并关闭定时器
deleteDir(directory);
timer.cancel();
//取消定时器
}
public void deleteDir(File file) {
if (file.isDirectory()) {
File[] files = file.listFiles();
for (File file1 : files) {
deleteDir(file1);
}
}
file.delete();
//删除文件或者空文件夹
}
}
练习题
1、把一个文件复制多次
package org.westos.demo5;
import java.io.*;
/**
* @author lwj
* @date 2020/6/7 15:11
*/
public class MyTest {
public static void main(String[] args) throws IOException {
File source = new File("C:\\Users\\shawn\\Music\\MV\\G.E.M. 邓紫棋-我说了算(多芬秀发合作单曲)(蓝光).mp4");
RandomAccessFile rw = new RandomAccessFile(source, "rw");
byte[] bytes = new byte[1024 * 8];
int len = 0;
for (int i = 0; i < 3; i++) {
FileOutputStream fos = new FileOutputStream(new File("E:\\music", "G.E.M" + i + ".mp4"));
while ((len = rw.read(bytes)) != -1) {
fos.write(bytes, 0, len);
fos.flush();
}
fos.close();
//每复制一个文件,将随机读取流的指针重置为0
rw.seek(0);
}
rw.close();
}
}
2、拆分文件并合并
package org.westos.demo5;
import java.io.*;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.Vector;
/**
* @author lwj
* @date 2020/6/7 15:24
*/
public class MyTest2 {
public static void main(String[] args) throws IOException {
split();
merge();
}
private static void merge() throws IOException {
File source = new File(".");
System.out.println(source.getAbsolutePath());
File[] files = source.listFiles(new FileFilter() {
@Override
public boolean accept(File pathname) {
return pathname.isFile() && pathname.length() <= 1024 * 1024 && pathname.getName().endsWith(".mp3");
}
});
Vector<FileInputStream> vector = new Vector<>();
for (File file : files) {
vector.addElement(new FileInputStream(file));
}
Enumeration<FileInputStream> elements = vector.elements();
SequenceInputStream sis = new SequenceInputStream(elements);
byte[] bytes = new byte[1024];
int len = 0;
FileOutputStream fos = new FileOutputStream(new File("百事.mp3"));
while ((len = sis.read(bytes)) != -1) {
fos.write(bytes, 0, len);
fos.flush();
}
fos.close();
sis.close();
}
private static void split() throws IOException {
//把一份文件拆分为多份,每份1M,然后合并
File source = new File("C:\\Users\\shawn\\Music\\G.E.M. 邓紫棋 _ 王嘉尔 - 热爱就一起.mp3");
FileInputStream fis = new FileInputStream(source);
byte[] bytes = new byte[1024 * 1024];
int len = 0;
int index = 0;
while ((len = fis.read(bytes)) != -1) {
FileOutputStream fos = new FileOutputStream(new File(index++ + ".mp3"));
fos.write(bytes, 0, len);
fos.close();
}
fis.close();
}
}
3、多线程复制文件
package org.westos.demo5;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
/**
* @author lwj
* @date 2020/6/7 15:49
*/
public class MyDemo {
public static void main(String[] args) throws FileNotFoundException {
//多线程复制同一份文件
File source = new File("C:\\Users\\shawn\\Music\\G.E.M. 邓紫棋 _ 王嘉尔 - 热爱就一起.mp3");
File destFile = new File(".\\邓紫棋-热爱就一起.mp3");
long length = source.length();
//文件的总大小
long threadNums = 3;
long average = length / threadNums;
for (int i = 0; i < 3; i++) {
long start = i * (average + 1);
long end = (i + 1) * average;
new CopyFileThread(start, end, source, destFile).start();
}
//如果3个线程不能均分,那么再添加一个线程
if (length % threadNums != 0) {
long start = threadNums * average + 1;
new CopyFileThread(start, length, source, destFile).start();
}
}
}
class CopyFileThread extends Thread {
public long start;
public long end;
public RandomAccessFile in;
public RandomAccessFile out;
public CopyFileThread(long start, long end, File srcFile, File destFile) throws FileNotFoundException {
this.start = start;
this.end = end;
this.in = new RandomAccessFile(srcFile, "rw");
this.out = new RandomAccessFile(destFile, "rw");
}
@Override
public void run() {
try {
in.seek(start);
out.seek(start);
//设置读写起始指针
byte[] bytes = new byte[1024 * 8];
int len = 0;
long pointer = in.getFilePointer();
while (pointer <= end && (len = in.read(bytes)) != -1) {
//最后一次有可能只剩余1KB,然而最后一次还是会读取8KB,然后pointer > end退出while
out.write(bytes, 0, len);
pointer += len;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
以上是关于线程池实现文件复制操作的主要内容,如果未能解决你的问题,请参考以下文章
29多线程(线程池定时器)将一个文件复制多次拆分文件并合并多线程复制文件
newCacheThreadPool()newFixedThreadPool()newScheduledThreadPool()newSingleThreadExecutor()自定义线程池(代码片段