如何提高nfs的并发处理能力
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何提高nfs的并发处理能力相关的知识,希望对你有一定的参考价值。
参考技术A 有什么方法衡量服务器并发处理能力1. 吞吐率
吞吐率,单位时间里服务器处理的最大请求数,单位req/s
从服务器角度,实际并发用户数的可以理解为服务器当前维护的代表不同用户的文件描述符总数,也就是并发连接数。服务器一般会限制同时服务的最多用户数,比如apache的MaxClents参数。
这里再深入一下,对于服务器来说,服务器希望支持高吞吐率,对于用户来说,用户只希望等待最少的时间,显然,双方不能满足,所以双方利益的平衡点,就是我们希望的最大并发用户数。
2. 压力测试
有一个原理一定要先搞清楚,假如100个用户同时向服务器分别进行10个请求,与1个用户向服务器连续进行1000次请求,对服务器的压力是一样吗?实际上是不一样的,因对每一个用户,连续发送请求实际上是指发送一个请求并接收到响应数据后再发送下一个请求。这样对于1个用户向服务器连续进行1000次请求, 任何时刻服务器的网卡接收缓冲区中只有1个请求,而对于100个用户同时向服务器分别进行10个请求,服务器的网卡接收缓冲区最多有100个等待处理的请求,显然这时的服务器压力更大。
压力测试前提考虑的条件
并发用户数: 指在某一时刻同时向服务器发送请求的用户总数(HttpWatch)
总请求数
请求资源描述
请求等待时间(用户等待时间)
用户平均请求的等待时间
服务器平均请求处理的时间
硬件环境
压力测试中关心的时间又细分以下2种:
用户平均请求等待时间(这里暂不把数据在网络的传输时间,还有用户PC本地的计算时间计算入内)
服务器平均请求处理时间
用户平均请求等待时间主要用于衡量服务器在一定并发用户数下,单个用户的服务质量;而服务器平均请求处理时间就是吞吐率的倒数,一般来说,用户平均请求等待时间 = 服务器平均请求处理时间 * 并发用户数
怎么提高服务器的并发处理能力
1. 提高CPU并发计算能力
服务器之所以可以同时处理多个请求,在于操作系统通过多执行流体系设计使得多个任务可以轮流使用系统资源,这些资源包括CPU,内存以及I/O. 这里的I/O主要指磁盘I/O, 和网络I/O。
多进程 & 多线程
多执行流的一般实现便是进程,多进程的好处可以对CPU时间的轮流使用,对CPU计算和IO操作重叠利用。这里的IO主要是指磁盘IO和网络IO,相对CPU而言,它们慢的可怜。
而实际上,大多数进程的时间主要消耗在I/O操作上。现代计算机的DMA技术可以让CPU不参与I/O操作的全过程,比如进程通过系统调用,使得CPU向网卡或者磁盘等I/O设备发出指令,然后进程被挂起,释放出CPU资源,等待I/O设备完成工作后通过中断来通知进程重新就绪。对于单任务而言,CPU大部分时间空闲,这时候多进程的作用尤为重要。
多进程不仅能够提高CPU的并发度。其优越性还体现在独立的内存地址空间和生命周期所带来的稳定性和健壮性,其中一个进程崩溃不会影响到另一个进程。
但是进程也有如下缺点:
fork()系统调用开销很大: prefork
进程间调度和上下文切换成本: 减少进程数量
庞大的内存重复:共享内存
IPC编程相对比较麻烦
Rust实践-采用线程池提高web服务器的并发处理能力
前言:
在上一篇rust入门实践单线程http服务器时,我们实现的单线程版本满足web第一阶段的理论实现,即通过实现掌握rust编写web服务器的基础(引用net包,链接TCP链接),同时掌握http协议进行请求解析。走出一小步,才能大步快步迈向那个Hello World。
接下来,将通过实现线程池模型来提高服务器的并发能力,线程池模型也是生产环境常用的一种方式,其他的高并发方式还有fork/join(如:jdk7 基于work-strealing思想实现的并行计算)、单线程异步io(如:nginx、redis)。
这里我们主要讲解线程池,也即是常说的threaPool。
内容
1、掌握线程池的设计思路
2、掌握rust的闭包
3、rust的类设计
从发现问题到解决问题
问题
一、单线程会阻塞并发请求
在生产环境中,我们的web服务器面向的是互联网群体,用户量之大,请求之多,没个用户的请求应该是互不影响的,而第一版本的web服务器存在一个问题是当有多个请求并发时,每一个请求必须等待前一个请求处理完才能响应,用户体验极差。那单线程无法处理,我们可以开多个线程呀,每个请求都由一个线程处理不就完美解决了嘛,让我们看下如何实现多线程吧:
fn main() {
let listener = TcpListener::bind("10.86.168.45:9999").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
std::thread::spawn(move || {
handle_connection(stream);
});
}
}
当服务器接收到请求流时,使用rust的thread库,通过调用std::thread::spawn创建线程,其源码如下:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
{
Builder::new().spawn(f).expect("failed to spawn thread")
}
其参数接收的是FnOnce类型闭包,闭包还有其他两种类型:Fn、FnMut区别在于闭包获取环境变量的所有权。
1、FnOnce参数类型是 self,所以,这种类型的闭包会获取变量的所有权,生命周期只能是当前作用域,之后就会被释放了,不能运行多次。
2、FnMut参数类型是 &mut self,所以,这种类型的闭包是可变借用,会改变变量,但不会释放该变量。可以运行多次。
3、Fn参数类型是 &self,所以,这种类型的闭包是不可变借用,不会改变变量,也不会释放该变量。可以运行多次。
而在参数列表前使用 move 关键字是强制闭包获取其使用的环境值的所有权。这个技巧在创建新线程将值的所有权从一个线程移动到另一个线程时最为实用,而不需要由rust编译器去推导所有权,但当使用move关键字将stream转移给闭包线程后,在主线程便不能再进行操作,这是move语义需要注意的,在这里不展开。
二、多线程改进方式会引发DoS攻击
通过为每个请求创建线程的方式虽然解决了多并发阻塞的问题,但是当请求量无限上涨时,比如常见的DoS攻击(洪水攻击),即不停的创建请求导致线程无上限创建时会消耗服务器资源。
那可以限定线程的数量,而线程的执行效率也是跟cpu挂钩,系统线程数=cpu个数 * 核数,当运行线程数大于系统线程数时,操作系统会采用cpu时间片控制,采用线程调度算法进行线程切换,线程数越大切换越频繁,所以经常说线程数不可过大,一般设置线程数有前辈给出过实践公式,可以搜一下。我一般不分io密集还是cpu密集,会设置标准数量为核数的大小,可最大创建两倍核数大小。
限制线程数可以防止线程无限创建导致的资源问题,那么当请求大于工作线程数量时,又该如何处理不能及时处理的请求呢,想象一下我们平时生活中的场景,去银行办理业务时,银行柜台工作有限,那么排队的人就需要获取排队号码进行等待,同理的,我们可以引入一个FIFO队列,即先请求的任务势必先处理。
限定线程池 + 请求队列,这便是我们线程池的两大核心:
1、预创建固定数量的线程数
2、将请求放入请求队列等待处理
总结:
在rust实践中,我们将使用channel来进行任务的分发,ThreadPool在接受到任务时写入channel,预创建的线程工作者可以同channel获取到任务并执行。
设计
线程池不仅仅是可以用在web请求处理上,实际上它应该被作为一个基础组件运用在其他需要进行并行计算的位置,我们可以单独把线程池作为一个独立的模块。
首先,我们先拆分之前的项目结构,在src目录下创建bin目录,用于放置我们的主工程,而在bin外层,也即src下创建lib.rs,用于封装ThreadPool作为主工程的外部库,拆分后的目录结构如下:
封装ThreadPool
lib.rs 代码如下:
pub struct ThreadPool;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
main.rs 使用 ThreadPool
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use webBean::ThreadPool;
fn main() {
let listener = TcpListener::bind("10.86.168.45:9999").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
// std::thread::spawn(move || {
// handle_connection(stream);
// });
pool.execute(|| {
handle_connection(stream);
});
}
}
当我们在设计ThreadPool的Api的时候, ThreadPool的使用者即开发人员是我们要服务的对象,那Api必须让使用者尽量少的学习成本以及使用负担,ThreadPool的Api如下说明:
-
new函数:表示线程池里的预创建线程数量,这里还未采用弹性创建方式
-
execute函数:将函数处理交由线程池托管,函数api跟std::thread::spawn一致
executre在模仿spawn实现时,核心在于函数参数:闭包。
在这里,咱们先在脑里回顾前文讲到的rust闭包有三种特性:Fn,FnMut,FnOnce。在线程池的这个使用场景下,我们该使用哪个特性呢?
处理请求的线程只执行请求的闭包函数一次,之后闭包的生命周期便结束了,不可再运行多次,所以在这里可以使用FnOnce
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
实现
Api设计结束,虽然代码已经可以运行,但是只是个架子,接下来需要为其丰富其内涵了。
- 回顾银行柜台处理业务的场景,线程池的机制便是先预创建柜台员工(Worker),每个Workder持续(线程)工作(处理每一个请求)。
在ThreadPool内部,我们定义Worker并实现
struct Worker {
//每个员工有编号
id: usize,
//绑定线程持续处理请求
thread: thread::JoinHandle<()>,
}
impl Worker {
//构造Workder
fn new (id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker {id, thread}
}
}
- 现在员工(Worker)有了, 那他们应该怎么知道有请求要进行处理呢。接下来,我们将引入Rust的线程通信机制Channel。
应用层调用ThreadPool的Execute方法时,ThreadPool可以通过chancel发送一个任务(Job)给到Workder绑定的channel端。
Job的定义如下:
type Job = Box<dyn FnOnce() + Send + 'static>;
这句定义可能比较隐晦,我们先一步步拆解 Rust的语法糖
FnOnce() + Send + 'static 是一个特征对象:
-
FnOnce也即是闭包特性
-
Send 表示该类型的值可以通过跨线程发送,用于Channel
-
'static 表示对特征对象里包含的引用变量生命周期的约束,要保证的是在执行闭包时,闭包的对象生命周期不能小于闭包,而实现安全检测。
-
Channel通信机制
ThreadPool在初始化时创建Channel的作为Job的发送端,每个Worker获取Channel作为Job的接收端,当每个Worker在获取Channel中的Job任务时,涉及到并发共享锁的问题,所以在消费Job时,每个Worker需要对Channel使用Arc<Mutex>,Arc 如C++中的智能指针shared_ptr,只是增加引用计数,最后一个指针销毁后,内存才被释放,而这里加了Mutex表示内存是mut可修改的。 -
ThreadPool创建Channel
-
Worker构造时接收Channel
-
Worker处理Channel任务Job时,进行锁
use std::thread;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
//短别名
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
// 创建Channel,作为任务发送端
let (sender, receiver) = mpsc::channel();
//创建Channel接收端指针
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
//每个work共享一个receiver
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender,
}
}
//threadPool接收任务时通过Channel发送Job任务
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
//构造Worker时接收Receiver作为线程的闭包参数捕获
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
//调用recv阻塞当前线程,直到任务接收后进行执行,执行结束再进行loop循环从Channel接收Job
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {} got a job; executing.", id);
job();
}
});
Worker {
id,
thread,
}
}
}
以上是关于如何提高nfs的并发处理能力的主要内容,如果未能解决你的问题,请参考以下文章
Linux(Centos )的网络内核参数优化来提高服务器并发处理能力