并发编程-concurrent指南-阻塞队列-延迟队列DelayQueue

Posted qjm201000

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程-concurrent指南-阻塞队列-延迟队列DelayQueue相关的知识,希望对你有一定的参考价值。

DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,队头是到期后经过时间最长(即最早到期)的对象;如果还没有任何对象到期,则队列没有队头,poll会返回null。注意:不能将null元素放置到这种队列中。

 

Delayed

一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象。

此接口的实现必须定义一个 compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序。

 

下面例子是订单超时处理的具体代码:

重点是DelayOrderComponent 和OrderMessage

import com.concurrent.delayqueue.component.DelayOrderComponent;
import com.concurrent.delayqueue.model.OrderInfo;
import com.concurrent.delayqueue.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * HTTP entry points under {@code /order}: create, cancel and pay an order,
 * plus a probe reporting how many orders are still waiting in the delay queue.
 */
@RestController
@RequestMapping("/order")
public class OrderController {
    @Autowired
    private OrderService orderService;

    /**
     * Creates an order with status 0 and the current time as its creation time,
     * then passes it to {@link OrderService#insert}.
     */
    @RequestMapping("insert")
    public void insert() {
        OrderInfo newOrder = new OrderInfo();
        newOrder.setStatus(0);
        newOrder.setCreateTime(new Date());
        orderService.insert(newOrder);
    }

    /**
     * Cancels the given order.
     *
     * @param orderId id of the order to cancel
     */
    @RequestMapping("cancel")
    public void cancel(Long orderId) {
        orderService.cancel(orderId);
    }

    /**
     * Marks the given order as paid.
     *
     * @param orderId id of the order that was paid
     */
    @RequestMapping("paysuccess")
    public void paysuccess(Long orderId) {
        orderService.paysuccess(orderId);
    }

    /**
     * Reports the number of entries currently held in the delay queue.
     *
     * @return the queue size as returned by {@link DelayOrderComponent#getDelayQueueCount()}
     */
    @RequestMapping("queuecount")
    public int queuecount() {
        int pending = DelayOrderComponent.getDelayQueueCount();
        return pending;
    }
}
/**
 * Order operations that update the order row and keep the in-memory delay
 * queue (used for unpaid-order timeouts) in step with it.
 */
@Service
public class OrderService {
    @Autowired
    private OrderInfoMapper orderInfoMapper;
    @Autowired
    private DelayOrderComponent delayOrderComponent;

    /**
     * Stores the order and schedules it for the unpaid-timeout check.
     *
     * @param orderInfo order to insert; its id and creation time are read after the insert
     * @throws IllegalStateException if the order could not be added to the delay queue
     */
    @Transactional
    public void insert(OrderInfo orderInfo){
        orderInfoMapper.insert(orderInfo);
        // Queue the order so that it can be flagged as timed out if it stays unpaid.
        boolean flag = delayOrderComponent.addDelayQueue(new OrderMessage(orderInfo.getOrderId(),orderInfo.getCreateTime().getTime()));
        if(!flag){
            // Still a RuntimeException subtype, but now says which order failed and why.
            throw new IllegalStateException(
                    "Failed to add order " + orderInfo.getOrderId() + " to the delay queue");
        }
    }

    /**
     * Cancels an order: moves its status from 0 to -1 and drops it from the delay queue.
     *
     * @param orderId id of the order to cancel
     */
    @Transactional
    public void cancel(Long orderId){
        orderInfoMapper.updateByStatus(orderId,0,-1);
        delayOrderComponent.removeDelayQueue(orderId);
    }

    /**
     * Records a successful payment: moves the order status from 0 to 1 and drops
     * it from the delay queue so that it is no longer timed out.
     *
     * @param orderId id of the order that was paid
     */
    public void paysuccess(Long orderId){
        orderInfoMapper.updateByStatus(orderId,0,1);
        delayOrderComponent.removeDelayQueue(orderId);
    }

}
import com.concurrent.delayqueue.mapper.OrderInfoMapper;
import com.concurrent.delayqueue.message.OrderMessage;
import com.concurrent.delayqueue.model.OrderInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executors;

/**
 * 处理订单超时
 */
