Rust编程语言入门之最后的项目:多线程 Web 服务器
Posted 小乔的博客
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rust编程语言入门之最后的项目:多线程 Web 服务器相关的知识,希望对你有一定的参考价值。
最后的项目:多线程 Web 服务器
构建多线程 Web 服务器
- 在 socket 上监听 TCP 连接
- 解析少量的 HTTP 请求
- 创建一个合适的 HTTP 响应
- 使用线程池改进服务器的吞吐量
- 优雅的停机和清理
- 注意:并不是最佳实践
创建项目
~/rust
➜ cargo new hello
Created binary (application) `hello` package
~/rust
➜
main.rs 文件
use std::net::TcpListener;
fn main()
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming()
let stream = stream.unwrap();
println!("Connection established!");
修改一:
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
fn main()
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming()
let stream = stream.unwrap();
handle_connection(stream);
fn handle_connection(mut stream: TcpStream)
let mut buffer = [0; 512];
stream.read(&mut buffer).unwrap();
println!("Request: ", String::from_utf8_lossy(&buffer[..]));
修改二:
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
fn main()
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming()
let stream = stream.unwrap();
handle_connection(stream);
fn handle_connection(mut stream: TcpStream)
let mut buffer = [0; 512];
stream.read(&mut buffer).unwrap();
// 请求
// Method Request-URI HTTP-Version CRLF
// headers CRLF
// message-body
// 响应
// HTTP-Version Status-Code Reason-Phrase CRLF
// headers CRLF
// message-body
let response = "HTTP/1.1 200 OK\\r\\n\\r\\n";
stream.write(response.as_bytes()).unwrap();
stream.flush().unwrap();
println!("Request: ", String::from_utf8_lossy(&buffer[..]));
修改三:
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
fn main()
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming()
let stream = stream.unwrap();
handle_connection(stream);
fn handle_connection(mut stream: TcpStream)
let mut buffer = [0; 512];
stream.read(&mut buffer).unwrap();
// 请求
// Method Request-URI HTTP-Version CRLF
// headers CRLF
// message-body
// 响应
// HTTP-Version Status-Code Reason-Phrase CRLF
// headers CRLF
// message-body
let contents = fs::read_to_string("hello.html").unwrap();
let response = format!("HTTP/1.1 200 OK\\r\\n\\r\\n", contents);
stream.write(response.as_bytes()).unwrap();
stream.flush().unwrap();
修改四:
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
fn main()
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming()
let stream = stream.unwrap();
handle_connection(stream);
fn handle_connection(mut stream: TcpStream)
let mut buffer = [0; 512];
stream.read(&mut buffer).unwrap();
let get = b"GET / HTTP/1.1\\r\\n";
if buffer.starts_with(get)
let contents = fs::read_to_string("hello.html").unwrap();
let response = format!("HTTP/1.1 200 OK\\r\\n\\r\\n", contents);
stream.write(response.as_bytes()).unwrap();
stream.flush().unwrap();
else
let status_line = "HTTP/1.1 404 NOT FOUND\\r\\n\\r\\n";
let contents = fs::read_to_string("404.html").unwrap();
let response = format!("", status_line, contents);
stream.write(response.as_bytes()).unwrap();
stream.flush().unwrap();
修改五:
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
fn main()
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming()
let stream = stream.unwrap();
handle_connection(stream);
fn handle_connection(mut stream: TcpStream)
let mut buffer = [0; 512];
stream.read(&mut buffer).unwrap();
let get = b"GET / HTTP/1.1\\r\\n";
let (status_line, filename) = if buffer.starts_with(get)
("HTTP/1.1 200 OK\\r\\n\\r\\n", "hello.html")
else
("HTTP/1.1 404 NOT FOUND\\r\\n\\r\\n", "404.html")
;
let contents = fs::read_to_string(filename).unwrap();
let response = format!("", status_line, contents);
stream.write(response.as_bytes()).unwrap();
stream.flush().unwrap();
hello.html 文件
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Hello</title>
</head>
<body>
<h1>Hello</h1>
<p>Hi from Rust</p>
</body>
</html>
404.html 文件
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Hello!</title>
</head>
<body>
<h1>Oops!</h1>
<p>Sorry, I don\'t know what you\'re asking for.</p>
</body>
</html>
单线程Web服务器
use std::fs;
use std::thread;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::time::Duration;
fn main()
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming()
let stream = stream.unwrap();
handle_connection(stream);
fn handle_connection(mut stream: TcpStream)
let mut buffer = [0; 512];
stream.read(&mut buffer).unwrap();
let get = b"GET / HTTP/1.1\\r\\n";
let sleep = b"GET /sleep HTTP/1.1\\r\\n";
let (status_line, filename) = if buffer.starts_with(get)
("HTTP/1.1 200 OK\\r\\n\\r\\n", "hello.html")
else if buffer.starts_with(sleep)
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK\\r\\n\\r\\n", "hello.html")
else
("HTTP/1.1 404 NOT FOUND\\r\\n\\r\\n", "404.html")
;
let contents = fs::read_to_string(filename).unwrap();
let response = format!("", status_line, contents);
stream.write(response.as_bytes()).unwrap();
stream.flush().unwrap();
开启线程
use std::fs;
use std::thread;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::time::Duration;
fn main()
let listener = TcpListener::bind("localhost:7878").unwrap();
for stream in listener.incoming()
let stream = stream.unwrap();
thread::spawn(||
handle_connection(stream);
);
fn handle_connection(mut stream: TcpStream)
let mut buffer = [0; 512];
stream.read(&mut buffer).unwrap();
let get = b"GET / HTTP/1.1\\r\\n";
let sleep = b"GET /sleep HTTP/1.1\\r\\n";
let (status_line, filename) = if buffer.starts_with(get)
("HTTP/1.1 200 OK\\r\\n\\r\\n", "hello.html")
else if buffer.starts_with(sleep)
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK\\r\\n\\r\\n", "hello.html")
else
("HTTP/1.1 404 NOT FOUND\\r\\n\\r\\n", "404.html")
;
let contents = fs::read_to_string(filename).unwrap();
let response = format!("", status_line, contents);
stream.write(response.as_bytes()).unwrap();
stream.flush().unwrap();
lib.rs 文件
use std::thread;
pub struct ThreadPool
threads: Vec<thread::JoinHandle<()>>,
impl ThreadPool
/// Creates a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool
assert!(size > 0);
let mut threads = Vec::with_capacity(size);
for _ in 0..size
// create some threads and store them in the vector
ThreadPool threads
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + \'static,
lib.rs 修改一
use std::thread;
pub struct ThreadPool
workers: Vec<Worker>,
impl ThreadPool
/// Creates a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool
assert!(size > 0);
let mut workers = Vec::with_capacity(size);
for id in 0..size
workers.push(Worker::new(id));
ThreadPool workers
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + \'static,
struct Worker
id: usize,
thread: thread::JoinHandle<()>,
impl Worker
fn new(id: usize) -> Worker
let thread = thread::spawn(|| );
Worker id, thread
lib.rs 修改二
use std::thread;
use std::sync::mpsc;
pub struct ThreadPool
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
struct Job;
impl ThreadPool
/// Creates a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size
workers.push(Worker::new(id, receiver));
ThreadPool workers, sender
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + \'static,
struct Worker
id: usize,
thread: thread::JoinHandle<()>,
impl Worker
fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker
let thread = thread::spawn(||
receiver;
);
Worker id, thread
lib.rs
修改三
use std::thread;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
pub struct ThreadPool
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
struct Job;
impl ThreadPool
/// Creates a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size
workers.push(Worker::new(id, Arc::clone(&receiver)));
ThreadPool workers, sender
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + \'static,
struct Worker
id: usize,
thread: thread::JoinHandle<()>,
impl Worker
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker
let thread = thread::spawn(||
receiver;
);
Worker id, thread
lib.rs
修改四
use std::thread;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
pub struct ThreadPool
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
// struct Job;
type Job = Box<FnOnce() + Send + \'static>;
impl ThreadPool
/// Creates a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size
workers.push(Worker::new(id, Arc::clone(&receiver)));
ThreadPool workers, sender
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + \'static,
let job = Box::new(f);
self.sender.send(job).unwrap();
struct Worker
id: usize,
thread: thread::JoinHandle<()>,
impl Worker
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker
let thread = thread::spawn(move || loop
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker got a job; executing.", id);
(*job)();
);
Worker id, thread
lib.rs
修改五
use std::thread;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
pub struct ThreadPool
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
// struct Job;
// type Job = Box<FnOnce() + Send + \'static>;
impl ThreadPool
/// Creates a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size
workers.push(Worker::new(id, Arc::clone(&receiver)));
ThreadPool workers, sender
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + \'static,
let job = Box::new(f);
self.sender.send(job).unwrap();
struct Worker
id: usize,
thread: thread::JoinHandle<()>,
trait FnBox
fn call_box(self: Box<Self>);
impl<F: FnOnce()> FnBox for F
fn call_box(self: Box<F>)
(*self)()
type Job = Box<dyn FnBox + Send + \'static>;
impl Worker
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker
let thread = thread::spawn(move || loop
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker got a job; executing.", id);
// (*job)();
job.call_box();
);
Worker id, thread
lib.rs
修改六
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
pub struct ThreadPool
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
// struct Job;
// type Job = Box<FnOnce() + Send + \'static>;
impl ThreadPool
/// Creates a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size
workers.push(Worker::new(id, Arc::clone(&receiver)));
ThreadPool workers, sender
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + \'static,
let job = Box::new(f);
self.sender.send(job).unwrap();
struct Worker
id: usize,
thread: thread::JoinHandle<()>,
trait FnBox
fn call_box(self: Box<Self>);
impl<F: FnOnce()> FnBox for F
fn call_box(self: Box<F>)
(*self)()
type Job = Box<dyn FnBox + Send + \'static>;
impl Worker
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker
let thread = thread::spawn(move || loop
while let Ok(job) = receiver.lock().unwrap().recv()
println!("Worker got a job; executing.", id);
job.call_box();
);
Worker id, thread
优雅的停机和清理
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
pub struct ThreadPool
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
// struct Job;
// type Job = Box<FnOnce() + Send + \'static>;
impl ThreadPool
/// Creates a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size
workers.push(Worker::new(id, Arc::clone(&receiver)));
ThreadPool workers, sender
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + \'static,
let job = Box::new(f);
self.sender.send(job).unwrap();
impl Drop for ThreadPool
fn drop(&mut self)
for worker in &mut self.workers
println!("Shutting down worker ", worker.id);
worker.thread.join().unwrap()
struct Worker
id: usize,
thread: thread::JoinHandle<()>,
trait FnBox
fn call_box(self: Box<Self>);
impl<F: FnOnce()> FnBox for F
fn call_box(self: Box<F>)
(*self)()
type Job = Box<dyn FnBox + Send + \'static>;
impl Worker
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker
let thread = thread::spawn(move || loop
while let Ok(job) = receiver.lock().unwrap().recv()
println!("Worker got a job; executing.", id);
job.call_box();
);
Worker id, thread
修改优化一:
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
pub struct ThreadPool
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
// struct Job;
// type Job = Box<FnOnce() + Send + \'static>;
impl ThreadPool
/// Creates a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size
workers.push(Worker::new(id, Arc::clone(&receiver)));
ThreadPool workers, sender
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + \'static,
let job = Box::new(f);
self.sender.send(job).unwrap();
impl Drop for ThreadPool
fn drop(&mut self)
for worker in &mut self.workers
println!("Shutting down worker ", worker.id);
if let Some(thread) = worker.thread.take()
thread.join().unwrap();
struct Worker
id: usize,
thread: Option<thread::JoinHandle<()>>,
trait FnBox
fn call_box(self: Box<Self>);
impl<F: FnOnce()> FnBox for F
fn call_box(self: Box<F>)
(*self)()
type Job = Box<dyn FnBox + Send + \'static>;
impl Worker
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker
let thread = thread::spawn(move || loop
while let Ok(job) = receiver.lock().unwrap().recv()
println!("Worker got a job; executing.", id);
job.call_box();
);
Worker
id,
thread: Some(thread),
最终版 lib.rs 文件
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
enum Message
NewJob(Job),
Terminate,
pub struct ThreadPool
workers: Vec<Worker>,
sender: mpsc::Sender<Message>,
// struct Job;
// type Job = Box<FnOnce() + Send + \'static>;
impl ThreadPool
/// Creates a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size
workers.push(Worker::new(id, Arc::clone(&receiver)));
ThreadPool workers, sender
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + \'static,
let job = Box::new(f);
self.sender.send(Message::NewJob(job)).unwrap();
impl Drop for ThreadPool
fn drop(&mut self)
println!("Sending terminate message to all workers.");
for _ in &mut self.workers
self.sender.send(Message::Terminate).unwrap();
println!("Shutting down all workers.");
for worker in &mut self.workers
println!("Shutting down worker ", worker.id);
if let Some(thread) = worker.thread.take()
thread.join().unwrap();
struct Worker
id: usize,
thread: Option<thread::JoinHandle<()>>,
trait FnBox
fn call_box(self: Box<Self>);
impl<F: FnOnce()> FnBox for F
fn call_box(self: Box<F>)
(*self)()
type Job = Box<dyn FnBox + Send + \'static>;
impl Worker
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker
let thread = thread::spawn(move || loop
let message = receiver.lock().unwrap().recv().unwrap();
match message
Message::NewJob(job) =>
println!("Worker got a job; executing.", id);
job.call_box();
Message::Terminate =>
println!("Worker got a job; executing.", id);
break;
);
Worker
id,
thread: Some(thread),
最终版 main.rs 文件
use hello::ThreadPool;
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::thread;
use std::time::Duration;
fn main()
let listener = TcpListener::bind("localhost:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2)
let stream = stream.unwrap();
pool.execute(||
handle_connection(stream);
);
println!("Shutting down.");
fn handle_connection(mut stream: TcpStream)
let mut buffer = [0; 512];
stream.read(&mut buffer).unwrap();
let get = b"GET / HTTP/1.1\\r\\n";
let sleep = b"GET /sleep HTTP/1.1\\r\\n";
let (status_line, filename) = if buffer.starts_with(get)
("HTTP/1.1 200 OK\\r\\n\\r\\n", "hello.html")
else if buffer.starts_with(sleep)
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK\\r\\n\\r\\n", "hello.html")
else
("HTTP/1.1 404 NOT FOUND\\r\\n\\r\\n", "404.html")
;
let contents = fs::read_to_string(filename).unwrap();
let response = format!("", status_line, contents);
stream.write(response.as_bytes()).unwrap();
stream.flush().unwrap();
运行
hello on master [?] is Rust编程语言入门之编写自动化测试
编写自动化测试
一、编写和运行测试
测试(函数)
- 测试:
- 函数
- 验证非测试代码的功能是否和预期一致
- 测试函数体(通常)执行的3个操作:
- 准备数据/状态
- 运行被测试的代码
- 断言(Assert)结果
解剖测试函数
- 测试函数需要使用 test 属性(attribute)进行标注
- Attribute就是一段Rust代码的元数据
- 在函数上加 #[test],可把函数变成测试函数
运行测试
-
使用 cargo test 命令运行所有测试函数
- Rust会构建一个 Test Runner 可执行文件
- 它会运行标注了 test 的函数,并报告其运行是否成功
-
当使用 cargo 创建 library 项目的时候,会生成一个 test module,里面有一个test 函数
- 你可以添加任意数量的 test module 或 函数
~/rust
➜ cargo new adder --lib
Created library `adder` package
~/rust
➜ cd adder
adder on master [?] via 以上是关于Rust编程语言入门之最后的项目:多线程 Web 服务器的主要内容,如果未能解决你的问题,请参考以下文章