java 简单 SocketPool

Posted googler

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java 简单 SocketPool相关的知识,希望对你有一定的参考价值。

package org.rx.socks;

import lombok.extern.slf4j.Slf4j;
import org.rx.common.LogWriter;
import org.rx.common.NQuery;
import org.rx.beans.DateTime;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

import static org.rx.common.Contract.require;

@Slf4j
public final class SocketPool extends Traceable implements AutoCloseable 
    public static final class PooledSocket implements AutoCloseable 
        private final SocketPool owner;
        private DateTime lastActive;
        public final Socket socket;

        public boolean isConnected() 
            return !owner.isClosed() && !socket.isClosed() && socket.isConnected();
        

        public DateTime getLastActive() 
            return lastActive;
        

        public void setLastActive(DateTime lastActive) 
            this.lastActive = lastActive;
        

        private PooledSocket(SocketPool owner, Socket socket) 
            this.owner = owner;
            this.socket = socket;
            lastActive = DateTime.utcNow();
        

        @Override
        public void close() 
            owner.returnSocket(this);
        
    

    public static final SocketPool Pool = new SocketPool();
    private static final int DefaultConnectTimeout = 30000;
    private static final int DefaultMaxIdleMillis = 120000;
    private static final int DefaultMaxSocketsCount = 64;
    private final ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedDeque<PooledSocket>> pool;
    private volatile int connectTimeout;
    private volatile int maxIdleMillis;
    private volatile int maxSocketsCount;
    private final Timer timer;
    private volatile boolean isTimerRun;

    public int getConnectTimeout() 
        return connectTimeout;
    

    public void setConnectTimeout(int connectTimeout) 
        this.connectTimeout = connectTimeout;
    

    public int getMaxIdleMillis() 
        return maxIdleMillis;
    

    public void setMaxIdleMillis(int maxIdleMillis) 
        if (maxIdleMillis <= 0) 
            maxIdleMillis = DefaultMaxIdleMillis;
        
        this.maxIdleMillis = maxIdleMillis;
    

    public int getMaxSocketsCount() 
        return maxSocketsCount;
    

    public void setMaxSocketsCount(int maxSocketsCount) 
        if (maxSocketsCount < 0) 
            maxSocketsCount = 0;
        
        this.maxSocketsCount = maxSocketsCount;
    

    private SocketPool() 
        pool = new ConcurrentHashMap<>();
        connectTimeout = DefaultConnectTimeout;
        maxIdleMillis = DefaultMaxIdleMillis;
        maxSocketsCount = DefaultMaxSocketsCount;
        String n = "SocketPool";
        timer = new Timer(n, true);
        LogWriter tracer = new LogWriter();
        tracer.setPrefix(n + " ");
        tracer.info("started..");
        setTracer(tracer);
    

    @Override
    protected void freeObjects() 
        clear();
    

    private void runTimer() 
        if (isTimerRun) 
            return;
        
        synchronized (timer) 
            if (isTimerRun) 
                return;
            

            long period = 90000;
            timer.schedule(new TimerTask() 
                @Override
                public void run() 
                    clearIdleSockets();
                
            , period, period);
            isTimerRun = true;
        
        getTracer().info("runTimer..");
    

    private void clearIdleSockets() 
        for (Map.Entry<InetSocketAddress, ConcurrentLinkedDeque<PooledSocket>> entry : NQuery.of(pool.entrySet())) 
            ConcurrentLinkedDeque<PooledSocket> sockets = entry.getValue();
            if (sockets == null) 
                continue;
            

            for (PooledSocket socket : NQuery.of(sockets)) 
                if (!socket.isConnected()
                        || DateTime.utcNow().subtract(socket.getLastActive()).getTotalMilliseconds() >= maxIdleMillis) 
                    sockets.remove(socket);
                    getTracer().info("clear idle socket[local=%s, remote=%s]..",
                            Sockets.getId(socket.socket, false), Sockets.getId(socket.socket, true));
                
            
            if (sockets.isEmpty()) 
                pool.remove(entry.getKey());
            
        
        if (pool.size() == 0) 
            stopTimer();
        
    

    private void stopTimer() 
        synchronized (timer) 
            timer.cancel();
            timer.purge();
            isTimerRun = false;
        
        getTracer().info("stopTimer..");
    

    private ConcurrentLinkedDeque<PooledSocket> getSockets(InetSocketAddress remoteAddr) 
        ConcurrentLinkedDeque<PooledSocket> sockets = pool.get(remoteAddr);
        if (sockets == null) 
            pool.put(remoteAddr, sockets = new ConcurrentLinkedDeque<>());
            runTimer();
        
        return sockets;
    

    public PooledSocket borrowSocket(InetSocketAddress remoteAddr) 
        checkNotClosed();
        require(remoteAddr);

        boolean isExisted = true;
        ConcurrentLinkedDeque<PooledSocket> sockets = getSockets(remoteAddr);
        PooledSocket pooledSocket;
        if ((pooledSocket = sockets.pollFirst()) == null) 
            Socket sock = new Socket();
            try 
                sock.connect(remoteAddr, connectTimeout);
             catch (IOException ex) 
                throw new SocketException(remoteAddr, ex);
            
            pooledSocket = new PooledSocket(this, sock);
            isExisted = false;
        
        if (!pooledSocket.isConnected()) 
            if (isExisted) 
                sockets.remove(pooledSocket);
            
            return borrowSocket(remoteAddr);
        
        Socket sock = pooledSocket.socket;
        getTracer().info("borrow %s socket[local=%s, remote=%s]..", isExisted ? "existed" : "new",
                Sockets.getId(sock, false), Sockets.getId(sock, true));
        return pooledSocket;
    

    public void returnSocket(PooledSocket pooledSocket) 
        checkNotClosed();
        require(pooledSocket);

        String action = "return";
        try 
            if (!pooledSocket.isConnected()) 
                action = "discard closed";
                return;
            
            pooledSocket.setLastActive(DateTime.utcNow());
            ConcurrentLinkedDeque<PooledSocket> sockets = getSockets(
                    (InetSocketAddress) pooledSocket.socket.getRemoteSocketAddress());
            if (sockets.size() >= maxSocketsCount || sockets.contains(pooledSocket)) 
                action = "discard contains";
                return;
            

            sockets.addFirst(pooledSocket);
         finally 
            Socket sock = pooledSocket.socket;
            getTracer().info("%s socket[local=%s, remote=%s]..", action, Sockets.getId(sock, false),
                    Sockets.getId(sock, true));
        
    

    public void clear() 
        checkNotClosed();

        for (Socket socket : NQuery.of(pool.values()).selectMany(p -> p).select(p -> p.socket)) 
            try 
                getTracer().info("clear socket[local=%s, remote=%s]..", Sockets.getId(socket, false),
                        Sockets.getId(socket, true));
                Sockets.close(socket);
             catch (Exception ex) 
                log.error("SocketPool clear", ex);
            
        
        pool.clear();
    

 

以上是关于java 简单 SocketPool的主要内容,如果未能解决你的问题,请参考以下文章

现在学JAVA简单吗?

java简单工厂模式是啥

java基础学习02(简单的java程序)

java的UI程序 简单 排错

Java基础_3.5:简单Java类

一个简单的Java程序代码?