在一个Web服务请求中启动一个线程,并在另一个请求中停止它

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在一个Web服务请求中启动一个线程,并在另一个请求中停止它相关的知识,希望对你有一定的参考价值。

由于使用Hyper.rs的Web服务请求,我想启动一个记录传感器值的线程。然后,在另一个请求中,我想停止该线程并获取所有记录的数据。

我知道如果一个线程已经运行,感谢Mutex,所以我试图将Sender<T>放入Mutex,但我不能使用这个Mutex的值,因为“无法摆脱借来的内容”。

这是我的代码:

extern crate futures;
extern crate hyper;

use futures::future::Future;
use hyper::{Method, StatusCode};
use hyper::server::{Http, Request, Response, Service};
use std::sync::Mutex;
use std::sync::mpsc::{channel, Sender};
use std::thread;
use std::thread::ThreadId;

fn main() {
    let addr = "127.0.0.1:3000".parse().unwrap();
    let server = Http::new().bind(&addr, || Ok(Sensor::new())).unwrap();
    server.run().unwrap();
}

struct Sensor {
    current_thread: Mutex<Option<ThreadId>>,
    current_tx: Mutex<Option<Sender<String>>>,
}

impl Sensor {
    pub fn new() -> Sensor {
        Sensor {
            current_thread: Mutex::new(None),
            current_tx: Mutex::new(None),
        }
    }

    fn start_sensors(&self) -> <Sensor as Service>::Future {
        let mut current_thread = self.current_thread.lock().unwrap();
        match *current_thread {
            Some(id) => {
                self.response_with_body(format!("thread with id {:?} is already running", id))
            }
            None => {
                let (tx, rx) = channel();
                let builder = thread::Builder::new();
                let join_handle = builder
                    .spawn(|| {
                        for received in rx {
                            println!("Got: {}", received);
                            if received == "stop" {
                                break;
                            }
                        }
                    })
                    .unwrap();
                let thread = join_handle.thread();

                let mut current_tx = self.current_tx.lock().unwrap();
                *current_tx = Some(tx);

                let message = format!("thread id: {:?}", &thread.id());
                *current_thread = Some(thread.id());

                self.response_with_body(message)
            }
        }
    }

    fn stop_sensors(&self) -> <Sensor as Service>::Future {
        let mut current_tx = self.current_tx.lock().unwrap();
        match *current_tx {
            Some(tx) => {
                let mut current_thread = self.current_thread.lock().unwrap();

                // just to trying to communicate with the thread
                let vals = vec![String::from("hi"), String::from("stop")];

                for val in vals {
                    tx.send(val).unwrap();
                }

                *current_tx = None;
                *current_thread = None;
                Box::new(futures::future::ok(Response::new()))
            }
            None => self.response_with_body(format!("No thread running")),
        }
    }

    fn response_with_body(&self, body: String) -> <Sensor as Service>::Future {
        Box::new(futures::future::ok(Response::new().with_body(body)))
    }
}

impl Service for Sensor {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;

    fn call(&self, req: Request) -> Self::Future {
        //let mut current_thread: Option<ThreadId> = None;
        match (req.method(), req.path()) {
            (&Method::Get, "/sensors/start") => self.start_sensors(),
            (&Method::Get, "/sensors/stop") => self.stop_sensors(),
            _ => Box::new(futures::future::ok(
                Response::new().with_status(StatusCode::NotFound),
            )),
        }
    }
}
error[E0507]: cannot move out of borrowed content
  --> src/main.rs:65:15
   |
65 |         match *current_tx {
   |               ^^^^^^^^^^^ cannot move out of borrowed content
66 |             Some(tx) => {
   |                  -- hint: to prevent move, use `ref tx` or `ref mut tx`

我该怎么做?

答案

您可以设法将None放入*current_tx,只需稍加修改:

fn stop_sensors(&self) -> <Sensor as Service>::Future {
    let mut current_tx = self.current_tx.lock().unwrap();

    match std::mem::replace(&mut *current_tx, None) {
        Some(tx) => {
            let mut current_thread = self.current_thread.lock().unwrap();

            // just to trying to communicate with the thread
            let vals = vec![String::from("hi"), String::from("stop")];

            for val in vals {
                tx.send(val).unwrap();
            }

            // *current_tx = None;
            *current_thread = None;
            Box::new(futures::future::ok(Response::new()))
        }
        None => self.response_with_body(format!("No thread running")),
    }
}

std::mem::replace将None置于*current_tx并返回旧值。

以上是关于在一个Web服务请求中启动一个线程,并在另一个请求中停止它的主要内容,如果未能解决你的问题,请参考以下文章

如何在python中启动一个线程池并在一个线程完成时停止?

多线程设计最佳实践

如何在另一个线程/任务中正确调用大逻辑?

为啥套接字被阻止接收,而我睡在另一个线程上?

WCF 服务接受并发请求

将鼠标光标更改为等待光标,然后启动工作线程并在线程完成时改回