java socket实现,如果有并发访问100个用户,然后每个用户10个处理一次,该怎么写这个socket程序呢!

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java socket实现,如果有并发访问100个用户,然后每个用户10个处理一次,该怎么写这个socket程序呢!相关的知识,希望对你有一定的参考价值。

各位高手帮帮忙,代码发到邮箱:yanshuailong8917@163.com

参考技术A 使用一个for来实现啊 而下面的问题就是说如果有19个 那么先要看你那19个客户端是无限次连接线 如果是无限次连接那就可以不管它了 如果不是 那么可以在里面写一个时钟就可以了呀追问

那如果在加上线程池,该怎么写呢!

参考技术B 什么叫做“每个用户10个处理一次”

是不是想说 “每10个用户处理一次” ?追问

恩是了!就是
每10个用户处理一次!谢谢你的纠正,那你知道怎么做吗!

追答

知道 但是没时间给你写 = =

你在服务器端 对用户进行10个的处理操作。。那么如果有19个用户呢 最后那9个等着?等着第20个用户触发,处理操作么?你的初衷是为了减少代码压力吧。。可是造成了用户通讯逻辑等待的问题啊

本回答被提问者采纳

Java笔记:并发工具

一、基础知识

并发工具定义了一些核心特征,用于以其他方式实现同步和线程间通信。

  • 同步器:提供了同步多线程间交互的高级方法。
  • 执行器:管理线程的执行。
  • 并发集合:提供了由集合框架定义的相关类的并发替代版本。
  • Fork/Join框架:支持并行编程。

 

二、同步对象使用

Semaphore实现了经典的信号量,信号量通过计数器控制对共享资源的访问。如果计数器大于0则允许访问,如果计数器为0则拒绝访问。希望获得共享资源的线程尝试获得许可证,若允许访问则线程可得到许可证,若不允许访问则线程阻塞直至得到许可证为止。

技术分享图片
import java.util.concurrent.Semaphore;

class Solution {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(1);
        Integer i = 0;
        new IncThread(semaphore);
        new DecThread(semaphore);
    }
}

class Shared {
    static int integer = 0;
}

class IncThread implements Runnable {
    private Semaphore semaphore;

    IncThread(Semaphore semaphore) {
        this.semaphore = semaphore;
        new Thread(this).start();
    }

    @Override
    public void run() {
        try {
            semaphore.acquire();//阻塞直至得到许可
            for (int i = 0; i < 5; i++) {
                Shared.integer++;
                System.out.println(Shared.integer);
                Thread.sleep(500);
            }
        } catch (InterruptedException exc) {
            System.out.println(exc.getMessage());
        }
        semaphore.release();//释放许可
    }
}

class DecThread implements Runnable {
    private Semaphore semaphore;

    DecThread(Semaphore semaphore) {
        this.semaphore = semaphore;
        new Thread(this).start();
    }

    @Override
    public void run() {
        try {
            semaphore.acquire();
            for (int i = 0; i < 5; i++) {
                Shared.integer--;
                System.out.println(Shared.integer);
                Thread.sleep(500);
            }
        } catch (InterruptedException exc) {
            System.out.println(exc.getMessage());
        }
        semaphore.release();
    }
}
View Code

如果希望线程进行等待,直到发生一个或多个事件为止。CountDownLatch可用于处理这类情况,计数器必须从指定事件数量归零,锁存器才会被释放。

技术分享图片
import java.util.concurrent.CountDownLatch;

class Solution {
    public static void main(String[] args) {
        CountDownLatch cdl = new CountDownLatch(5);

        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                System.out.println(i);
                cdl.countDown();
            }
        }).start();

        try {
            cdl.await();//暂停主线程直至计数器递减5次
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
        }

        System.out.println("Hello");
    }
}
View Code

如果由多个线程组成的线程组必须在某处进行等待,直到线程组中所有线程都到达执行点。CyclicBarrier可用于处理这类情况,同步对象会被挂起直至指定数量的线程都到达预定位置为止。

技术分享图片
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class Solution {
    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(10, () -> System.out.println("Hello"));
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    int t = (int) (Math.random() * 5000);
                    Thread.sleep(t);
                    System.out.println(t);
                    barrier.await();//指定数量的线程调用await后释放被挂起的线程
                } catch (InterruptedException | BrokenBarrierException e) {
                    System.out.println(e.getMessage());
                }
            }).start();
        }
    }
}
View Code

Exchanger用于简化两个线程之间的数据交换。简单的进行等待,直到两个独立的线程均调用exchange方法为止,之后交换线程所提供的数据。