/**
 * Handles order timeouts.
 *
 * <p>Orders awaiting payment are kept in a static {@link DelayQueue}. A single
 * background task takes each entry once its delay has elapsed and moves the
 * order from status 0 to status 2.
 */
@Component
@Lazy(false)
public class DelayOrderComponent {
    @Autowired
    private OrderInfoMapper orderInfoMapper;

    /** Queue of orders awaiting their timeout; static, so shared by the whole process. */
    private static final DelayQueue<OrderMessage> delayQueue = new DelayQueue<OrderMessage>();

    /**
     * Returns the number of entries currently in the delay queue.
     *
     * @return current queue size
     */
    public static int getDelayQueueCount(){
        return delayQueue.size();
    }

    /**
     * Runs after dependency injection: re-queues every order stored with status 0
     * and starts the background task that consumes expired entries.
     */
    @PostConstruct
    public void init(){
        // Reload the orders that were still unpaid when the application last stopped.
        System.out.println("获取数据库中需要处理的超时的订单");
        List<OrderInfo> pendingOrders = orderInfoMapper.selectByStatus(0);
        for (OrderInfo orderInfo : pendingOrders) {
            OrderMessage orderMessage = new OrderMessage(orderInfo.getOrderId(),orderInfo.getCreateTime().getTime());
            this.addDelayQueue(orderMessage);
        }

        // Start the single consumer that waits for expired entries.
        Executors.newSingleThreadExecutor().execute(new Runnable() {
            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        OrderMessage orderMessage = delayQueue.take();
                        // Only an order still in status 0 is moved to status 2 (timed out).
                        orderInfoMapper.updateByStatus(orderMessage.getOrderId(),0,2);
                    } catch (InterruptedException e) {
                        // Interruption is a request to stop: restore the flag and leave the
                        // loop instead of swallowing it and blocking on take() again.
                        Thread.currentThread().interrupt();
                        return;
                    } catch (RuntimeException e) {
                        // One failed update must not end the only consumer task, otherwise
                        // no later order would ever be timed out.
                        e.printStackTrace();
                    }
                }
            }
        });
    }

    /**
     * Adds an entry to the delay queue. Called when an order is placed.
     *
     * @param orderMessage entry describing the order and its expiry time
     * @return the result of {@link DelayQueue#add}
     */
    public boolean addDelayQueue(OrderMessage orderMessage){
        return delayQueue.add(orderMessage);
    }

    /**
     * Removes the first queued entry that belongs to the given order. Called after
     * the user cancels the order or pays for it.
     *
     * @param orderId id of the order to remove; {@code null} matches nothing
     * @return {@code true} if an entry was found and removed, {@code false} otherwise
     */
    public boolean removeDelayQueue(Long orderId){
        if (orderId == null) {
            // Nothing can match; avoids the NullPointerException from orderId.equals(...).
            return false;
        }
        for (Iterator<OrderMessage> iterator = delayQueue.iterator(); iterator.hasNext();) {
            OrderMessage queued = iterator.next();
            if(orderId.equals(queued.getOrderId())){
                // remove() reports false if the consumer took the entry in the meantime.
                return delayQueue.remove(queued);
            }
        }
        return false;
    }

}
/**
 * Delay-queue entry for one order: becomes available {@link #DELAY} milliseconds
 * after the order's creation time.
 */
public class OrderMessage implements Delayed {
    /** Delay applied to every order, in milliseconds (15 minutes). */
    private final static long DELAY = 15*60*1000L;

    private Long orderId;    // order id
    private Long expireTime; // expiry instant, in epoch milliseconds

    /**
     * @param orderId    id of the order
     * @param createTime creation time of the order, in epoch milliseconds
     */
    public OrderMessage(Long orderId,Long createTime){
        this.orderId = orderId;
        this.expireTime = createTime + DELAY;
    }

