分布式内存网格Hazelcast源码导读

Posted 超人汪小建(seaboat)

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式内存网格Hazelcast源码导读相关的知识,希望对你有一定的参考价值。

去年项目需要看了hazelcast源码,当时记录的笔记。



Node是节点的抽象,里面包含节点引擎、客户端引擎、分区服务、集群服务、组播服务、连接管理、命令管理、组播属性、节点配置、本地成员、tcp地址、组播地址、连接者、节点初始化器、管理中心、安全上下文、


Config类,包含GroupConfig、NetworkConfig、MapConfig、TopicConfig、QueueConfig、MultiMapConfig、ListConfig、SetConfig、ExecutorConfig、SemaphoreConfig、WanReplicationConfig、ServicesConfig、SecurityConfig、ListenerConfig、PartitionGroupConfig、ManagementCenterConfig、SerializationConfig。

GroupConfig集群用户名及密码DEFAULT_GROUP_PASSWORD = "dev-pass";DEFAULT_GROUP_NAME = "dev";
NetworkConfig网络相关配置,包括InterfacesConfig、JoinConfig、SSLConfig等,其中JoinConfig包括MulticastConfig、TcpIpConfig、AwsConfig。
SecurityConfig客户端登陆相关配置。
WanReplicationConfig集群复制配置,包括WanTargetClusterConfig配置,所有目标节点相关配置。

ClusterServiceImpl用于维护集群各个成员节点,实例化时把本地节点加入成员Map中,MulticastService

Node实例化时根据MulticastConfig使用组播加入一个组,MulticastService提供节点基于组播传递的,使用监听模式每当接收到其他节点传播的消息调用监听器的onMessage,传递的参数为JoinMessage,它由序列化模块提供转化,默认NodeMulticastListener监听器,会对集群的节点校验,是否是同个集群用户名密码。

MulticastJoiner用于加入集群节点,创建JoinRequest对象用MulticastService发送给其他成员,不断发送加入集群请求直到node里面的masteraddress,如果配置里面指定了targetAddress就不用使用这种不断发送的方式选举主节点。MulticastService负责发送申请加入的组播消息和组播消息接收及处理的工作。

找主节点方式:循环发送JoinRequest消息向组内发送,如果已加入状态且是master的节点接收到后会向外组播JoinMessage,告诉其他节点我的组成员信息,还没加入集群的成员则将自己节点的master地址设置为master发出的JoinMessage中的地址。所以JoinMessage只有master才会发出。其他已加入组的成员节点接收到JoinMessage类型消息则直接忽略,

JoinMessage包含包版本、地址等信息。

SerializationService序列化转化模块,SerializationServiceBuilder生产者,默认字节存放顺序序列为ByteOrder.BIG_ENDIAN,即使用ByteArrayInputOutputFactory,

Packet为数据包结构,第一字节为版本号,第二字节表示为长度,第三字节表示类型。

SocketConnector会执行连接操作,使用的是阻塞读写。

TcpIpConnectionManager集成ConnectionManager,用于管理TCPIP连接

JoinMessage{
protected byte packetVersion;
    protected int buildNumber;
    protected Address address;
    protected String uuid;
    protected ConfigCheck configCheck;
    protected int memberCount;
}

JoinRequest继承JoinMessage,并添加credentials、tryCount属性。

默认5701作为每个节点对外暴露的端口。

ReadHandler的handle方法会初始化socketReader,集群成员维护的话使用SocketPacketReader读取报文,

OperationServiceImpl类内部类RemoteOperationProcessor用于ResponseOperation对象
response.beforeRun();
response.run();
response.afterRun();
即会调用JoinRequestOperation的run方法,调用ClusterServiceImpl里面的handleJoinRequest方法,完成成员更新工作,接收成功会向源节点返回SetMasterOperation、MemberInfoUpdateOperation等等更新源节点成员及节点joined状态。

里面一切需要处理的都用run方法,会统一处理。

