ElasticSearch Index Operation: Source Code Analysis
Posted by hapjin
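This article traces the source code path ElasticSearch follows when it creates an index. It looks at the execution flow to see which services take part in index creation (for example AllocationService and MasterService). My understanding of distributed systems is not very deep, so there are many finer details and underlying principles I don't understand. The request being traced is: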
curl -X PUT "localhost:9200/twitter"
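When the ElasticSearch server receives a create-index request from a client, index creation starts in org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction.
Creating an index requires the ElasticSearch master node, so TransportCreateIndexAction extends TransportMasterNodeAction. The actual index creation is delegated to its MetaDataCreateIndexService instance field.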
/**
 * Create index action.
 */
public class TransportCreateIndexAction extends TransportMasterNodeAction<CreateIndexRequest, CreateIndexResponse> {
    // ...
    private final MetaDataCreateIndexService createIndexService;
    // ... (constructor and masterOperation(...) omitted)
}
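TransportMasterNodeAction exists to make sure a request like this runs on the elected master node. The sketch below models that routing decision in a few lines. It is not ES code: ClusterView, route and masterOperation are hypothetical names. Where ES would wait for a master to be elected and retry, the sketch simply throws.

```java
import java.util.Optional;
import java.util.function.Function;

public class MasterRoutingSketch {

    // Hypothetical view of the cluster from one node's perspective.
    static final class ClusterView {
        final String localNodeId;
        final Optional<String> electedMasterId;

        ClusterView(String localNodeId, Optional<String> electedMasterId) {
            this.localNodeId = localNodeId;
            this.electedMasterId = electedMasterId;
        }
    }

    // Decides where a master-level request (such as "create index") is executed.
    static String route(ClusterView view, String request, Function<String, String> masterOperation) {
        if (!view.electedMasterId.isPresent()) {
            // Simplification: ES would wait for a new cluster state with a master and retry.
            throw new IllegalStateException("no elected master");
        }
        String master = view.electedMasterId.get();
        if (master.equals(view.localNodeId)) {
            // This node is the elected master: run the operation locally.
            return masterOperation.apply(request);
        }
        // Otherwise the request is forwarded to the master node over the transport layer.
        return "forwarded [" + request + "] to " + master;
    }

    public static void main(String[] args) {
        Function<String, String> createIndex = index -> "created [" + index + "]";
        System.out.println(route(new ClusterView("node-1", Optional.of("node-1")), "twitter", createIndex));
        System.out.println(route(new ClusterView("node-2", Optional.of("node-1")), "twitter", createIndex));
    }
}
```

Running main prints `created [twitter]` for the master node and `forwarded [twitter] to node-1` for the non-master node.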
MetaDataCreateIndexService.createIndex(...) calls the onlyCreateIndex method to perform the index creation:
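public void createIndex(...) {
    onlyCreateIndex(request, ActionListener.wrap(response -> {
        if (response.isAcknowledged()) {
            // the index is in the cluster state; now wait for the requested number of active shard copies
            activeShardsObserver.waitForActiveShards(...);
        }
        // ... (unacknowledged case omitted)
    }, listener::onFailure));
}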
Its Javadoc reads: "Creates an index in the cluster state and waits for the specified number of shard copies to become active as specified in CreateIndexClusterStateUpdateRequest#waitForActiveShards() before sending the response on the listener."
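Creating an index involves checking active shards. By default (wait_for_active_shards is 1), only the primary shard has to be active for the request to complete. If fewer shard copies are active than the requested number, the create-index request blocks. It returns once enough shard copies in the cluster become active, or when it times out. See ActiveShardsObserver#waitForActiveShards(...).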
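The waiting behaviour can be illustrated with a small, self-contained model. This sketch rests on assumptions and does not show how ActiveShardsObserver works internally: ES observes cluster state changes rather than using wait/notify. ActiveShardWaiterSketch, shardStarted and awaitActive are hypothetical names. A false return plays the role of shards_acknowledged being false in the create-index response. In that case the index exists and only the wait timed out.

```java
public class ActiveShardWaiterSketch {

    private int activeShards = 0;

    // Called whenever a shard copy (primary or replica) becomes active.
    public synchronized void shardStarted() {
        activeShards++;
        notifyAll();
    }

    // Waits until at least `required` shard copies are active or the timeout elapses.
    // Returns true if the requirement was met, false on timeout (or interruption).
    public synchronized boolean awaitActive(int required, long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (activeShards < required) {
            long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMillis <= 0) {
                return false; // timed out: stop waiting, but nothing is rolled back
            }
            try {
                wait(remainingMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        ActiveShardWaiterSketch waiter = new ActiveShardWaiterSketch();
        // Simulate the primary shard becoming active shortly after the index is created.
        new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException ignored) {
                // demo only
            }
            waiter.shardStarted();
        }).start();
        System.out.println(waiter.awaitActive(1, 1_000)); // primary only: true
        System.out.println(waiter.awaitActive(2, 100));   // a replica that never starts: false
    }
}
```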
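Index creation is encapsulated in an org.elasticsearch.cluster.metadata.MetaDataCreateIndexService.IndexCreationTask#IndexCreationTask object. That task is ultimately executed by PrioritizedEsThreadPoolExecutor, a thread pool whose task queue is ordered by priority.
An operation such as index creation changes the cluster state and must be propagated to every node in the cluster, so IndexCreationTask extends AckedClusterStateUpdateTask.
MetaDataCreateIndexService#onlyCreateIndex(...) submits the IndexCreationTask: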
clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]",
new IndexCreationTask(logger, allocationService, request, listener, indicesService, aliasValidator, xContentRegistry, settings,
this::validate));
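The central contract of a cluster-state update task is that it receives the current, immutable cluster state and returns a new one instead of mutating it. The following is a minimal sketch of what IndexCreationTask does at that level. It assumes a hypothetical ClusterState that only maps index names to primary-shard counts. The real task also builds settings, mappings and routing table entries.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class IndexCreationTaskSketch {

    // Hypothetical, immutable stand-in for ClusterState: index name -> number of primary shards.
    static final class ClusterState {
        final long version;
        final Map<String, Integer> indices;

        ClusterState(long version, Map<String, Integer> indices) {
            this.version = version;
            this.indices = Collections.unmodifiableMap(new HashMap<>(indices));
        }
    }

    // Hypothetical analogue of the task's execute(currentState): never mutates its input.
    static ClusterState createIndex(ClusterState current, String index, int primaries) {
        if (current.indices.containsKey(index)) {
            // ES reports this case as an "index already exists" error.
            throw new IllegalStateException("index [" + index + "] already exists");
        }
        Map<String, Integer> updated = new HashMap<>(current.indices);
        updated.put(index, primaries);
        return new ClusterState(current.version + 1, updated);
    }

    public static void main(String[] args) {
        ClusterState s0 = new ClusterState(1, Collections.emptyMap());
        ClusterState s1 = createIndex(s0, "twitter", 1);
        System.out.println(s0.indices + " v" + s0.version + " -> " + s1.indices + " v" + s1.version);
        // prints: {} v1 -> {twitter=1} v2
    }
}
```

Because the old state is never modified, the caller can compare "before" and "after" and decide whether anything needs to be published.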
Following the submitStateUpdateTasks(...) call stack into org.elasticsearch.cluster.service.MasterService#submitStateUpdateTasks(...), a lambda passed to map converts the IndexCreationTask object into Batcher.UpdateTask, a Runnable task that the thread pool can execute:
public <T> void submitStateUpdateTasks(..., Map<T, ClusterStateTaskListener> tasks, ...) {
    try {
        List<Batcher.UpdateTask> safeTasks = tasks.entrySet().stream()
            .map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue()), executor))
            .collect(Collectors.toList());
        // taskBatcher is an org.elasticsearch.cluster.service.TaskBatcher
        taskBatcher.submitTasks(safeTasks, config.timeout());
    } catch (EsRejectedExecutionException e) {
        // ... (rejection handling omitted)
    }
}
// TaskBatcher submits the create-index task through PrioritizedEsThreadPoolExecutor#execute(...)
public abstract class TaskBatcher {
    private final PrioritizedEsThreadPoolExecutor threadExecutor;

    public void submitTasks(...) {
        final BatchedTask firstTask = tasks.get(0);
        // ... (duplicate checks and grouping by batching key omitted)
        if (timeout != null) {
            threadExecutor.execute(firstTask, timeout, () -> onTimeoutInternal(tasks, timeout));
        } else {
            threadExecutor.execute(firstTask);
        }
    }
}
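The inheritance hierarchy of org.elasticsearch.cluster.service.MasterService.Batcher.UpdateTask shows that it is a Runnable task. The create-index operation is ultimately submitted to and executed by the PrioritizedEsThreadPoolExecutor thread pool.
PrioritizedEsThreadPoolExecutor extends ThreadPoolExecutor. Its source code shows how ElasticSearch builds a custom thread pool with a priority task queue. It is also a good lesson in how to extend a thread pool's functionality.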
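The basic mechanism of a priority thread pool can be reproduced with the JDK alone: give ThreadPoolExecutor a PriorityBlockingQueue and make the queued Runnables Comparable. The sketch below is a simplified stand-in for PrioritizedEsThreadPoolExecutor, not its actual code. The Priority enum and PrioritizedTask class are hypothetical. Equal-priority tasks are kept in submission order by comparing an insertion counter.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class PriorityPoolSketch {

    // Hypothetical priority levels; declaration order = urgency (IMMEDIATE runs first).
    enum Priority { IMMEDIATE, URGENT, HIGH, NORMAL, LOW, LANGUID }

    private static final AtomicLong SEQUENCE = new AtomicLong();

    // A Runnable the PriorityBlockingQueue can order: by priority first, then FIFO by insertion order.
    static final class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
        final Priority priority;
        final long insertionOrder = SEQUENCE.getAndIncrement();
        private final Runnable body;

        PrioritizedTask(Priority priority, Runnable body) {
            this.priority = priority;
            this.body = body;
        }

        @Override
        public void run() {
            body.run();
        }

        @Override
        public int compareTo(PrioritizedTask other) {
            int byPriority = priority.compareTo(other.priority);
            return byPriority != 0 ? byPriority : Long.compare(insertionOrder, other.insertionOrder);
        }
    }

    // A single worker thread; tasks waiting in the queue are taken in priority order.
    static ThreadPoolExecutor newPrioritizedPool() {
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());
    }

    // Blocks the only worker, queues tasks out of order, then releases the worker and records the run order.
    static List<String> demo() {
        ThreadPoolExecutor pool = newPrioritizedPool();
        List<String> order = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch gate = new CountDownLatch(1);
        pool.execute(new PrioritizedTask(Priority.IMMEDIATE, () -> awaitQuietly(gate)));
        pool.execute(new PrioritizedTask(Priority.LOW, () -> order.add("low")));
        pool.execute(new PrioritizedTask(Priority.NORMAL, () -> order.add("normal-1")));
        pool.execute(new PrioritizedTask(Priority.URGENT, () -> order.add("urgent")));
        pool.execute(new PrioritizedTask(Priority.NORMAL, () -> order.add("normal-2")));
        gate.countDown();
        pool.shutdown(); // already-queued tasks still run after shutdown()
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(order);
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [urgent, normal-1, normal-2, low]
    }
}
```

The demo has to use execute() rather than submit(). submit() wraps each task in a FutureTask, which is not Comparable, so the PriorityBlockingQueue would fail with a ClassCastException when it tries to order it.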
Tracing the threadExecutor.execute(...) code:
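// PrioritizedEsThreadPoolExecutor
public void execute(Runnable command, final TimeValue timeout, final Runnable timeoutCallback) {
    // wrap the Runnable to add extra behaviour, such as priority ordering
    command = wrapRunnable(command);
    doExecute(command);
    // ... (timeout scheduling omitted)
}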
// EsThreadPoolExecutor
protected void doExecute(final Runnable command) {
    try {
        super.execute(command); // hand the task to ThreadPoolExecutor
    } catch (EsRejectedExecutionException ex) {
        if (command instanceof AbstractRunnable) {
            // If we are an abstract runnable we can handle the rejection
            // directly and don't need to rethrow it.
            try {
                ((AbstractRunnable) command).onRejection(ex);
            } finally {
                ((AbstractRunnable) command).onAfter();
            }
        } else {
            throw ex;
        }
    }
}
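Since PrioritizedEsThreadPoolExecutor extends ThreadPoolExecutor, the task is ultimately executed by ThreadPoolExecutor#runWorker(Worker w), which the inner class Worker calls from its run() method. See point 3 of "Exploring the Thread Pool Implementation in ElasticSearch".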
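The rejection branch of doExecute can be exercised with a plain ThreadPoolExecutor. In this sketch, SelfHandlingTask is a hypothetical stand-in for ES's AbstractRunnable. The JDK's RejectedExecutionException, thrown by the default AbortPolicy, stands in for EsRejectedExecutionException. The pool has one worker thread and a queue of capacity 1. While the worker is blocked, every task beyond the second is rejected and handled through onRejection/onAfter instead of propagating to the caller.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RejectionHandlingSketch {

    // Hypothetical analogue of AbstractRunnable: a task that can handle its own rejection.
    abstract static class SelfHandlingTask implements Runnable {
        abstract void doRun();

        void onRejection(Exception e) {
        }

        void onAfter() {
        }

        @Override
        public final void run() {
            try {
                doRun();
            } finally {
                onAfter();
            }
        }
    }

    // Same shape as doExecute: self-handling tasks absorb the rejection, other tasks rethrow it.
    static void doExecute(ThreadPoolExecutor pool, Runnable command) {
        try {
            pool.execute(command);
        } catch (RejectedExecutionException ex) {
            if (command instanceof SelfHandlingTask) {
                SelfHandlingTask task = (SelfHandlingTask) command;
                try {
                    task.onRejection(ex);
                } finally {
                    task.onAfter();
                }
            } else {
                throw ex;
            }
        }
    }

    // One worker + queue capacity 1: task 1 runs (blocked), task 2 waits in the queue, the rest are rejected.
    static int countRejections(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch gate = new CountDownLatch(1);
        AtomicInteger rejections = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            doExecute(pool, new SelfHandlingTask() {
                @Override
                void doRun() {
                    try {
                        gate.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }

                @Override
                void onRejection(Exception e) {
                    rejections.incrementAndGet();
                }
            });
        }
        gate.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejections.get();
    }

    public static void main(String[] args) {
        System.out.println(countRejections(4)); // 2
    }
}
```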
So far this covers the thread execution path. The actual business logic (creating the index and updating the cluster state) lives in Runnable#run(), that is, in org.elasticsearch.cluster.service.TaskBatcher.BatchedTask#run():
// BatchedTask
public void run() {
    runIfNotProcessed(this);
}

// TaskBatcher
void runIfNotProcessed(BatchedTask updateTask) {
    // checks on the task: is it a duplicate, has it already been processed, ...
    // ... (other unrelated code omitted)
    run(updateTask.batchingKey, toExecute, tasksSummary);
}

/**
 * Action to be implemented by the specific batching implementation
 * All tasks have the given batching key.
 */
protected abstract void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary);
The abstract run(...) method is implemented in org.elasticsearch.cluster.service.MasterService.Batcher#run:
@Override
protected void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary) {
ClusterStateTaskExecutor<Object> taskExecutor = (ClusterStateTaskExecutor<Object>) batchingKey;
List<UpdateTask> updateTasks = (List<UpdateTask>) tasks;
//TaskInputs Represents a set of tasks to be processed together with their executor
runTasks(new TaskInputs(taskExecutor, updateTasks, tasksSummary));
}
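runIfNotProcessed is what makes batching work. Every submitted task gets its own Runnable in the executor. The first one to run removes all pending tasks that share its batching key and runs them as a single batch. The remaining Runnables later find their tasks already processed and simply return. The sketch below shows that idea with hypothetical names (TaskBatcherSketch, submit, batchRunner) and leaves out priorities, timeouts and duplicate checks.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

public class TaskBatcherSketch {

    // Inner (non-static) class, created with batcher.new BatchedTask(...).
    final class BatchedTask implements Runnable {
        final Object batchingKey;
        final String payload;
        final AtomicBoolean processed = new AtomicBoolean(false);

        BatchedTask(Object batchingKey, String payload) {
            this.batchingKey = batchingKey;
            this.payload = payload;
        }

        @Override
        public void run() {
            runIfNotProcessed(this);
        }
    }

    // Pending tasks grouped by batching key (in MasterService the key is the task executor).
    private final Map<Object, List<BatchedTask>> tasksPerBatchingKey = new HashMap<>();
    private final Executor executor;
    private final BiConsumer<Object, List<String>> batchRunner; // receives one whole batch

    TaskBatcherSketch(Executor executor, BiConsumer<Object, List<String>> batchRunner) {
        this.executor = executor;
        this.batchRunner = batchRunner;
    }

    void submit(BatchedTask task) {
        synchronized (tasksPerBatchingKey) {
            tasksPerBatchingKey.computeIfAbsent(task.batchingKey, k -> new ArrayList<>()).add(task);
        }
        executor.execute(task);
    }

    // Whichever task of a batch runs first drains every pending task with the same key.
    void runIfNotProcessed(BatchedTask updateTask) {
        if (updateTask.processed.get()) {
            return; // already executed as part of an earlier batch
        }
        List<String> toExecute = new ArrayList<>();
        synchronized (tasksPerBatchingKey) {
            List<BatchedTask> pending = tasksPerBatchingKey.remove(updateTask.batchingKey);
            if (pending != null) {
                for (BatchedTask task : pending) {
                    if (!task.processed.getAndSet(true)) {
                        toExecute.add(task.payload);
                    }
                }
            }
        }
        if (!toExecute.isEmpty()) {
            batchRunner.accept(updateTask.batchingKey, toExecute);
        }
    }

    public static void main(String[] args) {
        List<Runnable> queue = new ArrayList<>(); // stand-in executor: just collects runnables
        TaskBatcherSketch batcher = new TaskBatcherSketch(queue::add,
            (key, payloads) -> System.out.println(key + " -> " + payloads));
        batcher.submit(batcher.new BatchedTask("create-index", "twitter"));
        batcher.submit(batcher.new BatchedTask("create-index", "facebook"));
        batcher.submit(batcher.new BatchedTask("shard-started", "[twitter][0]"));
        queue.forEach(Runnable::run);
        // create-index -> [twitter, facebook]
        // shard-started -> [[twitter][0]]
    }
}
```

In main, the executor only collects the Runnables. Both create-index tasks are therefore still pending when the first one runs, so they end up in the same batch.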
// The actual logic that updates the cluster state
protected void runTasks(TaskInputs taskInputs) {
    // ... (summary and startTimeNS are computed here; omitted)
    final ClusterState previousClusterState = state();
    // compute the new cluster state (including the per-shard processing)
    TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, startTimeNS);
    if (taskOutputs.clusterStateUnchanged()) {
        // no change to the cluster state was detected
    } else {
        // the state changed: it has to be published to the other nodes
        ClusterState newClusterState = taskOutputs.newClusterState;
        try {
            ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState);
            // Returns the DiscoveryNodes.Delta between the previous cluster state and the new cluster state.
            final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
            if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
                String nodeSummary = nodesDelta.shortSummary();
                if (nodeSummary.length() > 0) {
                    logger.info("{}, reason: {}", summary, nodeSummary);
                }
            }
            // ... (publishing newClusterState to the other nodes omitted)
            // Called when the result of the ClusterStateTaskExecutor#execute(ClusterState, List) have
            // been processed properly by all listeners.
            taskOutputs.processedDifferentClusterState(previousClusterState, newClusterState);
            // Callback invoked after new cluster state is published
            taskOutputs.clusterStatePublished(clusterChangedEvent);
        } catch (Exception e) {
            // ... (error handling omitted)
        }
    }
}
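Without the logging and error handling, runTasks follows a simple pattern. It computes a new state from the old one and publishes only if something actually changed. Below is a minimal sketch with a hypothetical State class. In the sketch, "unchanged" means the executor returned the very same state instance, and publishing is reduced to recording a line in a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class RunTasksSketch {

    // Hypothetical immutable cluster state: a version number plus a description of its content.
    static final class State {
        final long version;
        final String content;

        State(long version, String content) {
            this.version = version;
            this.content = content;
        }
    }

    private State current;
    final List<String> published = new ArrayList<>();

    RunTasksSketch(State initial) {
        this.current = initial;
    }

    State state() {
        return current;
    }

    // The executor returns either the same instance (no change) or a brand-new state object.
    void runTasks(UnaryOperator<State> executor) {
        final State previous = state();
        final State next = executor.apply(previous);
        if (next == previous) {
            return; // reference equality: nothing to publish or apply
        }
        // Stand-in for publishing the new state to the other nodes.
        published.add("v" + previous.version + " -> v" + next.version + ": " + next.content);
        current = next;
    }

    public static void main(String[] args) {
        RunTasksSketch master = new RunTasksSketch(new State(1, "no indices"));
        master.runTasks(s -> s); // a task that changes nothing
        master.runTasks(s -> new State(s.version + 1, "index [twitter] created"));
        System.out.println(master.published); // [v1 -> v2: index [twitter] created]
    }
}
```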
The line TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, startTimeNS); takes the create-index task as input and produces the resulting change to the cluster state as output.
public TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState, long startTimeNS) {
    ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, startTimeNS, previousClusterState);
    // ...
}

protected ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, ...) {
    ClusterTasksResult<Object> clusterTasksResult;
    List<Object> inputs = taskInputs.updateTasks.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList());
    // the executor depends on the batch: for IndexCreationTask it is the task itself,
    // for the later shard-started events it is ShardStartedClusterStateTaskExecutor#execute
    clusterTasksResult = taskInputs.executor.execute(previousClusterState, inputs);
    // ...
    return clusterTasksResult;
}
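public ClusterTasksResult<StartedShardEntry> execute(ClusterState currentState, List<StartedShardEntry> tasks) throws Exception {
    ClusterTasksResult.Builder<StartedShardEntry> builder = ClusterTasksResult.builder();
    List<StartedShardEntry> tasksToBeApplied = new ArrayList<>();
    List<ShardRouting> shardRoutingsToBeApplied = new ArrayList<>(tasks.size());
    for (StartedShardEntry task : tasks) {
        ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, task.allocationId);
        // .... (other code omitted)
        tasksToBeApplied.add(task);
        shardRoutingsToBeApplied.add(matched);
    }
    // ... (error handling omitted)
    ClusterState maybeUpdatedState = allocationService.applyStartedShards(currentState, shardRoutingsToBeApplied);
    builder.successes(tasksToBeApplied);
    return builder.build(maybeUpdatedState);
}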
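At its simplest, applying started shards means finding each reported shard copy in the routing table and moving it from INITIALIZING to STARTED, ignoring reports that no longer match anything. Below is a hypothetical model of that step. It uses a map from allocation id to state, not ES's ShardRouting and RoutingNodes machinery. It returns the original instance when nothing changed so that the caller can detect "no change" cheaply.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ShardStartedSketch {

    enum ShardState { INITIALIZING, STARTED }

    // Hypothetical routing table keyed by allocation id (ES looks copies up by shard id + allocation id).
    static Map<String, ShardState> applyStartedShards(Map<String, ShardState> routing, List<String> startedAllocationIds) {
        Map<String, ShardState> updated = new HashMap<>(routing);
        boolean changed = false;
        for (String allocationId : startedAllocationIds) {
            // Only a copy that is currently INITIALIZING can move to STARTED; unknown ids are ignored.
            if (updated.get(allocationId) == ShardState.INITIALIZING) {
                updated.put(allocationId, ShardState.STARTED);
                changed = true;
            }
        }
        // Returning the original instance signals "no change" to the caller.
        return changed ? updated : routing;
    }

    public static void main(String[] args) {
        Map<String, ShardState> routing = new HashMap<>();
        routing.put("alloc-primary", ShardState.INITIALIZING);
        routing.put("alloc-replica", ShardState.INITIALIZING);
        Map<String, ShardState> after = applyStartedShards(routing, Arrays.asList("alloc-primary", "stale-id"));
        System.out.println(after.get("alloc-primary") + " " + after.get("alloc-replica")); // STARTED INITIALIZING
    }
}
```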
The state of each shard is ultimately updated in org.elasticsearch.cluster.action.shard.ShardStateAction.ShardStartedClusterStateTaskExecutor#execute. I don't fully understand the implementation details; they involve the ShardRouting routing table and AllocationService. The Javadoc of AllocationService describes it as follows:
"AllocationService manages the node allocation of a cluster. For this reason the AllocationService keeps AllocationDeciders to choose nodes for shard allocation. This class also manages new nodes joining the cluster and rerouting of shards."
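The AllocationDeciders idea from this Javadoc can be shown in miniature. Each decider votes YES, THROTTLE or NO on placing a shard on a node. The combined answer is NO if any decider says NO, otherwise THROTTLE if any says THROTTLE, and YES only if all agree. Both deciders below are hypothetical. One follows the spirit of SameShardAllocationDecider (never two copies of the same shard on one node). The other is a made-up "busy node" rule used only for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class AllocationDecidersSketch {

    enum Decision { YES, THROTTLE, NO }

    // Hypothetical node view: which shard ids the node already holds.
    static final class Node {
        final String id;
        final Set<Integer> shardIds;

        Node(String id, Set<Integer> shardIds) {
            this.id = id;
            this.shardIds = shardIds;
        }
    }

    interface Decider {
        Decision canAllocate(int shardId, Node node);
    }

    // Never place a second copy of a shard on a node that already holds one.
    static final Decider SAME_SHARD = (shardId, node) -> node.shardIds.contains(shardId) ? Decision.NO : Decision.YES;

    // Hypothetical rule: postpone allocation to nodes that already hold `limit` shards or more.
    static Decider busyNode(int limit) {
        return (shardId, node) -> node.shardIds.size() >= limit ? Decision.THROTTLE : Decision.YES;
    }

    // Combine votes: any NO wins; otherwise any THROTTLE; otherwise YES.
    static Decision decide(List<Decider> deciders, int shardId, Node node) {
        Decision result = Decision.YES;
        for (Decider decider : deciders) {
            Decision vote = decider.canAllocate(shardId, node);
            if (vote == Decision.NO) {
                return Decision.NO;
            }
            if (vote == Decision.THROTTLE) {
                result = Decision.THROTTLE;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Decider> deciders = Arrays.asList(SAME_SHARD, busyNode(2));
        Node holdsShard0 = new Node("n1", new HashSet<>(Arrays.asList(0)));
        Node busy = new Node("n2", new HashSet<>(Arrays.asList(1, 2)));
        Node idle = new Node("n3", new HashSet<>());
        for (Node node : Arrays.asList(holdsShard0, busy, idle)) {
            System.out.println(node.id + ": " + decide(deciders, 0, node)); // n1: NO, n2: THROTTLE, n3: YES
        }
    }
}
```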
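Summary
- An ElasticSearch Index operation (Action) is converted into a Runnable task and submitted to a thread pool for asynchronous execution, which creates the index.
- Creating an index changes the cluster state, so an update task is created to update the cluster state.
- My biggest takeaway is the design of submitting all kinds of tasks to a thread pool for execution.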
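For the overall flow of the ElasticSearch Index operation, see "ElasticSearch Index Anatomy".
Related articles on reading the ElasticSearch source code:
- Exploring the Thread Pool Implementation in ElasticSearch
- Source Code Analysis of How ElasticSearch Loads Analyzers at Startup
- Notes on Reading the Elasticsearch 6.3.2 Startup Source Code
- How the Elasticsearch High Level Rest Client Sends a Request
Original: https://www.cnblogs.com/hapjin/p/10219219.html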