rocketmq的线程服务基类

Posted 瓜子

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rocketmq的线程服务基类相关的知识,希望对你有一定的参考价值。

RocketMQ有很多的线程服务,这些服务都继承自抽象类ServiceThread。

这个抽象类可以单独抽出来用到我们其他的项目中来,仅仅需要修改下日志模块:

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */


/**
 * @author shijia.wxr
 */
public abstract class ServiceThread implements Runnable {
    // 抽象类可以不用实现接口中的方法,但是继承改抽象类的类就必须实现了。
    private static final long JoinTime = 90 * 1000;

    protected final Thread thread;

    protected volatile boolean hasNotified = false;

    protected volatile boolean stoped = false;


    public ServiceThread() {
        this.thread = new Thread(this, this.getServiceName());
    }


    public abstract String getServiceName();


    public void start() {
        this.thread.start();
    }


    public void shutdown() {
        this.shutdown(false);
    }

    public void shutdown(final boolean interrupt) {
        this.stoped = true;
        System.out.println();
        System.out.println("shutdown thread " + this.getServiceName() + " interrupt " + interrupt);
        synchronized (this) {
            if (!this.hasNotified) {
                this.hasNotified = true;
                this.notify();
            }
        }

        try {
            if (interrupt) {
                this.thread.interrupt();
            }

            long beginTime = System.currentTimeMillis();
            if (!this.thread.isDaemon()) {
                this.thread.join(this.getJointime());
            }
            long eclipseTime = System.currentTimeMillis() - beginTime;
            System.out.println("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " "
                    + this.getJointime());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public long getJointime() {
        return JoinTime;
    }

    public void stop() {
        this.stop(false);
    }

    public void stop(final boolean interrupt) {
        this.stoped = true;
        System.out.println("stop thread " + this.getServiceName() + " interrupt " + interrupt);
        synchronized (this) {
            if (!this.hasNotified) {
                this.hasNotified = true;
                this.notify();
            }
        }

        if (interrupt) {
            this.thread.interrupt();
        }
    }

    public void makeStop() {
        this.stoped = true;
        System.out.println("makestop thread " + this.getServiceName());
    }

    public void wakeup() {
        synchronized (this) {
            if (!this.hasNotified) {
                this.hasNotified = true;
                this.notify();
            }
        }
    }

    protected void waitForRunning(long interval) {
        synchronized (this) {
            if (this.hasNotified) {
                this.hasNotified = false;
                this.onWaitEnd();
                return;
            }

            try {
                this.wait(interval);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                this.hasNotified = false;
                this.onWaitEnd();
            }
        }
    }

    protected void onWaitEnd() {
    }

    public boolean isStoped() {
        return stoped;
    }
}

使用方法:

继承这个类,需要实现两个方法,一个来自runnable接口的run方法,一个是来自ServiceThread 的getServiceName方法。

getServiceName方法主要是未来用来初始化线程。在上面标黄的位置可以看到。

构造完对象之后调用抽象父类ServiceThread的start方法就能启动线程了(上面标黄红字部分)。

你还可以通过这篇文章来观察一下,在rocketmq中一个完整的使用流程是怎样的:

http://www.cnblogs.com/guazi/p/6850988.html

 

 

 

 

 

以上是关于rocketmq的线程服务基类的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ - 如何用死信队列解决消费者异常

配置 kafka 同步刷盘

Motan在服务provider端用于处理request的线程池

二本学渣考研失败,java线程面试题

rocketmq入门笔记

newCacheThreadPool()newFixedThreadPool()newScheduledThreadPool()newSingleThreadExecutor()自定义线程池(代码片段