Node.start--TcpIpConnectionManager.start--InSelectorImpl.run(输入即读取选择器,run不断选择可读的对象并获取attachment,调用它的handle方法,这里的attachment是在TcpIpConnection实例化的时候创建readHandle对象并调用start方法把readHandler当做attachment注册到socketChannel里面的,readHandler调用handle方法,如果是集群协议CLUSTER则实例化SocketPacketReader,SocketPacketReader包含一个PacketReader,默认是DefaultPacketReader,用于将套接字读出来的字节转化成Packet包并调用handlePacket方法处理消息包,如果是集群成员消息包则调用handleMemberPacket方法处理,间接调用NodeEngineImpl的handlePacket方法,有三种不同头部类型:Packet.HEADER_OP、Packet.HEADER_EVENT、Packet.HEADER_WAN_REPLICATION,HEADER_OP表示操作类型,会调用operationService的handleOperation方法处理包消息,OperationThread会一直跑调用OperationRunner处理需要执行的Operation,OperationRunner线程数由hazelcast.operation.thread.count设置,如果-1则为机器cpu个数,接着会执行operation的beforeRun、run、operation如果设置了响应的话还要执行产生响应内容在返回给调用者、afterRun方法。)--OutSelectorImpl.run(输出即写入选择器)--SocketAcceptor.run--TcpIpConnection.start(TcpIpConnection包括WriteHandler和ReadHandler)--

private void process(Object task) {
        processedCount++;


        if (task instanceof Operation) {
            processOperation((Operation) task);
            return;
        }


        if (task instanceof Packet) {
            processPacket((Packet) task);
            return;
        }


        if (task instanceof PartitionSpecificRunnable) {
            processPartitionSpecificRunnable((PartitionSpecificRunnable) task);
            return;
        }


        throw new IllegalStateException("Unhandled task type for task:" + task);
    }


设计得不错的是它直接支持各种Operation的传递,并执行,本质也是序列化反序列化,然后调用Operation的beforerun、run、afterrun方法,后面还会自动执行handleResponse方法,此方法用于向其他备份节点同步数据,这部分操作是在operation的afterrun之前完成备份。备份工作由OperationBackupHandler完成,backup方法,备份又分为同步备份和异步备份,相加等于总的需要备份数。map操作默认备份一份数据且是同步的,异步的默认为0.异步备份不会阻塞操作。备份的operation是Backup,它的run会执行PutOperation的run方法,即把数据放到缓存中并修改版本,这里的run不会再执行复制操作。
<hazelcast>
  <map name="default">
    <backup-count>1</backup-count>
    <async-backup-count>1</async-backup-count>
  </map>
</hazelcast>
operation的run同样会在备份节点上执行,putoperation其实就是在本地缓存更新值。备份的过程没有一个ack机制,信息传输的可靠性如何保证?

一个节点调用map的put操作时,会在本节点上缓存这个结果,再把operation传输到对应partition的第一个备份节点(这个节点可能就是自己本地节点)上,第一个节点接收到后备份到第二个节点上,所以默认就只有两个备份数据。所以nearcache缓存是可能存在每个节点上的。

PutOperation的afterRun方法主要是触发一些拦截器,触发各个节点的事件监听器什么的、更新各个备份节点缓存等等。

IOBalancer平衡io读取写入。

packet报文结构,1byte版本+2byte头部+4byte分区+4byte长度+nbyte消息。

主节点一个一个发给其他成员节点关于成员的消息,其他节点进行更新。