技术分享图片
import java.util.Arrays;
import java.util.concurrent.Exchanger;

class Solution {
    public static void main(String[] args) {
        Exchanger<int[]> arrayExchanger = new Exchanger<>();
        Runnable consumer = () -> {
            try {
                for (int i = 0; i < 3; i++) {
                    int[] arr = arrayExchanger.exchange(new int[5]);
                    System.out.println(Arrays.toString(arr));
                }
            } catch (InterruptedException e) {
                System.out.println(e.getMessage());
            }
        };

        Runnable producer = () -> {
            int[] arr = new int[5];
            try {
                for (int i = 0; i < 15; i++) {
                    arr[i % 5] = i;
                    if ((i + 1) % 5 == 0)
                        arr = arrayExchanger.exchange(arr);
                }
            } catch (InterruptedException e) {
                System.out.println(e.getMessage());
            }
        };

        new Thread(consumer).start();
        new Thread(producer).start();
    }
}
View Code

Phaser允许多个线程进行同步。Phaser对象会等待所有party(等价于线程)完成当前阶段后才会进入下阶段,通过调用arrive方法或其变体通知当前阶段完成。

技术分享图片
import java.util.concurrent.Phaser;

class MyPhaser extends Phaser {
    private int total;

    MyPhaser(int parties, int total) {
        super(parties);
        this.total = total - 1;
    }

    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
        System.out.println("Phase " + phase + " completed");
        return phase == total || registeredParties == 0;//返回true则结束Phaser
    }
}

class Solution {
    public static void main(String[] args) {
        Phaser phaser = new MyPhaser(0, 3);
        Runnable runnable = () -> {
            phaser.register();
            while (!phaser.isTerminated()) {
                System.out.println("Phase " + phaser.getPhase() + " started");
                phaser.arriveAndAwaitAdvance();
            }
        };

        for (int i = 0; i < 3; i++)
            new Thread(runnable).start();
    }
}
View Code

 

三、执行器

执行器用于启动并控制线程的执行,因此执行器为通过Thread管理线程提供了一种代替方案。执行器的核心是Executor接口,ExecutorService接口扩展了Executor接口。

线程池提供了用于执行各种任务的一组线程,每个任务不是使用独立的线程而是线程池中的线程。线程池减轻了创建许多独立线程所带来的负担,通过少量线程执行大量任务。

技术分享图片
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class Solution {
    public static void main(String[] args) {
        CountDownLatch[] cdls = new CountDownLatch[5];
        for (int i = 0; i < cdls.length; i++)
            cdls[i] = new CountDownLatch(5);
        ExecutorService pool = Executors.newFixedThreadPool(3);//创建线程池

        for (CountDownLatch cdl : cdls)
            pool.execute(() -> {
                for (int j = 0; j < 5; j++) {
                    System.out.println(j);
                    cdl.countDown();
                }
            });

        try {
            for (CountDownLatch cdl : cdls)
                cdl.await();
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
        }
        pool.shutdown();//关闭线程池
    }
}
View Code

泛型接口Callable表示返回值的线程。应用程序可以使用Callable对象计算结果后,将结果返回给调用线程。call方法定义希望执行的任务,在任务完成后返回结果。Callable任务通过调用ExecutorService对象的submit方法执行。

泛型接口Future表示将由Callable对象返回的值。

技术分享图片
import java.util.concurrent.*;

class Add implements Callable<Integer> {
    private int[] arr;

    Add(int... arr) {
        this.arr = arr;
    }

    @Override
    public Integer call() {
        int sum = 0;
        for (int i : arr)
            sum += i;
        return sum;
    }
}

class Solution {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Future<Integer> future = pool.submit(new Add(1, 2, 3));
        try {
            System.out.println(future.get());
        } catch (InterruptedException | ExecutionException e) {
            System.out.println(e.getMessage());
        }
        pool.shutdown();
    }
}
View Code


四、时间

TimeUnit枚举用于指定时间单位。

技术分享图片
import java.util.concurrent.TimeUnit;

class Solution {
    public static void main(String[] args) {
        try {
            TimeUnit.SECONDS.sleep(3);//暂停3秒
            System.out.println(TimeUnit.SECONDS.toMillis(1));//单位转换
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
        }
    }
}
View Code

 

五、锁

锁为使用synchronized控制共享资源的访问提供了替代技术。在访问共享资源自谦,申请用于保护资源的锁,当资源访问完成后释放锁。当某线程正在使用锁时,其他尝试申请锁的线程会被挂起。

