java队列实现异步执行

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java队列实现异步执行相关的知识,希望对你有一定的参考价值。

现在需求是这样的,
系统需要调用第三方接口推送信息,现在的代码是,调用接口,等待接口返回值,处理结果,然后再执行后面的代码,最后的部分跟处理结果没有太大的联系,可以分开进行,
所以新的设计是,创建一个队列,队列中放需要执行的动作,后台不停循环这个队列,如果有元素,调用对应方法,如果队列为空,后台进入休眠,直到队列添加新的元素。系统只要往队列中添加动作,然后直接执行最后面的代码,队列中的动作访问接口,完成后自己去处理返回结果。
对队列不是很熟,请大神指教。
目前我写的小程序测试结果
第一张图,在queue.offer(new ProcessData(ss));下面还有一句
run();

在整个思路上要调整一下

1、会有很多线程给一个队列上添加任务

2、有一个或者多个线程逐个执行队列的任务


考虑一下几点:

1、没有任务时,队列执行线程处于等待状态

2、添加任务时,激活队列执行线程,全部run起来,首先抢到任务的执行,其他全部wait


给个小例子吧

package org;
import java.util.LinkedList;
import java.util.List;
public class Queues 
public static List<Task> queue = new LinkedList<Task>();
/**
 * 假如 参数o 为任务
 * @param o
 */
public static void add (Task t)
synchronized (Queues.queue) 
Queues.queue.add(t); //添加任务
Queues.queue.notifyAll();//激活该队列对应的执行线程,全部Run起来


static class Task
public void test()
System.out.println("我被执行了");


package org;
import java.util.List;
public class Exec implements Runnable
@Override
public void run() 
while(true)
synchronized (Queues.queue) 
while(Queues.queue.isEmpty()) //
try 
Queues.queue.wait(); //队列为空时,使线程处于等待状态
 catch (InterruptedException e) 
e.printStackTrace();

System.out.println("wait...");

Queues.Task t= Queues.queue.remove(0); //得到第一个
t.test(); //执行该任务
System.out.println("end");



public static void main(String[] args) 
Exec e = new Exec();
for (int i = 0; i < 2; i++) 
new Thread(e).start(); //开始执行时,队列为空,处于等待状态

//上面开启两个线程执行队列中的任务,那就是先到先得了
//添加一个任务测试
Queues.Task t =new Queues.Task();
Queues.add(t); //执行该方法,激活所有对应队列,那两个线程就会开始执行啦



上面的就是很简单的例子了

参考技术A 你想问什么呢?判断队列是否有值?取队列中的值?
这两个问题你能完成的话还有啥需要解决呢?跟你以前的需求不一样么?追问

我也不知道,以前没用过队列,看了下文档,然后写了这货,给组长看看,组长说不行,想请大神指导下异步写队列的规范。
还有,我再添加队列元素的时候需要调用下run那个方法,会不会对当前执行的run有影响。

Celery + Flower + FastAPI + RabbitMQ ,Python实现异步消息队列和监控

@Author:Runsen

消息队列

消息队列让应用程序在用户请求之外异步执行称为任务的工作。如果应用程序需要在后台执行工作,它会将任务添加到任务队列中。这些任务稍后由工作服务执行。

Celery

Celery 是一个分布式任务队列,可帮助在后台异步执行大量进程/消息并进行实时处理。芹菜由三个主要成分组成:

  • Celery 客户端
  • 消息代理
  • Celery 工人

下图显示了组件之间如何交互的简化图。我们将使用 FastAPI 作为我们的 Celery 客户端和 RabbitMQ 作为消息代理。

  • 将Celery 客户将运行FastAPI应用程序,并会发出消息/后台作业的RabbitMQ。

  • RabbitMQ 将作为消息代理来调解客户端和工作线程之间的消息。

  • RabbitMQ 收到客户端的消息后,会通过将消息发送给 celery worker 来启动客户端任务。

  • 一个celery 工人被认为是将实现在任何Web服务器的请求的异步后台任务。可以有多个工人一次执行/完成许多任务。

  • celery 将确保每个 worker 一次只执行一个任务,并且每个任务只由一个 worker 分配。

FastAPI 是一个现代、快速(高性能)的 Web 框架,

安装 fastapi包

pip install fastapi

安装 celery 包

pip install celery

我们还需要安装 ASGI 服务器来运行我们的 FastAPI 应用程序。

pip install uvicorn

在我们的本地机器上运行 RabbitMQ 的最简单方法之一是使用 Docker。

Docker安装查看:https://docs.docker.com/get-docker/

运行以下命令即可通过终端中的 docker 启动 RabbitMQ 映像。

docker run -d --name some-rabbit -p 4369:4369 -p 5671:5671 -p 5672:5672 -p 15672:15672 rabbitmq:3

如果没有安装RabbitMQ 映像,会直接下载安装。

创建 Celery Worker 任务

现在,一旦消息代理RabbitMQ 运行,就可以创建Worker 程序。

这是因为 celery Worker 会侦听消息代理以执行排队的任务。

在这一部分,我们将为 celery worker 创建任务。创建一个文件celery_worker.py:

from time import sleep
from celery import Celery
from celery.utils.log import get_task_logger
#初始Celery
celery = Celery('tasks', broker='amqp://guest:guest@127.0.0.1:5672//')
#创建记录器-启用以在任务记录器上显示消息
celery_log = get_task_logger(__name__)
#创建订单-与芹菜异步运行
#长时间运行任务的示例流程
@celery.task
def create_order(name, quantity):
    # 每1个订单5秒
    complete_time_per_item = 5
    # 根据订购的物品数量不断增加
    sleep(complete_time_per_item * quantity)
    
    celery_log.info(f"Order Complete!")
    return {"message": f"Hi {name}, Your order has completed!",
            "order_quantity": quantity}

新建一个model.py

from pydantic import BaseModel
#请求主体的订单类模型
class Order(BaseModel):
    customer_name: str
    order_quantity: int

然后,创建一个名为main.py的新文件:

from fastapi import FastAPI
from celery_worker import create_order
from model import Order
# 创建FastAPI应用程序
app = FastAPI()
# 创建订单
@app.post('/order')
def add_order(order: Order):
    # 使用delay=() 方法调用芹菜任务
    create_order.delay(order.customer_name, order.order_quantity)
    return {"message": "Order Received! Thank you for your patience."}

运行 FastAPI 应用程序:

uvicorn main:app --reload

访问http://localhost:8000/docs

以查看 Swagger 文档中正在运行的 FastAPI。

celery -A celery_worker.celery worker --loglevel=info

可以通过插入请求正文输入来测试我们的端点。这是请求正文的示例输入:

单击“Execute”以从端点获取响应,将看到以下结果:

有没有什么有效的方法来监控后台任务?

我们可以使用Flower 监控工具来监控我们所有的Celery 工人。

pip install flower

然后,flower在我们的本地机器上运行服务器:

celery flower -A celery_worker.celery --broker:amqp://localhost//

访问http://localhost:5555/。

Flower 服务器并单击导航栏上的“Tasks”选项卡来监控我们的 celery 工人。

以上是关于java队列实现异步执行的主要内容,如果未能解决你的问题,请参考以下文章

java消息队列

iOS(Swift) TaskProtocol异步任务队列

rabbitmq消息队列介绍

Redis 实现 消息队列

java并发包提供的三种常用并发队列实现

详解如何构建Promise队列实现异步函数顺序执行