ServiceManagerImpl用于管理所有的远程,启动时会注册常用的一些服务,包括如下的服务,什么map、queue什么的
    private void registerCoreServices() {
        Node node = nodeEngine.getNode();
        registerService(ClusterServiceImpl.SERVICE_NAME, node.getClusterService());
        registerService(InternalPartitionService.SERVICE_NAME, node.getPartitionService());
        registerService(ProxyServiceImpl.SERVICE_NAME, nodeEngine.getProxyService());
        registerService(TransactionManagerServiceImpl.SERVICE_NAME, nodeEngine.getTransactionManagerService());
        registerService(ClientEngineImpl.SERVICE_NAME, node.clientEngine);
        registerService(QuorumServiceImpl.SERVICE_NAME, nodeEngine.getQuorumService());
    }


    private void registerDefaultServices(ServicesConfig servicesConfig) {
        registerService(MapService.SERVICE_NAME, createService(MapService.class));
        registerService(LockService.SERVICE_NAME, new LockServiceImpl(nodeEngine));
        registerService(QueueService.SERVICE_NAME, new QueueService(nodeEngine));
        registerService(TopicService.SERVICE_NAME, new TopicService());
        registerService(ReliableTopicService.SERVICE_NAME, new ReliableTopicService(nodeEngine));
        registerService(MultiMapService.SERVICE_NAME, new MultiMapService(nodeEngine));
        registerService(ListService.SERVICE_NAME, new ListService(nodeEngine));
        registerService(SetService.SERVICE_NAME, new SetService(nodeEngine));
        registerService(DistributedExecutorService.SERVICE_NAME, new DistributedExecutorService());
        registerService(AtomicLongService.SERVICE_NAME, new AtomicLongService());
        registerService(AtomicReferenceService.SERVICE_NAME, new AtomicReferenceService());
        registerService(CountDownLatchService.SERVICE_NAME, new CountDownLatchService());
        registerService(SemaphoreService.SERVICE_NAME, new SemaphoreService(nodeEngine));
        registerService(IdGeneratorService.SERVICE_NAME, new IdGeneratorService(nodeEngine));
        registerService(MapReduceService.SERVICE_NAME, new MapReduceService(nodeEngine));
        registerService(ReplicatedMapService.SERVICE_NAME, new ReplicatedMapService(nodeEngine));
        registerService(RingbufferService.SERVICE_NAME, new RingbufferService(nodeEngine));
        registerService(XAService.SERVICE_NAME, new XAService(nodeEngine));
        registerCacheServiceIfAvailable();
    }

Map通过AbstractMapServiceFactory创建,使用MapRemoteService处理远程操作,RemoteService服务有两个方法createDistributedObject和destroyDistributedObject方法,最终是通过MapProxyImpl。Map的partition策略在MapContainer里面。

PutOperation用于执行集群节点的put操作。一致性哈希根据key使用MurmurHash哈希计算出结果,再根据分区数(默认271)取余。每个节点都是使用new ConcurrentHashMap<Data, Record>(1000, 0.75f, 1)存放记录。

成员组MemberGroup一共有若干个组,多少个成员就多少个组,最大的复制节点数为7,如果成员组小于7则使用成员组数量。

address[271][4]

Address[271][7] 
1、addr0 addr9 addr3 addr4 null null null
2、.
.
.
.
.
271、addr1 addr2 addr3 addr4 null null null


对分配的结果尝试重新分配,把过载的组分配一些成员给不足的组,并检测每个成员组内的节点数相差不会超过系数1.1,否则重新分组,尽可能达到均匀。


最终形成一张表,271个分区每个分区都对应着若干个复制的成员地址。

int avgPartitionPerGroup = partitionCount / groupSize;
就是说最多4个组,假如5个结点,分组情况为2,1,1,1,则每个组分到的partition个数为271/4,可能有些组多1个partition。

一共有271条线程处理operation,OperationThread,里面有队列ScheduleQueue,线程会不断处理,ScheduleQueue用于operation执行缓冲队列,里面的有两种队列,normalQueue和priorityQueue,一种用于正常的排队,一种用于设置优先级的队列,take方法会优先从priorityQueue中获取需要优先处理的operation。

有个同步复制、异步复制。

OperationServiceImpl.createInvocationBuilder(此方法有两个,一个用于partition、一个用于指定的address),->  实例化一个InvocationBuilderImpl对象,调用invoke方法会创建Invocation,Invocation包含PartitionInvocation(针对分区)和TargetInvocation(针对指定节点)两种。

在集群中传输一切以Data形式传输。

Map的服务名为hz:impl:mapService。
假如HazelcastInstance执行instance.getMap("customers")则,通过HazelcastInstanceProxy的getMap方法,代理是调HazelcastInstanceImpl的getMap方法,调用getDistributedObject方法,它会通过ProxyService(它代理了所有服务)代理找到MapService,调用mapservice的createDistributedObject方法创建DistributedObject,间接调用MapRemoteService的createDistributedObject方法创建MapProxyImpl,调用MapProxyImpl的put方法把数据放到集群,主要如下操作
final Data key = toData(k, partitionStrategy);
final Data value = toData(v);
final Data result = putInternal(key, value, ttl, timeunit);
return (V) toObject(result);
分别将key和value转化为Data,其实是序列化,方便网络传输。putInternal方法使用PutOperation,并且要根据key计算出partitionId,接着再完成operation的调用。