所有锁都实现了Lock接口,申请锁时调用lock方法,如果不可得就会进行等待。如果不希望进行等待就使用tryLock方法。使用newCondition方法获取与锁关联的Condition对象之后可通过await或signal等方法控制锁,类似于wait和notify方法。

技术分享图片
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class Shared {
    static int data = 0;
}

class Solution {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Runnable runnable = () -> {
            lock.lock();//申请锁
            Shared.data++;
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException exc) {
                System.out.println(exc.getMessage());
            } finally {
                System.out.println(Shared.data);
                lock.unlock();//释放锁
            }
        };

        for (int i = 0; i < 10; i++)
            new Thread(runnable).start();
    }
}
View Code

 

六、原子操作

当读写某些类型的变量时,原子操作提供了一种不可中断的方案。这意味着不再需要锁以及其他同步机制。

技术分享图片
import java.util.concurrent.atomic.AtomicInteger;

class Shared {
    static AtomicInteger integer = new AtomicInteger(0);
}

class Solution {
    public static void main(String[] args) {
        Runnable runnable = () -> {
            for (int i = 0; i < 3; i++)
                System.out.println(Shared.integer.getAndAdd(1));
        };

        for (int i = 0; i < 5; i++)
            new Thread(runnable).start();
    }
}
View Code


七、并行编程

Fork/Join框架通过简化多线程的创建使用和自动使用多处理器,增强了多线程编程。

ForkJoinTask用于定义能够被ForkJoinPool管理的任务,泛型参数指定了任务结果的类型。fork方法为调用任务的异步执行提交任务,join方法等待调用该方法的任务中止,invoke和invokeAll方法则将fork和join合并到单个调用中。

RecursiveAction和RecursiveTask均为ForkJoinTask的子类。前者用于封装不返回结果的任务,后者用于封装返回结果的任务。

ForkJoinPool用于管理ForkJoinTask。若调用需等待的任务则使用线程池的invoke方法(同步),若不需要等待任务完成则调用execute方法(异步)。

技术分享图片
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class Sqrt extends RecursiveAction {//分治策略
    double[] arr;
    int front, rear;

    Sqrt(double[] arr, int front, int rear) {
        this.arr = arr;
        this.front = front;
        this.rear = rear;
    }

    @Override
    protected void compute() {
        if (rear - front < 100) {
            for (int i = front; i <= rear; i++)
                arr[i] = Math.sqrt(arr[i]);
        } else {
            int middle = (front + rear) / 2;
            invokeAll(new Sqrt(arr, front, middle), new Sqrt(arr, middle + 1, rear));
        }
    }
}

class Solution {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        double[] arr = new double[10000];
        for (int i = 0; i < arr.length; i++)
            arr[i] = (double) i;
        System.out.println(Arrays.toString(arr));

        Sqrt sqrt = new Sqrt(arr, 0, arr.length - 1);
        pool.invoke(sqrt);
        System.out.println(Arrays.toString(arr));
    }
}
View Code

若没有显示声明ForkJoinPool,则会自动使用公共池,使用commonPool可获取公共池的引用。

技术分享图片
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class Sum extends RecursiveTask<Integer> {
    int[] arr;
    int front, rear;

    Sum(int[] arr, int front, int rear) {
        this.arr = arr;
        this.front = front;
        this.rear = rear;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        if (rear - front < 100) {
            for (int i = front; i <= rear; i++)
                sum += arr[i];
        } else {
            int middle = (front + rear) / 2;
            Sum taskA = new Sum(arr, front, middle);
            Sum taskB = new Sum(arr, middle + 1, rear);
            taskA.fork();
            taskB.fork();
            sum = taskA.join() + taskB.join();
        }
        return sum;
    }
}

class Solution {
    public static void main(String[] args) {
        ForkJoinPool pool = ForkJoinPool.commonPool();
        int[] arr = new int[10000];
        for (int i = 0; i < arr.length; i++)
            arr[i] = i;
        System.out.println(pool.invoke(new Sum(arr, 0, arr.length - 1)));
    }
}
View Code

 



以上是关于java socket实现,如果有并发访问100个用户,然后每个用户10个处理一次,该怎么写这个socket程序呢!的主要内容,如果未能解决你的问题,请参考以下文章

IO多路复用, 基于IO多路复用+socket实现并发请求(一个线程100个请求), 协程

Nodejs & Socket.io 可以支持多少用户?

服务器中并发与并行的区别

java Socket编程,如何实现成千上万个客户端同时访问,服务器能同时做出响应?最后有程序例子,谢了

关于JAVA编写的WEB程序多次并发访问数据库的问题

Nginx优化_访问并发量(进程可以打开的最大文件数量)