java socket多线程问题,我写了一个socket tcp服务端,高手来进来看下,谢啦。是关于多客户端并联的问题
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java socket多线程问题,我写了一个socket tcp服务端,高手来进来看下,谢啦。是关于多客户端并联的问题相关的知识,希望对你有一定的参考价值。
我的处理方法是,每来一条tcp连接,我就为这一个客户端开一条线程进行数据通信解析处理。那么,如果有5000甚至更多的客户端来进行连接,我将有可能同时产生5000+线程,而我查过线程的最大数取决于可用虚拟内存的大小,默人一个线程可拥有1M的栈空间,所以一般来说也只可以创建4000+个线程(虚拟内存是4G的话)。。。
就算可以创建5000+个线程满足这些客户端进行连接,那么我想这么多的线程估计服务器CPU也吃不消。。。
不知道各位高手有没有遇到处理这样的情景,比如QQ是怎么做的,给小弟点建议啥的。。。不甚感激。另外我们这项目只能采用tcp通信。
PS:一般我们创建一条线程都是通过new一个对象,然后.start()进行启动线程。我们一般认为start()方法执行结束后也就意味线程已经停止对象也就没有用了该进行销毁了。可是我测试发现,就算start()已经执行结束了,对象还在,还是可以读取到对象的属性。。。说明对象并没有被销毁。这样就有一个问题,如果同时存在很多线程比如5000+,那就会产生很多的没有被销毁的对象内存垃圾。。。我又想到了对象单例模式,可是我觉得我用不上,因为启动线程的时候我给线程里面变量赋值是通过构造函数的方式。。。如下代码显示:
public class ShujubaoThread extends Thread
byte[] bs;
public ShujubaoThread(byte[] mybt)
this.bs=mybt;
public void run()
//开始解析数据包
int length=bs.length;
//。。。
//解析数据包结束
public static void main(String[] strings)
//第1个客户端建立连接,解析收到的数据包
ShujubaoThread t1=new ShujubaoThread(new byte[1]);
t1.start();
//第2个客户端建立连接,解析收到的数据包
ShujubaoThread t2=new ShujubaoThread(new byte[1]);
t2.start();
//第3个客户端建立连接,解析收到的数据包
ShujubaoThread t3=new ShujubaoThread(new byte[1]);
t3.start();
//第4个,第5个,第6个,第7个,第8个,第9个...
不知道我这做法对不对,或者有什么更好的方法。。。
我是第一次写socket服务端,对线程也不是太熟,希望给我高手指点一二
拜谢。。。 再次拜谢。。。
韩顺平java视频教程里的qq项目里有详细的解说!追问
恩,我现在就是想知道,如果需要一下子连接5000+个客户端,也就是可能同时需要new 5000+条线程,这么多线程肯定服务器会够呛,有没有好的解决办法
追答如果你有这么多的客户,你就要有强大的服务器!你知道腾讯的服务器有几多吗?这个不是技术就是能解决的,还是钱的问题!
参考技术A 。。。那你就先一样一样学 然后再往起装 就光java的多线程就够给你讲一周你都弄不明白的 参考技术B 抱歉,我也不懂的哦面试官问:你做过什么Java线程池实践,我写了一篇博客给他看~
但如果线程池设置不当就会引起一系列问题, 下面就说下我最近碰到的问题。
案件还原
比如你有一个项目中有个接口部分功能使用了线程池,这个功能会去调用多个第三方接口,都有一定的耗时,为了不影响主流程的性能,不增加整体响应时间,所以放在线程池里和主线程并行执行,等线程池里的任务执行完通过future.get的方式获取线程池里的线程执行结果,然后合并到主流程的结果里返回,大致流程如下:
线程池参数为:
- coresize:50
- max:200
- queuesize:1
- keepalivetime:60s
- 拒绝策略为reject
假设每次请求提交5个task到线程池,平均每个task是耗时50ms
没过一会就收到了线程池满了走了拒绝策略的报错
结合你对线程池的了解,先思考下为什么
线程池的工作流程如下:
根据这个我们来列一个时间线
- 项目刚启动 第1次请求(每次5个task提交到线程池),创建5个核心线程
- 第2次请求 继续创建5个(共10个核心线程了)
- 直到第10次 核心线程数会达满50个
- 核心线程处理完之后核心线程会干嘛呢
根据 jdk1.8的线程池的源码:
线程池的线程处理处理了交给它的task之后,它会去getTask()
源码如下:
private Runnable getTask()
boolean timedOut = false; // Did the last poll() time out?
for (;;)
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty()))
decrementWorkerCount();
return null;
//加入Java开发交流君样:756584822一起吹水聊天
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty()))
if (compareAndDecrementWorkerCount(c))
return null;
continue;
try
//注意这段
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
catch (InterruptedException retry)
timedOut = false;
请注意上面代码中的bool类型的timed的赋值逻辑,
由于allowCoreThreadTimeOut默认为false,也就是说:
只要创建的线程数量超过了核心线程数,那么干完手上活后的线程(不管是核心线程,还是超过队列后新开的线程)就会走进
//线程状态为 timedwaiting
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
由于我们上面步骤里面还没有超过coresize所以会走进
//线程状态为 waiting
workQueue.take()
所以答案是:上面步骤干活的核心线程处理完之后核心线程会进入waiting状态,
只要队列一有活就会被唤醒去干活。
- 到第11次的时候
好家伙,到这步骤的时候 ,核心线程数已满,那么就往队列里面塞,但是设置的queuesize=1,
每次有5个task,那就是说往队列里面塞1个,剩下4个(别较真我懂你意思)要创建新的max线程了。
结果:
核心线程数:50
队列:1
max线程:4个
因为50个核心线程在waiting中,所以队列只要一add,就会立马被消费,假设消费的这个核心线程名字是小A。
这里要细品一下:
这里已经总线程数大于核心线程数了,那么getTask()里面
// timed=true
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
那么小A干完活就会走进
//线程状态为 timedwaiting
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
此处核心线程小A就会变成timedwaiting的状态(keepalive设置的是60s)
- 到第12次的时候
继续往队列塞1个,创建4个max线程,max线程已经有8个了
这里 又会有一个新的核心线程小B ,会变成timedwaiting状态了
max线程们干完手上的活后,也会去调用getTask() 也会进入timedwaiting状态
因为queuesize=1,狼多肉少
- 继续下去,那么最终会变成
max满了,线程们都在timedwaiting(keepalive设置的是60s)
新的提交就会走拒绝策略了
问题总结
其实核心与非核心对于线程池来说都是一样的,只要一旦线程数超过了核心线程数,那么线程就会走进timewaiting
把queuesize调大就好了?
这里又有一个新的注意点:
上面举例的是I/O密集型业务,queuesize不是越大越好的,
因为:
线程池新创建的线程会优先处理新请求进来的任务,而不是去处理队列里的任务,队列里的任务只能等核心线程数忙完了才能被执行,这样可能造成队列里的任务长时间等待,导致队列积压,尤其是I/O密集场景
慎用CallRunnerPolicy这个拒绝策略
一定得理解这个策略会带来什么影响,
先看下这个拒绝策略的源码
如果你提交线程池的任务即时失败也没有关系的话,用这个拒绝策略是致命的,
因为一旦超过线程池的负载后开始吞噬tomcat线程。
用future.get的方式慎用DiscardPolicy这个拒绝策略
如果需要得到线程池里的线程执行结果,使用future的方式,拒绝策略不建议使用DiscardPolicy,这种丢弃策略虽然不执行子线程的任务,
但是还是会返回future对象(其实在这种情况下我们已经不需要线程池返回的结果了),然后后续代码即使判断了future!=null也没用,
这样的话还是会走到future.get()方法,如果get方法没有设置超时时间会导致一直阻塞下去
类似下面的伪代码:
// 如果线程池已满,新的请求会被直接执行拒绝策略,此时如果拒绝策略设置的是DiscardPolicy丢弃任务,
// 则还是会返回future对象, 这样的话后续流程还是可能会走到get获取结果的逻辑
Future<String> future = executor.submit(() ->
// 业务逻辑,比如调用第三方接口等操作
return result;
);
// 主流程调用逻辑
if(future != null) // 如果拒绝策略是DiscardPolicy还是会走到下面代码
future.get(超时时间); // 调用方阻塞等待结果返回,直到超时
推荐解决方案
- 用动态线程池,可以动态修改coresize,maxsize,queuesize,keepalivetime
对线程池的核心指标进行埋点监控,可以通过继承 ThreadPoolExecutor 然后Override掉beforeExecute,afterExecute,shutdown,shutdownNow方法,进行埋点记录到es
可以埋点的数据有:
包括线程池运行状态、核心线程数、最大线程数、任务等待数、已完成任务数、线程池异常关闭等信息
基于以上数据,我们可以实时监控和排查定位问题
参考代码:
/**
* 自定义线程池<p>
* 1.监控线程池状态及异常关闭等情况<p>
* 2.监控线程池运行时的各项指标, 比如:任务执行时间、任务等待数、已完成任务数、任务异常信息、核心线程数、最大线程数等<p>
* author: maoyingxu
*/
public class ThreadPoolExt extends ThreadPoolExecutor
private TimeUnit timeUnit;
public ThreadPoolExt(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
this.timeUnit = unit;
//加入Java开发交流君样:756584822一起吹水聊天
@Override
protected void beforeExecute(Thread t, Runnable r)
monitor("ThreadPool monitor data:"); // 监控线程池运行时的各项指标
@Override
protected void afterExecute(Runnable r, Throwable ex)
// 记录线程池执行任务的时间
ELKLogUtils.addAppendedValue(StoredLogTag.RUNNING_DETAIL, MessageFormat.format("ThreadPool task executeTime:0", executeTime));
if (ex != null) // 监控线程池中的线程执行是否异常
LogUtils.warn("unknown exception caught in ThreadPool afterExecute:", ex);
@Override
public void shutdown()
monitor("ThreadPool will be shutdown:"); // 线程池将要关闭事件,此方法会等待线程池中正在执行的任务和队列中等待的任务执行完毕再关闭
super.shutdown();
@Override
public List<Runnable> shutdownNow()
monitor("ThreadPool going to immediately be shutdown:"); // 线程池立即关闭事件,此方法会立即关闭线程池,但是会返回队列中等待的任务
// 记录被丢弃的任务, 目前只记录日志, 后续可根据业务场景做进一步处理
List<Runnable> dropTasks = null;
try
dropTasks = super.shutdownNow();
ELKLogUtils.addAppendedValue(StoredLogTag.RUNNING_DETAIL, MessageFormat.format("0ThreadPool discard task count:12",
System.lineSeparator(), dropTasks!=null ? dropTasks.size() : 0, System.lineSeparator()));
catch (Exception e)
LogUtils.addClogException("ThreadPool shutdownNow error", e);
//加入Java开发交流君样:756584822一起吹水聊天
return dropTasks;
/**
* 监控线程池运行时的各项指标, 比如:任务等待数、任务异常信息、已完成任务数、核心线程数、最大线程数等
* @param title
*/
private void monitor(String title)
try
// 线程池监控信息记录, 这里需要注意写ES的时机,尤其是多个子线程的日志合并到主流程的记录方式
String threadPoolMonitor = MessageFormat.format(
"01core pool size:2, current pool size:3, queue wait size:4, active count:5, completed task count:6, " +
"task count:7, largest pool size:8, max pool size:9, keep alive time:10, is shutdown:11, is terminated:12, " +
"thread name:1314",
System.lineSeparator(), title, this.getCorePoolSize(), this.getPoolSize(),
this.getQueue().size(), this.getActiveCount(), this.getCompletedTaskCount(), this.getTaskCount(), this.getLargestPoolSize(),
this.getMaximumPoolSize(), this.getKeepAliveTime(timeUnit != null ? timeUnit : TimeUnit.SECONDS), this.isShutdown(),
this.isTerminated(), Thread.currentThread().getName(), System.lineSeparator());
ELKLogUtils.addAppendedValue(StoredLogTag.RUNNING_DETAIL, threadPoolMonitor);
LogUtils.info(title, threadPoolMonitor);
ELKLogUtils.addFieldValue(APPIndexedLogTag.THREAD_POOL_USE_RATE, useRate); // ES埋点线程池使用率, useRate = (getActiveCount()/getMaximumPoolSize())*100
Cat.logEvent(key, String.valueOf(useRate)); // 报警设置
catch (Exception e)
LogUtils.addClogException("ThreadPool monitor error", e);
- 重写线程池拒绝策略, 拒绝策略主要参考了 Dubbo的线程池拒绝策略
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy
// 省略部分代码
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
String msg = String.format("Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: "
+ "%d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(),
e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg); // 记录最大负载情况下线程池的核心线程数,活跃数,最大线程数等参数
dumpJStack(); // 记录线程堆栈信息包括锁争用信息
throw new RejectedExecutionException(msg);
private void dumpJStack()
long now = System.currentTimeMillis();
//dump every 10 minutes 每隔10分钟记录一次
if (now - lastPrintTime < TEN_MINUTES_MILLS)
return;
//加入Java开发交流君样:756584822一起吹水聊天
if (!guard.tryAcquire()) // 加锁访问
return;
ExecutorService pool = Executors.newSingleThreadExecutor(); // 这里单独开启一个新的线程去执行(阿里的Java开发规范不允许直接调用Executors.newSingleThreadExecutor, 估计dubbo那时候还没出开发规范...)
pool.execute(() ->
String dumpPath = url.getParameter(DUMP_DIRECTORY, System.getProperty("user.home"));
SimpleDateFormat sdf;
String os = System.getProperty(OS_NAME_KEY).toLowerCase();
// window system dont support ":" in file name
if (os.contains(OS_WIN_PREFIX))
sdf = new SimpleDateFormat(WIN_DATETIME_FORMAT);
else
sdf = new SimpleDateFormat(DEFAULT_DATETIME_FORMAT);
String dateStr = sdf.format(new Date());
//try-with-resources
try (FileOutputStream jStackStream = new FileOutputStream(
new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr)))
JVMUtil.jstack(jStackStream);
catch (Throwable t)
logger.error("dump jStack error", t);
finally
guard.release();
lastPrintTime = System.currentTimeMillis();
);
//must shutdown thread pool ,if not will lead to OOM
pool.shutdown();
最后,祝大家早日学有所成,拿到满意offer
以上是关于java socket多线程问题,我写了一个socket tcp服务端,高手来进来看下,谢啦。是关于多客户端并联的问题的主要内容,如果未能解决你的问题,请参考以下文章
面试官问:你做过什么Java线程池实践,我写了一篇博客给他看~