SerializationServiceImpl提供各种类型的序列化支持,toData提供由object到Data的转化,toObject提供由Data到object的转化,Data默认是使用DefaultData,DefaultData里面其实就是包含了一个字节数组还有不同的偏移量,例如从几位到几位表示类型,序列化工作其实也跟这种类似,把某一对象的相关信息转化为字节数组,传递到目的地后再根据约定反向组装成指定对象。

OperationThread专门用于执行接收到的请求,process方法,可以有三种请求,包括Operation、Packet、PartitionSpecificRunnable,分别不同的处理逻辑,Operation则直接反序列化后调用beforeRun、run、afterRun等方法。

HealthMonitor是独立一条线程用于监控健康,包括
private class HealthMetrics {
        private final long memoryFree;
        private final long memoryTotal;
        private final long memoryUsed;
        private final long memoryMax;
        private final double memoryUsedOfTotalPercentage;
        private final double memoryUsedOfMaxPercentage;
        //following three load variables are always between 0 and 100.
        private final double processCpuLoad;
        private final double systemLoadAverage;
        private final double systemCpuLoad;
        private final int threadCount;
        private final int peakThreadCount;
        private final long clusterTimeDiff;
        private final int asyncExecutorQueueSize;
        private final int clientExecutorQueueSize;
        private final int queryExecutorQueueSize;
        private final int scheduledExecutorQueueSize;
        private final int systemExecutorQueueSize;
        private final int eventQueueSize;
        private final int pendingInvocationsCount;
        private final double pendingInvocationsPercentage;
        private final int operationServiceOperationExecutorQueueSize;
        private final int operationServiceOperationPriorityExecutorQueueSize;
        private final int operationServiceOperationResponseQueueSize;
        private final int runningOperationsCount;
        private final int remoteOperationsCount;
        private final int proxyCount;
        private final int clientEndpointCount;
        private final int activeConnectionCount;
        private final int currentClientConnectionCount;
        private final int connectionCount;
        private final int ioExecutorQueueSize;
}

PerformanceMonitor表示性能监控,监控的参数包括inSelector的已读取数量,outSelect的已写入事件数量,OperationService相关的性能参数,例如挂起的调用比例、总体调用比例、最大调用数量、271个分区线程已执行数量、271个分区operation线程正在挂起线程(即任务排队队列中的数量),常规operation线程的排队队列数量、常规operation线程已执行数量、响应线程已执行数量、响应线程排队队列数量。

hazelcast核心——Node,包含各种各样重要的基础服务,日志、节点关闭钩子、序列化服务、节点引擎、客户端引擎、分区服务、集群服务、广播服务、连接管理服务、命令服务、配置文件服务、群组属性服务、本节点地址、本地集群成员对象、主节点地址、hazelcast实例引用、日志服务、集群节点加入服务、节点扩展服务、管理中心服务、安全上下文、创建信息服务、版本校对服务、hazelcast线程组。
public class Node {
    private final ILogger logger;
    private final NodeShutdownHookThread shutdownHookThread = new NodeShutdownHookThread("hz.ShutdownThread");
    private final SerializationService serializationService;
    public final NodeEngineImpl nodeEngine;
    public final ClientEngineImpl clientEngine;
    public final InternalPartitionService partitionService;
    public final ClusterServiceImpl clusterService;
    public final MulticastService multicastService;
    public final ConnectionManager connectionManager;
    public final TextCommandServiceImpl textCommandService;
    public final Config config;
    public final GroupProperties groupProperties;
    public final Address address;
    public final MemberImpl localMember;
    private volatile Address masterAddress = null;
    public final HazelcastInstanceImpl hazelcastInstance;
    public final LoggingServiceImpl loggingService;
    private final Joiner joiner;
    private final NodeExtension nodeExtension;
    private ManagementCenterService managementCenterService;
    public final SecurityContext securityContext;
    private final ClassLoader configClassLoader;
    private final BuildInfo buildInfo;
    private final VersionCheck versionCheck = new VersionCheck();
    private final HazelcastThreadGroup hazelcastThreadGroup;
}