    /**
     * Returns the time remaining until expiry, relative to the current system clock.
     *
     * @param unit unit of the returned value
     * @return remaining delay in {@code unit}; zero or negative once expired
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expireTime - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
    }

    /**
     * Orders entries by expiry time, earliest first.
     *
     * @param other the entry to compare against
     * @return negative, zero or positive as this entry expires before, at the same
     *         time as, or after {@code other}
     * @throws NullPointerException if {@code other} is {@code null}
     */
    @Override
    public int compareTo(Delayed other) {
        if (other == this){
            return 0;
        }
        if(other instanceof OrderMessage){
            OrderMessage otherRequest = (OrderMessage)other;
            // Long.compare instead of casting the difference to int: the cast flips
            // the sign once two expiry times differ by more than Integer.MAX_VALUE ms.
            return Long.compare(this.expireTime, otherRequest.expireTime);
        }
        // Foreign Delayed implementations: compare the remaining delays in a common unit
        // so the ordering stays consistent with getDelay.
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }

    public Long getOrderId() {
        return orderId;
    }

    public void setOrderId(Long orderId) {
        this.orderId = orderId;
    }

    public Long getExpireTime() {
        return expireTime;
    }

    public void setExpireTime(Long expireTime) {
        this.expireTime = expireTime;
    }
}
import java.util.Date;

/**
 * Plain data holder for one order: id, creation time and status.
 */
public class OrderInfo {
    /** Order id. */
    private Long orderId;
    /** Creation time. */
    private Date createTime;
    /** Order status: 0 awaiting payment, 1 paid, -1 cancelled, 2 timed out. */
    private Integer status;

    /** @return the order id */
    public Long getOrderId() {
        return this.orderId;
    }

    /** @param orderId the order id to store */
    public void setOrderId(Long orderId) {
        this.orderId = orderId;
    }

    /** @return the creation time */
    public Date getCreateTime() {
        return this.createTime;
    }

    /** @param createTime the creation time to store */
    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    /** @return the order status */
    public Integer getStatus() {
        return this.status;
    }

    /** @param status the order status to store */
    public void setStatus(Integer status) {
        this.status = status;
    }
}
import com.concurrent.delayqueue.model.OrderInfo;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

import java.util.List;

/**
 * MyBatis mapper for the {@code t_order} table; the SQL for each method lives in
 * OrderInfoMapper.xml under the statement id of the same name.
 */
@Mapper
public interface OrderInfoMapper {
    /** Deletes the row whose {@code order_id} equals {@code orderId}. */
    int deleteByPrimaryKey(Long orderId);

    /** Inserts all three columns of {@code record}; the generated key is written back to its {@code orderId}. */
    int insert(OrderInfo record);

    /** Inserts only the columns whose properties on {@code record} are non-null. */
    int insertSelective(OrderInfo record);

    /** Loads the row whose {@code order_id} equals {@code orderId}. */
    OrderInfo selectByPrimaryKey(Long orderId);

    /** Updates the non-null {@code createTime} / {@code status} of the row identified by {@code record}'s id. */
    int updateByPrimaryKeySelective(OrderInfo record);

    /** Overwrites {@code create_time} and {@code status} of the row identified by {@code record}'s id. */
    int updateByPrimaryKey(OrderInfo record);

