并发编程-concurrent指南-阻塞队列-延迟队列DelayQueue
Posted qjm201000
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程-concurrent指南-阻塞队列-延迟队列DelayQueue相关的知识,希望对你有一定的参考价值。
DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,队头是到期时间最早的对象,即延迟到期后已经过去时间最长的那个对象;如果还没有任何对象到期,则队列没有可取的队头,poll会返回null。注意:不能将null元素放置到这种队列中。
Delayed
一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象。
此接口的实现必须定义一个 compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序。
下面例子是订单超时处理的具体代码:
重点是DelayOrderComponent 和OrderMessage
import com.concurrent.delayqueue.component.DelayOrderComponent; import com.concurrent.delayqueue.model.OrderInfo; import com.concurrent.delayqueue.service.OrderService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Date; @RestController @RequestMapping("/order") public class OrderController { @Autowired private OrderService orderService; //创建订单 @RequestMapping("insert") public void insert() { OrderInfo orderInfo = new OrderInfo(); orderInfo.setCreateTime(new Date()); orderInfo.setStatus(0); orderService.insert(orderInfo); } //取消订单 @RequestMapping("cancel") public void cancel(Long orderId) { orderService.cancel(orderId); } //支付订单 @RequestMapping("paysuccess") public void paysuccess(Long orderId) { orderService.paysuccess(orderId); } //查看队列中剩余处理数 @RequestMapping("queuecount") public int queuecount() { return DelayOrderComponent.getDelayQueueCount(); } }
@Service public class OrderService { @Autowired private OrderInfoMapper orderInfoMapper; @Autowired private DelayOrderComponent delayOrderComponent; /** * 插入 * @param orderInfo */ @Transactional public void insert(OrderInfo orderInfo){ orderInfoMapper.insert(orderInfo); //加入到延时队列中,用于超时未支付 boolean flag = delayOrderComponent.addDelayQueue(new OrderMessage(orderInfo.getOrderId(),orderInfo.getCreateTime().getTime())); if(!flag){ throw new RuntimeException(); } } /** * 取消 */ @Transactional public void cancel(Long orderId){ orderInfoMapper.updateByStatus(orderId,0,-1); delayOrderComponent.removeDelayQueue(orderId); } /** * 用户支付成功 */ public void paysuccess(Long orderId){ orderInfoMapper.updateByStatus(orderId,0,1); delayOrderComponent.removeDelayQueue(orderId); } }
import com.concurrent.delayqueue.mapper.OrderInfoMapper; import com.concurrent.delayqueue.message.OrderMessage; import com.concurrent.delayqueue.model.OrderInfo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.Iterator; import java.util.List; import java.util.concurrent.DelayQueue; import java.util.concurrent.Executors; /** * 处理订单超时 */ @Component @Lazy(false) public class DelayOrderComponent { @Autowired private OrderInfoMapper orderInfoMapper; private static DelayQueue<OrderMessage> delayQueue = new DelayQueue<OrderMessage>(); public static int getDelayQueueCount(){ return delayQueue.size(); } /** * 系统启动时,预先加载的数据@PostConstruct */ @PostConstruct public void init(){ /**初始化时加载数据库中需处理超时的订单**/ System.out.println("获取数据库中需要处理的超时的订单"); List<OrderInfo> list = orderInfoMapper.selectByStatus(0); for(int i=0;i<list.size();i++){ OrderInfo orderInfo = list.get(i); OrderMessage orderMessage = new OrderMessage(orderInfo.getOrderId(),orderInfo.getCreateTime().getTime()); this.addDelayQueue(orderMessage);//加入队列 } /** * 启动线程,取延时消息 */ Executors.newSingleThreadExecutor().execute(new Runnable() { @Override public void run() { while(true){ try { OrderMessage orderMessage = delayQueue.take(); //处理超时订单 orderInfoMapper.updateByStatus(orderMessage.getOrderId(),0,2);//订单状态改成超时订单 } catch (InterruptedException e) { e.printStackTrace(); } } } }); } /** * 加入延时队列 * 用户下单时,调用此方法 */ public boolean addDelayQueue(OrderMessage orderMessage){ return delayQueue.add(orderMessage); } /** * 从延时队列中删除 * 用户主动取消,或者支付成功后,调用此方法 */ public boolean removeDelayQueue(Long orderId){ for (Iterator<OrderMessage> iterator = delayQueue.iterator(); iterator.hasNext();) { OrderMessage queue = iterator.next(); if(orderId.equals(queue.getOrderId())){ return delayQueue.remove(queue); } } return false; } }
public class OrderMessage implements Delayed { private final static long DELAY = 15*60*1000L;//默认延迟15分钟 private Long orderId;//订单号 private Long expireTime;//过期时间 public OrderMessage(Long orderId,Long createTime){ this.orderId = orderId; this.expireTime = createTime + DELAY; } @Override public long getDelay(TimeUnit unit) { return unit.convert(this.expireTime - System.currentTimeMillis() , TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed other) { if (other == this){ return 0; } if(other instanceof OrderMessage){ OrderMessage otherRequest = (OrderMessage)other; long otherStartTime = otherRequest.expireTime; return (int)(this.expireTime - otherStartTime); } return 0; } public Long getOrderId() { return orderId; } public void setOrderId(Long orderId) { this.orderId = orderId; } public Long getExpireTime() { return expireTime; } public void setExpireTime(Long expireTime) { this.expireTime = expireTime; } }
import java.util.Date;

/**
 * Plain data holder for one order: its id, creation time and status.
 */
public class OrderInfo {

    /** Order id. */
    private Long orderId;

    /** Creation time. */
    private Date createTime;

    /** Order status: 0 awaiting payment, 1 paid, -1 cancelled, 2 timed out. */
    private Integer status;

    /** @return the order id, or {@code null} if not set */
    public Long getOrderId() {
        return this.orderId;
    }

    /** @param orderId the order id to store */
    public void setOrderId(Long orderId) {
        this.orderId = orderId;
    }

    /** @return the creation time, or {@code null} if not set */
    public Date getCreateTime() {
        return this.createTime;
    }

    /** @param createTime the creation time to store */
    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    /** @return the status code, or {@code null} if not set */
    public Integer getStatus() {
        return this.status;
    }

    /** @param status the status code to store */
    public void setStatus(Integer status) {
        this.status = status;
    }
}
import com.concurrent.delayqueue.model.OrderInfo;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

import java.util.List;

/**
 * MyBatis mapper for the {@code t_order} table. Each method is bound to the
 * statement with the same id in {@code OrderInfoMapper.xml}.
 */
@Mapper
public interface OrderInfoMapper {

    // ---- queries ----

    /** Loads the row whose {@code order_id} equals {@code orderId}. */
    OrderInfo selectByPrimaryKey(Long orderId);

    /** Loads every row whose {@code status} column equals {@code status}. */
    List<OrderInfo> selectByStatus(int status);

    // ---- inserts ----

    /** Inserts a row with all three columns taken from {@code record}. */
    int insert(OrderInfo record);

    /** Inserts a row using only the properties of {@code record} that are non-null. */
    int insertSelective(OrderInfo record);

    // ---- updates ----

    /** Overwrites {@code create_time} and {@code status} of the row matching the record's id. */
    int updateByPrimaryKey(OrderInfo record);

    /** Updates only the non-null {@code createTime} / {@code status} of the row matching the record's id. */
    int updateByPrimaryKeySelective(OrderInfo record);

    /**
     * Sets the status of one order, but only if its current status matches.
     *
     * @param orderId   id of the order to update
     * @param oldstatus status the row must currently have
     * @param newstatus status to write
     * @return the value MyBatis reports for the statement (presumably the affected-row count)
     */
    int updateByStatus(@Param("orderId") Long orderId,
                       @Param("oldstatus") Integer oldstatus,
                       @Param("newstatus") Integer newstatus);

    // ---- deletes ----

    /** Deletes the row whose {@code order_id} equals {@code orderId}. */
    int deleteByPrimaryKey(Long orderId);
}
OrderInfoMapper.xml <?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <mapper namespace="com.concurrent.delayqueue.mapper.OrderInfoMapper" > <!-- Maps the t_order columns onto the OrderInfo properties. --> <resultMap id="BaseResultMap" type="com.concurrent.delayqueue.model.OrderInfo" > <id column="order_id" property="orderId" jdbcType="BIGINT" /> <result column="create_time" property="createTime" jdbcType="TIMESTAMP" /> <result column="status" property="status" jdbcType="INTEGER" /> </resultMap> <!-- Column list included by the select statements. --> <sql id="Base_Column_List" > order_id, create_time, status </sql> <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.Long" > select <include refid="Base_Column_List" /> from t_order where order_id = #{orderId,jdbcType=BIGINT} </select> <delete id="deleteByPrimaryKey" parameterType="java.lang.Long" > delete from t_order where order_id = #{orderId,jdbcType=BIGINT} </delete> <!-- Inserts all three columns; useGeneratedKeys with keyProperty="orderId" stores the generated key in orderId. --> <insert id="insert" parameterType="com.concurrent.delayqueue.model.OrderInfo" useGeneratedKeys="true" keyProperty="orderId"> insert into t_order (order_id, create_time, status ) values (#{orderId,jdbcType=BIGINT}, #{createTime,jdbcType=TIMESTAMP}, #{status,jdbcType=INTEGER} ) </insert> <!-- Inserts only the columns whose properties are non-null. --> <insert id="insertSelective" parameterType="com.concurrent.delayqueue.model.OrderInfo" > insert into t_order <trim prefix="(" suffix=")" suffixOverrides="," > <if test="orderId != null" > order_id, </if> <if test="createTime != null" > create_time, </if> <if test="status != null" > status, </if> </trim> <trim prefix="values (" suffix=")" suffixOverrides="," > <if test="orderId != null" > #{orderId,jdbcType=BIGINT}, </if> <if test="createTime != null" > #{createTime,jdbcType=TIMESTAMP}, </if> <if test="status != null" > #{status,jdbcType=INTEGER}, </if> </trim> </insert> <!-- Updates only the non-null properties of the row matching order_id. --> <update id="updateByPrimaryKeySelective" parameterType="com.concurrent.delayqueue.model.OrderInfo" > update t_order <set > <if test="createTime != null" > create_time = 
#{createTime,jdbcType=TIMESTAMP}, </if> <if test="status != null" > status = #{status,jdbcType=INTEGER}, </if> </set> where order_id = #{orderId,jdbcType=BIGINT} </update> <update id="updateByPrimaryKey" parameterType="com.concurrent.delayqueue.model.OrderInfo" > update t_order set create_time = #{createTime,jdbcType=TIMESTAMP}, status = #{status,jdbcType=INTEGER} where order_id = #{orderId,jdbcType=BIGINT} </update> <select id="selectByStatus" resultMap="BaseResultMap" parameterType="java.lang.Integer" > select <include refid="Base_Column_List" /> from t_order where status = #{status,jdbcType=INTEGER} </select> <!-- Changes status to newstatus only when the row's current status equals oldstatus. --> <update id="updateByStatus"> update t_order set status = #{newstatus,jdbcType=INTEGER} where order_id = #{orderId,jdbcType=BIGINT} and status = #{oldstatus,jdbcType=INTEGER} </update> </mapper>
application.properties spring.datasource.url = jdbc:mysql://localhost:3306/concurrent?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC spring.datasource.username = root spring.datasource.password = 123456 mybatis.mapper-locations=classpath:/mybatis/*Mapper.xml
源码地址:https://github.com/qjm201000/concurrent_delayqueue.git
数据库sql文件:到源码里面查看readme,按照步骤来就行。
以上是关于并发编程-concurrent指南-阻塞队列-延迟队列DelayQueue的主要内容,如果未能解决你的问题,请参考以下文章
深入理解java:2.3.4. 并发编程concurrent包 之容器ConcurrentLinkedQueue
Java多线程工具包java.util.concurrent---目录
Java多线程工具包java.util.concurrent---目录