NodeEngineImpl作为节点引擎包含了许多服务,重要的例如,事件服务、operation服务、执行服务、等待通知服务、service(内置许多service,例如Map、Queue,用户自定义的服务可配置到hazelcast.xml,启动时会加载进来)管理服务、事务管理服务、代理服务、wan复制服务、包传输服务、证明人服务。
public class NodeEngineImpl implements NodeEngine {


    private final Node node;
    private final ILogger logger;
    private final EventServiceImpl eventService;
    private final OperationServiceImpl operationService;
    private final ExecutionServiceImpl executionService;
    private final WaitNotifyServiceImpl waitNotifyService;
    private final ServiceManagerImpl serviceManager;
    private final TransactionManagerServiceImpl transactionManagerService;
    private final ProxyServiceImpl proxyService;
    private final WanReplicationService wanReplicationService;
    private final PacketTransceiver packetTransceiver;
    private final QuorumServiceImpl quorumService;


}

ServiceManagerImpl用于管理所有服务,启动时默认会实例化核心的service、默认的service,如果用户通过配置文件配置了自定义service则也会实例化。启动时注册即将service实例put到ConcurrentMap中,核心service包括ClusterServiceImpl、InternalPartitionService、ProxyServiceImpl、TransactionManagerServiceImpl、ClientEngineImpl、QuorumServiceImpl。默认service包括MapService、LockService、QueueService、TopicService、ReliableTopicService、MultiMapService、ListService、SetService、DistributedExecutorService、AtomicLongService、AtomicReferenceService、CountDownLatchService、SemaphoreService、IdGeneratorService、MapReduceService、ReplicatedMapService、RingbufferService、XAService。如果允许还将把缓存服务CacheService添加进来。
public final class ServiceManagerImpl implements ServiceManager {


    private final ConcurrentMap<String, ServiceInfo> services = new ConcurrentHashMap<String, ServiceInfo>(20, .75f, 1);


}

节点加入,Joiner负责加入工作,例如广播则使用MulticastJoiner、单播则使用TcpIpJoiner、AWS则使用TcpIpJoinerOverAWS。Node启动时会根据情况启动个线程,
multicast只是做节点发现工作,真正的节点加入工作是交由tcpip做,向主节点发送加入请求,主节点把请求节点添加到成员列表中,然后返回请求节点让它把主节点地址设置为本人。

DefaultSerializers包括DateSerializer、ObjectSerializer、ClassSerializer等等序列化器,实现StreamSerializer的read和write方法完成序列化和反序列化处理。

Date.class, new DateSerializer());
        BigInteger.class, new BigIntegerSerializer());
        BigDecimal.class, new BigDecimalSerializer());
        Externalizable.class, new Externalizer(enableCompression));
        Serializable.class, new ObjectSerializer(enableSharedObject, enableCompression));
        Class.class, new ClassSerializer());
        Enum.class, new EnumSerializer());

DataSerializable.class, dataSerializerAdapter);
        Portable.class, portableSerializerAdapter);
        Byte.class, new ByteSerializer());
        Boolean.class, new BooleanSerializer());
        Character.class, new CharSerializer());
        Short.class, new ShortSerializer());
        Integer.class, new IntegerSerializer());
        Long.class, new LongSerializer());
        Float.class, new FloatSerializer());
        Double.class, new DoubleSerializer());
        byte[].class, new TheByteArraySerializer());
        char[].class, new CharArraySerializer());
        short[].class, new ShortArraySerializer());
        int[].class, new IntegerArraySerializer());
        long[].class, new LongArraySerializer());
        float[].class, new FloatArraySerializer());
        double[].class, new DoubleArraySerializer());
        String.class, new StringSerializer());

每个service都有自己的context,例如mapservice的MapServiceContext,它里面是保留了一份partition映射表的副本,在底层完成迁移之前并不会更新,当然底层的map数据也不会一边迁移一边删除,而是复制一份进行删除



喜欢java的同学可以交个朋友:





以上是关于分布式内存网格Hazelcast源码导读的主要内容,如果未能解决你的问题,请参考以下文章

Hazelcast介绍与使用

Spring Boot集成Hazelcast实现集群与分布式内存缓存

Hazelcast集群服务——Hazelcast介绍

Hazelcast是什么

分布式缓存组件Hazelcast

Spring本地缓存的使用方法有哪些?