    /** Loads every row whose {@code status} equals the given value. */
    List<OrderInfo> selectByStatus(int status);
    /**
     * Sets {@code status} to {@code newstatus} for the given order, but only if its
     * current status equals {@code oldstatus}.
     */
    int updateByStatus(@Param("orderId")Long orderId, @Param("oldstatus")Integer oldstatus,@Param("newstatus")Integer newstatus);
}
OrderInfoMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.concurrent.delayqueue.mapper.OrderInfoMapper" >
  <!-- SQL statements for the OrderInfoMapper interface; every statement targets table t_order. -->

  <!-- Column-to-property mapping for OrderInfo. -->
  <resultMap id="BaseResultMap" type="com.concurrent.delayqueue.model.OrderInfo" >
    <id column="order_id" property="orderId" jdbcType="BIGINT" />
    <result column="create_time" property="createTime" jdbcType="TIMESTAMP" />
    <result column="status" property="status" jdbcType="INTEGER" />
  </resultMap>
  <!-- Column list shared by the select statements. -->
  <sql id="Base_Column_List" >
    order_id, create_time, status
  </sql>
  <!-- Select one order by primary key. -->
  <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.Long" >
    select
    <include refid="Base_Column_List" />
    from t_order
    where order_id = #{orderId,jdbcType=BIGINT}
  </select>
  <!-- Delete one order by primary key. -->
  <delete id="deleteByPrimaryKey" parameterType="java.lang.Long" >
    delete from t_order
    where order_id = #{orderId,jdbcType=BIGINT}
  </delete>
  <!-- Insert all columns; the generated key is written back to the orderId property. -->
  <insert id="insert" parameterType="com.concurrent.delayqueue.model.OrderInfo"
          useGeneratedKeys="true" keyProperty="orderId">
    insert into t_order (order_id, create_time, status
      )
    values (#{orderId,jdbcType=BIGINT}, #{createTime,jdbcType=TIMESTAMP}, #{status,jdbcType=INTEGER}
      )
  </insert>
  <!-- Insert only the columns whose properties are non-null. -->
  <insert id="insertSelective" parameterType="com.concurrent.delayqueue.model.OrderInfo" >
    insert into t_order
    <trim prefix="(" suffix=")" suffixOverrides="," >
      <if test="orderId != null" >
        order_id,
      </if>
      <if test="createTime != null" >
        create_time,
      </if>
      <if test="status != null" >
        status,
      </if>
    </trim>
    <trim prefix="values (" suffix=")" suffixOverrides="," >
      <if test="orderId != null" >
        #{orderId,jdbcType=BIGINT},
      </if>
      <if test="createTime != null" >
        #{createTime,jdbcType=TIMESTAMP},
      </if>
      <if test="status != null" >
        #{status,jdbcType=INTEGER},
      </if>
    </trim>
  </insert>
  <!-- Update only the non-null create_time / status of the order with the given id. -->
  <update id="updateByPrimaryKeySelective" parameterType="com.concurrent.delayqueue.model.OrderInfo" >
    update t_order
    <set >
      <if test="createTime != null" >
        create_time = #{createTime,jdbcType=TIMESTAMP},
      </if>
      <if test="status != null" >
        status = #{status,jdbcType=INTEGER},
      </if>
    </set>
    where order_id = #{orderId,jdbcType=BIGINT}
  </update>
  <!-- Overwrite create_time and status of the order with the given id. -->
  <update id="updateByPrimaryKey" parameterType="com.concurrent.delayqueue.model.OrderInfo" >
    update t_order
    set create_time = #{createTime,jdbcType=TIMESTAMP},
      status = #{status,jdbcType=INTEGER}
    where order_id = #{orderId,jdbcType=BIGINT}
  </update>


  <!-- Select every order with the given status. -->
  <select id="selectByStatus" resultMap="BaseResultMap" parameterType="java.lang.Integer" >
    select
    <include refid="Base_Column_List" />
    from t_order
    where status = #{status,jdbcType=INTEGER}
  </select>
  <!-- Conditional status change: only applies when the current status equals oldstatus. -->
  <update id="updateByStatus">
    update t_order
    set status = #{newstatus,jdbcType=INTEGER}
    where order_id = #{orderId,jdbcType=BIGINT}
    and status = #{oldstatus,jdbcType=INTEGER}
  </update>
</mapper>

 

application.properties

# JDBC connection: MySQL on localhost:3306, database "concurrent", UTF-8, server time zone UTC.
spring.datasource.url = jdbc:mysql://localhost:3306/concurrent?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
spring.datasource.username = root
# NOTE(review): credential is hard-coded in plain text here; presumably for local demo use only.
spring.datasource.password =  123456

# Mapper XML files are picked up from the classpath directory /mybatis, names ending in Mapper.xml.
mybatis.mapper-locations=classpath:/mybatis/*Mapper.xml

源码地址:https://github.com/qjm201000/concurrent_delayqueue.git

数据库sql文件:到源码里面查看readme,按照步骤来就行。

 

以上是关于并发编程-concurrent指南-阻塞队列-延迟队列DelayQueue的主要内容,如果未能解决你的问题,请参考以下文章

深入理解java:2.3.4. 并发编程concurrent包 之容器ConcurrentLinkedQueue

Java多线程工具包java.util.concurrent---目录

Java多线程工具包java.util.concurrent---目录

java并发编程(十八)阻塞队列和阻塞栈

转: Java并发编程之二十一:并发新特性—阻塞队列和阻塞栈(含代码)

并发编程复习笔记