java队列实现异步执行
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java队列实现异步执行相关的知识,希望对你有一定的参考价值。
现在需求是这样的,
系统需要调用第三方接口推送信息,现在的代码是,调用接口,等待接口返回值,处理结果,然后再执行后面的代码,最后的部分跟处理结果没有太大的联系,可以分开进行,
所以新的设计是,创建一个队列,队列中放需要执行的动作,后台不停循环这个队列,如果有元素,调用对应方法,如果队列为空,后台进入休眠,直到队列添加新的元素。系统只要往队列中添加动作,然后直接执行最后面的代码,队列中的动作访问接口,完成后自己去处理返回结果。
对队列不是很熟,请大神指教。
目前我写的小程序测试结果
第一张图,在queue.offer(new ProcessData(ss));下面还有一句
run();
在整个思路上要调整一下
1、会有很多线程给一个队列上添加任务
2、有一个或者多个线程逐个执行队列的任务
考虑一下几点:
1、没有任务时,队列执行线程处于等待状态
2、添加任务时,激活队列执行线程,全部run起来,首先抢到任务的执行,其他全部wait
给个小例子吧
package org;import java.util.LinkedList;
import java.util.List;
public class Queues
public static List<Task> queue = new LinkedList<Task>();
/**
* 假如 参数o 为任务
* @param o
*/
public static void add (Task t)
synchronized (Queues.queue)
Queues.queue.add(t); //添加任务
Queues.queue.notifyAll();//激活该队列对应的执行线程,全部Run起来
static class Task
public void test()
System.out.println("我被执行了");
package org;
import java.util.List;
public class Exec implements Runnable
@Override
public void run()
while(true)
synchronized (Queues.queue)
while(Queues.queue.isEmpty()) //
try
Queues.queue.wait(); //队列为空时,使线程处于等待状态
catch (InterruptedException e)
e.printStackTrace();
System.out.println("wait...");
Queues.Task t= Queues.queue.remove(0); //得到第一个
t.test(); //执行该任务
System.out.println("end");
public static void main(String[] args)
Exec e = new Exec();
for (int i = 0; i < 2; i++)
new Thread(e).start(); //开始执行时,队列为空,处于等待状态
//上面开启两个线程执行队列中的任务,那就是先到先得了
//添加一个任务测试
Queues.Task t =new Queues.Task();
Queues.add(t); //执行该方法,激活所有对应队列,那两个线程就会开始执行啦
上面的就是很简单的例子了
参考技术A 你想问什么呢?判断队列是否有值?取队列中的值?这两个问题你能完成的话还有啥需要解决呢?跟你以前的需求不一样么?追问
我也不知道,以前没用过队列,看了下文档,然后写了这货,给组长看看,组长说不行,想请大神指导下异步写队列的规范。
还有,我再添加队列元素的时候需要调用下run那个方法,会不会对当前执行的run有影响。
Celery + Flower + FastAPI + RabbitMQ ,Python实现异步消息队列和监控
@Author:Runsen
消息队列
消息队列让应用程序在用户请求之外异步执行称为任务的工作。如果应用程序需要在后台执行工作,它会将任务添加到任务队列中。这些任务稍后由工作服务执行。
Celery
Celery 是一个分布式任务队列,可帮助在后台异步执行大量进程/消息并进行实时处理。芹菜由三个主要成分组成:
- Celery 客户端
- 消息代理
- Celery 工人
下图显示了组件之间如何交互的简化图。我们将使用 FastAPI 作为我们的 Celery 客户端和 RabbitMQ 作为消息代理。
-
将Celery 客户将运行FastAPI应用程序,并会发出消息/后台作业的RabbitMQ。
-
RabbitMQ 将作为消息代理来调解客户端和工作线程之间的消息。
-
RabbitMQ 收到客户端的消息后,会通过将消息发送给 celery worker 来启动客户端任务。
-
一个celery 工人被认为是将实现在任何Web服务器的请求的异步后台任务。可以有多个工人一次执行/完成许多任务。
-
celery 将确保每个 worker 一次只执行一个任务,并且每个任务只由一个 worker 分配。
FastAPI 是一个现代、快速(高性能)的 Web 框架,
安装 fastapi包
pip install fastapi
安装 celery 包
pip install celery
我们还需要安装 ASGI 服务器来运行我们的 FastAPI 应用程序。
pip install uvicorn
在我们的本地机器上运行 RabbitMQ 的最简单方法之一是使用 Docker。
Docker安装查看:https://docs.docker.com/get-docker/
运行以下命令即可通过终端中的 docker 启动 RabbitMQ 映像。
docker run -d --name some-rabbit -p 4369:4369 -p 5671:5671 -p 5672:5672 -p 15672:15672 rabbitmq:3
如果没有安装RabbitMQ 映像,会直接下载安装。
创建 Celery Worker 任务
现在,一旦消息代理RabbitMQ 运行,就可以创建Worker 程序。
这是因为 celery Worker 会侦听消息代理以执行排队的任务。
在这一部分,我们将为 celery worker 创建任务。创建一个文件celery_worker.py:
from time import sleep
from celery import Celery
from celery.utils.log import get_task_logger
#初始Celery
celery = Celery('tasks', broker='amqp://guest:guest@127.0.0.1:5672//')
#创建记录器-启用以在任务记录器上显示消息
celery_log = get_task_logger(__name__)
#创建订单-与芹菜异步运行
#长时间运行任务的示例流程
@celery.task
def create_order(name, quantity):
# 每1个订单5秒
complete_time_per_item = 5
# 根据订购的物品数量不断增加
sleep(complete_time_per_item * quantity)
celery_log.info(f"Order Complete!")
return {"message": f"Hi {name}, Your order has completed!",
"order_quantity": quantity}
新建一个model.py
from pydantic import BaseModel
#请求主体的订单类模型
class Order(BaseModel):
customer_name: str
order_quantity: int
然后,创建一个名为main.py的新文件:
from fastapi import FastAPI
from celery_worker import create_order
from model import Order
# 创建FastAPI应用程序
app = FastAPI()
# 创建订单
@app.post('/order')
def add_order(order: Order):
# 使用delay=() 方法调用芹菜任务
create_order.delay(order.customer_name, order.order_quantity)
return {"message": "Order Received! Thank you for your patience."}
运行 FastAPI 应用程序:
uvicorn main:app --reload
访问http://localhost:8000/docs
以查看 Swagger 文档中正在运行的 FastAPI。
celery -A celery_worker.celery worker --loglevel=info
可以通过插入请求正文输入来测试我们的端点。这是请求正文的示例输入:
单击“Execute”以从端点获取响应,将看到以下结果:
有没有什么有效的方法来监控后台任务?
我们可以使用Flower 监控工具来监控我们所有的Celery 工人。
pip install flower
然后,flower在我们的本地机器上运行服务器:
celery flower -A celery_worker.celery --broker:amqp://localhost//
访问http://localhost:5555/。
Flower 服务器并单击导航栏上的“Tasks”选项卡来监控我们的 celery 工人。
以上是关于java队列实现异步执行的主要内容,如果未能解决你的问题,请参考以下文章