分布式内存网格Hazelcast源码导读
Posted 超人汪小建(seaboat)
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式内存网格Hazelcast源码导读相关的知识,希望对你有一定的参考价值。
去年项目需要看了hazelcast源码,当时记录的笔记。
Node是节点的抽象,里面包含节点引擎、客户端引擎、分区服务、集群服务、组播服务、连接管理、命令管理、组播属性、节点配置、本地成员、tcp地址、组播地址、连接者、节点初始化器、管理中心、安全上下文、
Config类,包含GroupConfig、NetworkConfig、MapConfig、TopicConfig、QueueConfig、MultiMapConfig、ListConfig、SetConfig、ExecutorConfig、SemaphoreConfig、WanReplicationConfig、ServicesConfig、SecurityConfig、ListenerConfig、PartitionGroupConfig、ManagementCenterConfig、SerializationConfig。
GroupConfig集群用户名及密码DEFAULT_GROUP_PASSWORD = "dev-pass";DEFAULT_GROUP_NAME = "dev";
NetworkConfig网络相关配置,包括InterfacesConfig、JoinConfig、SSLConfig等,其中JoinConfig包括MulticastConfig、TcpIpConfig、AwsConfig。
SecurityConfig客户端登陆相关配置。
WanReplicationConfig集群复制配置,包括WanTargetClusterConfig配置,所有目标节点相关配置。
ClusterServiceImpl用于维护集群各个成员节点,实例化时把本地节点加入成员Map中,MulticastService
Node实例化时根据MulticastConfig使用组播加入一个组,MulticastService提供节点基于组播传递的,使用监听模式每当接收到其他节点传播的消息调用监听器的onMessage,传递的参数为JoinMessage,它由序列化模块提供转化,默认NodeMulticastListener监听器,会对集群的节点校验,是否是同个集群用户名密码。
MulticastJoiner用于加入集群节点,创建JoinRequest对象用MulticastService发送给其他成员,不断发送加入集群请求直到node里面的masteraddress,如果配置里面指定了targetAddress就不用使用这种不断发送的方式选举主节点。MulticastService负责发送申请加入的组播消息和组播消息接收及处理的工作。
找主节点方式:循环发送JoinRequest消息向组内发送,如果已加入状态且是master的节点接收到后会向外组播JoinMessage,告诉其他节点我的组成员信息,还没加入集群的成员则将自己节点的master地址设置为master发出的JoinMessage中的地址。所以JoinMessage只有master才会发出。其他已加入组的成员节点接收到JoinMessage类型消息则直接忽略,
JoinMessage包含包版本、地址等信息。
SerializationService序列化转化模块,SerializationServiceBuilder生产者,默认字节存放顺序序列为ByteOrder.BIG_ENDIAN,即使用ByteArrayInputOutputFactory,
Packet为数据包结构,第一字节为版本号,第二字节表示为长度,第三字节表示类型。
SocketConnector会执行连接操作,使用的是阻塞读写。
TcpIpConnectionManager集成ConnectionManager,用于管理TCPIP连接
JoinMessage{
protected byte packetVersion;
protected int buildNumber;
protected Address address;
protected String uuid;
protected ConfigCheck configCheck;
protected int memberCount;
}
JoinRequest继承JoinMessage,并添加credentials、tryCount属性。
默认5701作为每个节点对外暴露的端口。
ReadHandler的handle方法会初始化socketReader,集群成员维护的话使用SocketPacketReader读取报文,
OperationServiceImpl类内部类RemoteOperationProcessor用于ResponseOperation对象
response.beforeRun();
response.run();
response.afterRun();
即会调用JoinRequestOperation的run方法,调用ClusterServiceImpl里面的handleJoinRequest方法,完成成员更新工作,接收成功会向源节点返回SetMasterOperation、MemberInfoUpdateOperation等等更新源节点成员及节点joined状态。
里面一切需要处理的都用run方法,会统一处理。
Node.start--TcpIpConnectionManager.start--InSelectorImpl.run(输入即读取选择器,run不断选择可读的对象并获取attachment,调用它的handle方法,这里的attachment是在TcpIpConnection实例化的时候创建readHandle对象并调用start方法把readHandler当做attachment注册到socketChannel里面的,readHandler调用handle方法,如果是集群协议CLUSTER则实例化SocketPacketReader,SocketPacketReader包含一个PacketReader,默认是DefaultPacketReader,用于将套接字读出来的字节转化成Packet包并调用handlePacket方法处理消息包,如果是集群成员消息包则调用handleMemberPacket方法处理,间接调用NodeEngineImpl的handlePacket方法,有三种不同头部类型:Packet.HEADER_OP、Packet.HEADER_EVENT、Packet.HEADER_WAN_REPLICATION,HEADER_OP表示操作类型,会调用operationService的handleOperation方法处理包消息,OperationThread会一直跑调用OperationRunner处理需要执行的Operation,OperationRunner线程数由hazelcast.operation.thread.count设置,如果-1则为机器cpu个数,接着会执行operation的beforeRun、run、operation如果设置了响应的话还要执行产生响应内容在返回给调用者、afterRun方法。)--OutSelectorImpl.run(输出即写入选择器)--SocketAcceptor.run--TcpIpConnection.start(TcpIpConnection包括WriteHandler和ReadHandler)--
private void process(Object task) {
processedCount++;
if (task instanceof Operation) {
processOperation((Operation) task);
return;
}
if (task instanceof Packet) {
processPacket((Packet) task);
return;
}
if (task instanceof PartitionSpecificRunnable) {
processPartitionSpecificRunnable((PartitionSpecificRunnable) task);
return;
}
throw new IllegalStateException("Unhandled task type for task:" + task);
}
设计得不错的是它直接支持各种Operation的传递,并执行,本质也是序列化反序列化,然后调用Operation的beforerun、run、afterrun方法,后面还会自动执行handleResponse方法,此方法用于向其他备份节点同步数据,这部分操作是在operation的afterrun之前完成备份。备份工作由OperationBackupHandler完成,backup方法,备份又分为同步备份和异步备份,相加等于总的需要备份数。map操作默认备份一份数据且是同步的,异步的默认为0.异步备份不会阻塞操作。备份的operation是Backup,它的run会执行PutOperation的run方法,即把数据放到缓存中并修改版本,这里的run不会再执行复制操作。
<hazelcast>
<map name="default">
<backup-count>1</backup-count>
<async-backup-count>1</async-backup-count>
</map>
</hazelcast>
operation的run同样会在备份节点上执行,putoperation其实就是在本地缓存更新值。备份的过程没有一个ack机制,信息传输的可靠性如何保证?
一个节点调用map的put操作时,会在本节点上缓存这个结果,再把operation传输到对应partition的第一个备份节点(这个节点可能就是自己本地节点)上,第一个节点接收到后备份到第二个节点上,所以默认就只有两个备份数据。所以nearcache缓存是可能存在每个节点上的。
PutOperation的afterRun方法主要是触发一些拦截器,触发各个节点的事件监听器什么的、更新各个备份节点缓存等等。
IOBalancer平衡io读取写入。
packet报文结构,1byte版本+2byte头部+4byte分区+4byte长度+nbyte消息。
主节点一个一个发给其他成员节点关于成员的消息,其他节点进行更新。
ServiceManagerImpl用于管理所有的远程,启动时会注册常用的一些服务,包括如下的服务,什么map、queue什么的
private void registerCoreServices() {
Node node = nodeEngine.getNode();
registerService(ClusterServiceImpl.SERVICE_NAME, node.getClusterService());
registerService(InternalPartitionService.SERVICE_NAME, node.getPartitionService());
registerService(ProxyServiceImpl.SERVICE_NAME, nodeEngine.getProxyService());
registerService(TransactionManagerServiceImpl.SERVICE_NAME, nodeEngine.getTransactionManagerService());
registerService(ClientEngineImpl.SERVICE_NAME, node.clientEngine);
registerService(QuorumServiceImpl.SERVICE_NAME, nodeEngine.getQuorumService());
}
private void registerDefaultServices(ServicesConfig servicesConfig) {
registerService(MapService.SERVICE_NAME, createService(MapService.class));
registerService(LockService.SERVICE_NAME, new LockServiceImpl(nodeEngine));
registerService(QueueService.SERVICE_NAME, new QueueService(nodeEngine));
registerService(TopicService.SERVICE_NAME, new TopicService());
registerService(ReliableTopicService.SERVICE_NAME, new ReliableTopicService(nodeEngine));
registerService(MultiMapService.SERVICE_NAME, new MultiMapService(nodeEngine));
registerService(ListService.SERVICE_NAME, new ListService(nodeEngine));
registerService(SetService.SERVICE_NAME, new SetService(nodeEngine));
registerService(DistributedExecutorService.SERVICE_NAME, new DistributedExecutorService());
registerService(AtomicLongService.SERVICE_NAME, new AtomicLongService());
registerService(AtomicReferenceService.SERVICE_NAME, new AtomicReferenceService());
registerService(CountDownLatchService.SERVICE_NAME, new CountDownLatchService());
registerService(SemaphoreService.SERVICE_NAME, new SemaphoreService(nodeEngine));
registerService(IdGeneratorService.SERVICE_NAME, new IdGeneratorService(nodeEngine));
registerService(MapReduceService.SERVICE_NAME, new MapReduceService(nodeEngine));
registerService(ReplicatedMapService.SERVICE_NAME, new ReplicatedMapService(nodeEngine));
registerService(RingbufferService.SERVICE_NAME, new RingbufferService(nodeEngine));
registerService(XAService.SERVICE_NAME, new XAService(nodeEngine));
registerCacheServiceIfAvailable();
}
Map通过AbstractMapServiceFactory创建,使用MapRemoteService处理远程操作,RemoteService服务有两个方法createDistributedObject和destroyDistributedObject方法,最终是通过MapProxyImpl。Map的partition策略在MapContainer里面。
PutOperation用于执行集群节点的put操作。一致性哈希根据key使用MurmurHash哈希计算出结果,再根据分区数(默认271)取余。每个节点都是使用new ConcurrentHashMap<Data, Record>(1000, 0.75f, 1)存放记录。
成员组MemberGroup一共有若干个组,多少个成员就多少个组,最大的复制节点数为7,如果成员组小于7则使用成员组数量。
address[271][4]
Address[271][7]
1、addr0 addr9 addr3 addr4 null null null
2、.
.
.
.
.
271、addr1 addr2 addr3 addr4 null null null
对分配的结果尝试重新分配,把过载的组分配一些成员给不足的组,并检测每个成员组内的节点数相差不会超过系数1.1,否则重新分组,尽可能达到均匀。
最终形成一张表,271个分区每个分区都对应着若干个复制的成员地址。
int avgPartitionPerGroup = partitionCount / groupSize;
就是说最多4个组,假如5个结点,分组情况为2,1,1,1,则每个组分到的partition个数为271/4,可能有些组多1个partition。
一共有271条线程处理operation,OperationThread,里面有队列ScheduleQueue,线程会不断处理,ScheduleQueue用于operation执行缓冲队列,里面的有两种队列,normalQueue和priorityQueue,一种用于正常的排队,一种用于设置优先级的队列,take方法会优先从priorityQueue中获取需要优先处理的operation。
有个同步复制、异步复制。
OperationServiceImpl.createInvocationBuilder(此方法有两个,一个用于partition、一个用于指定的address),-> 实例化一个InvocationBuilderImpl对象,调用invoke方法会创建Invocation,Invocation包含PartitionInvocation(针对分区)和TargetInvocation(针对指定节点)两种。
在集群中传输一切以Data形式传输。
Map的服务名为hz:impl:mapService。
假如HazelcastInstance执行instance.getMap("customers")则,通过HazelcastInstanceProxy的getMap方法,代理是调HazelcastInstanceImpl的getMap方法,调用getDistributedObject方法,它会通过ProxyService(它代理了所有服务)代理找到MapService,调用mapservice的createDistributedObject方法创建DistributedObject,间接调用MapRemoteService的createDistributedObject方法创建MapProxyImpl,调用MapProxyImpl的put方法把数据放到集群,主要如下操作
final Data key = toData(k, partitionStrategy);
final Data value = toData(v);
final Data result = putInternal(key, value, ttl, timeunit);
return (V) toObject(result);
分别将key和value转化为Data,其实是序列化,方便网络传输。putInternal方法使用PutOperation,并且要根据key计算出partitionId,接着再完成operation的调用。
SerializationServiceImpl提供各种类型的序列化支持,toData提供由object到Data的转化,toObject提供由Data到object的转化,Data默认是使用DefaultData,DefaultData里面其实就是包含了一个字节数组还有不同的偏移量,例如从几位到几位表示类型,序列化工作其实也跟这种类似,把某一对象的相关信息转化为字节数组,传递到目的地后再根据约定反向组装成指定对象。
OperationThread专门用于执行接收到的请求,process方法,可以有三种请求,包括Operation、Packet、PartitionSpecificRunnable,分别不同的处理逻辑,Operation则直接反序列化后调用beforeRun、run、afterRun等方法。
HealthMonitor是独立一条线程用于监控健康,包括
private class HealthMetrics {
private final long memoryFree;
private final long memoryTotal;
private final long memoryUsed;
private final long memoryMax;
private final double memoryUsedOfTotalPercentage;
private final double memoryUsedOfMaxPercentage;
//following three load variables are always between 0 and 100.
private final double processCpuLoad;
private final double systemLoadAverage;
private final double systemCpuLoad;
private final int threadCount;
private final int peakThreadCount;
private final long clusterTimeDiff;
private final int asyncExecutorQueueSize;
private final int clientExecutorQueueSize;
private final int queryExecutorQueueSize;
private final int scheduledExecutorQueueSize;
private final int systemExecutorQueueSize;
private final int eventQueueSize;
private final int pendingInvocationsCount;
private final double pendingInvocationsPercentage;
private final int operationServiceOperationExecutorQueueSize;
private final int operationServiceOperationPriorityExecutorQueueSize;
private final int operationServiceOperationResponseQueueSize;
private final int runningOperationsCount;
private final int remoteOperationsCount;
private final int proxyCount;
private final int clientEndpointCount;
private final int activeConnectionCount;
private final int currentClientConnectionCount;
private final int connectionCount;
private final int ioExecutorQueueSize;
}
PerformanceMonitor表示性能监控,监控的参数包括inSelector的已读取数量,outSelect的已写入事件数量,OperationService相关的性能参数,例如挂起的调用比例、总体调用比例、最大调用数量、271个分区线程已执行数量、271个分区operation线程正在挂起线程(即任务排队队列中的数量),常规operation线程的排队队列数量、常规operation线程已执行数量、响应线程已执行数量、响应线程排队队列数量。
hazelcast核心——Node,包含各种各样重要的基础服务,日志、节点关闭钩子、序列化服务、节点引擎、客户端引擎、分区服务、集群服务、广播服务、连接管理服务、命令服务、配置文件服务、群组属性服务、本节点地址、本地集群成员对象、主节点地址、hazelcast实例引用、日志服务、集群节点加入服务、节点扩展服务、管理中心服务、安全上下文、创建信息服务、版本校对服务、hazelcast线程组。
public class Node {
private final ILogger logger;
private final NodeShutdownHookThread shutdownHookThread = new NodeShutdownHookThread("hz.ShutdownThread");
private final SerializationService serializationService;
public final NodeEngineImpl nodeEngine;
public final ClientEngineImpl clientEngine;
public final InternalPartitionService partitionService;
public final ClusterServiceImpl clusterService;
public final MulticastService multicastService;
public final ConnectionManager connectionManager;
public final TextCommandServiceImpl textCommandService;
public final Config config;
public final GroupProperties groupProperties;
public final Address address;
public final MemberImpl localMember;
private volatile Address masterAddress = null;
public final HazelcastInstanceImpl hazelcastInstance;
public final LoggingServiceImpl loggingService;
private final Joiner joiner;
private final NodeExtension nodeExtension;
private ManagementCenterService managementCenterService;
public final SecurityContext securityContext;
private final ClassLoader configClassLoader;
private final BuildInfo buildInfo;
private final VersionCheck versionCheck = new VersionCheck();
private final HazelcastThreadGroup hazelcastThreadGroup;
}
NodeEngineImpl作为节点引擎包含了许多服务,重要的例如,事件服务、operation服务、执行服务、等待通知服务、service(内置许多service,例如Map、Queue,用户自定义的服务可配置到hazelcast.xml,启动时会加载进来)管理服务、事务管理服务、代理服务、wan复制服务、包传输服务、证明人服务。
public class NodeEngineImpl implements NodeEngine {
private final Node node;
private final ILogger logger;
private final EventServiceImpl eventService;
private final OperationServiceImpl operationService;
private final ExecutionServiceImpl executionService;
private final WaitNotifyServiceImpl waitNotifyService;
private final ServiceManagerImpl serviceManager;
private final TransactionManagerServiceImpl transactionManagerService;
private final ProxyServiceImpl proxyService;
private final WanReplicationService wanReplicationService;
private final PacketTransceiver packetTransceiver;
private final QuorumServiceImpl quorumService;
}
ServiceManagerImpl用于管理所有服务,启动时默认会实例化核心的service、默认的service,如果用户通过配置文件配置了自定义service则也会实例化。启动时注册即将service实例put到ConcurrentMap中,核心service包括ClusterServiceImpl、InternalPartitionService、ProxyServiceImpl、TransactionManagerServiceImpl、ClientEngineImpl、QuorumServiceImpl。默认service包括MapService、LockService、QueueService、TopicService、ReliableTopicService、MultiMapService、ListService、SetService、DistributedExecutorService、AtomicLongService、AtomicReferenceService、CountDownLatchService、SemaphoreService、IdGeneratorService、MapReduceService、ReplicatedMapService、RingbufferService、XAService。如果允许还将把缓存服务CacheService添加进来。
public final class ServiceManagerImpl implements ServiceManager {
private final ConcurrentMap<String, ServiceInfo> services = new ConcurrentHashMap<String, ServiceInfo>(20, .75f, 1);
}
节点加入,Joiner负责加入工作,例如广播则使用MulticastJoiner、单播则使用TcpIpJoiner、AWS则使用TcpIpJoinerOverAWS。Node启动时会根据情况启动个线程,
multicast只是做节点发现工作,真正的节点加入工作是交由tcpip做,向主节点发送加入请求,主节点把请求节点添加到成员列表中,然后返回请求节点让它把主节点地址设置为本人。
DefaultSerializers包括DateSerializer、ObjectSerializer、ClassSerializer等等序列化器,实现StreamSerializer的read和write方法完成序列化和反序列化处理。
Date.class, new DateSerializer());
BigInteger.class, new BigIntegerSerializer());
BigDecimal.class, new BigDecimalSerializer());
Externalizable.class, new Externalizer(enableCompression));
Serializable.class, new ObjectSerializer(enableSharedObject, enableCompression));
Class.class, new ClassSerializer());
Enum.class, new EnumSerializer());
DataSerializable.class, dataSerializerAdapter);
Portable.class, portableSerializerAdapter);
Byte.class, new ByteSerializer());
Boolean.class, new BooleanSerializer());
Character.class, new CharSerializer());
Short.class, new ShortSerializer());
Integer.class, new IntegerSerializer());
Long.class, new LongSerializer());
Float.class, new FloatSerializer());
Double.class, new DoubleSerializer());
byte[].class, new TheByteArraySerializer());
char[].class, new CharArraySerializer());
short[].class, new ShortArraySerializer());
int[].class, new IntegerArraySerializer());
long[].class, new LongArraySerializer());
float[].class, new FloatArraySerializer());
double[].class, new DoubleArraySerializer());
String.class, new StringSerializer());
每个service都有自己的context,例如mapservice的MapServiceContext,它里面是保留了一份partition映射表的副本,在底层完成迁移之前并不会更新,当然底层的map数据也不会一边迁移一边删除,而是复制一份进行删除
喜欢java的同学可以交个朋友:
以上是关于分布式内存网格Hazelcast源码导读的主要内容,如果未能解决你的问题,请参考以下文章