Alluxio 源码完整解析 | 你不知道的开源数据编排系统(下篇)

Posted Alluxio

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Alluxio 源码完整解析 | 你不知道的开源数据编排系统(下篇)相关的知识,希望对你有一定的参考价值。

回顾

在《Alluxio-源码解析-上》主要讲述了Alluxio本地环境搭建,源码项目结构,服务进程的启动流程和服务间RPC调用。

本篇将在上篇的基础上,继续为大家讲述Alluxio中重点类详解,Alluxio中Block底层读写流程,Alluxio Client调用流程和 Alluxio内置的轻量级调度框架。

Part 1 重点类讲述

1.1****Journaled

Journaled接口定义可被Journaled持久化维护的通用方法,通过JournalEntryIterable#getJournalEntryIterator获取Journal元素遍历信息,该接口提供默认checkpoint方法。Journaled接口继承Checkpointed、JournalEntryIterable,定义的方法包括:
getJournalEntryIterator:获取Journal所有元素;

getCheckpointName:获取checkpoint class类名称;

writeToCheckpoint:持久化写入所有状态的checkpoint;

restoreFromCheckpoint:checkpoint恢复;

processJournalEntry:处理指定的Journal元素,Journal处理核心方法

resetState:重置Journal状态;

applyAndJournal:对Journal元素执行和应用Journal操作。

1.2****UnderFileSystem

Alluxio管理和适配数据在底层各个存储系统执行操作,实现UnderFileSystem接口的底层存储可以作为Alluxio的合法UFS。

1.2.1. 类图

UnderFileSystem的类图如下所示,主要由抽象类BaseUnderFileSystem实现,而BaseUnderFileSystem下主要分为两大类:

ConsistentUnderFileSystem:具备一致性的UFS实现,主要包括:LocalUnderFileSystem、HdfsUnderFileSystem、CephFSUnderFileSystem等;

ObjectUnderFileSystem:对象存储UFS实现,主要包括:S3AUnderFileSystem、COSUnderFileSystem、OSSUnderFileSystem等。

1.2.2. 接口方法

在UnderFileSystem中有两类接口API:
存储系统通用操作,如:创建/删除文件,文件重命名;

处理数据持久化最终一致性的操作(eventual consistency),如:解决当AlluxioMaster维护元数据成功时,但执行UFS操作失败的问题。

1.2.2.1. 存储系统操作
create:指定path路径,在UFS中创建数据文件(父目录不存在会自动创建),可通过CreateOptions设置创建文件的用户组和ACL策略;

deleteDirectory:删除指定目录,可通过DeleteOptions设置删除的策略和遍历方式;

deleteFile:删除指定文件;

getDirectoryStatus:获取UFS指定目录状态,需传入已存在的目录文件;

getFileStatus:获取UFS指定文件状态;

getStatus:获取UFS状态,可指定目录或文件;

isDirectory:判断指定路径在UFS是否是目录;

open:打开UFS上指定文件,可通过OpenOptions设置文件打开参数;

renameDirectory:UFS上指定目录重命名;

renameFile:UFS上指定文件重命名;

exists:判断指定的文件或目录是否存在;

getAclPair:获取UFS的ACL策略;

getBlockSizeByte:获取指定目录下UFS的每个Block文件大小,单位bytes;

getFileLocations:获取指定路径在UFS关联的存储Location列表;

getFingerprint:计算并获取指定路径的文件标识(指纹),文件标识(指纹)的计算必须是确定且不可变的;

getOperationMode:获取底层UFS的操作模式,Alluxio的底层存储可以由多种类型UFS组成,该方法用来确定底层UFS的操作模式,例子:底层UFS为:hdfs://ns1/,hdfs://ns2/,则返回结果:hdfs://ns1/:NO_ACCESS,hdfs://ns2/:READ_WRITE;

getPhysicalStores:获取所有UFS类型,包括数据结构和对应权限

getSpace:通过制定SpaceType获取UFS中指定路径的存储空间信息,SpaceType包括:SPACE_TOTAL、SPACE_FREE、SPACE_USED;

getUnderFSType:获取UFS类型,如hdfs;

isFile:判断文件文件在UFS是否存在;

isObjectStorage:判断UFS是否是对象存储;

isSeekable:判断UFS是否支持搜索;

listStatus:指定UFS路径下的文件状态列表,该列表不保证顺序,可通过ListOptions设置是否支持遍历;

mkdirs:在UFS上创建指定目录,可通过MkdirsOptions设置目录创建规则,如ACL和递归父目录创建;

setAclEntries:指定路径,设置UFS的ALC策略集合;

setMode:指定路径,设置UFS ALC Mode,如0777;

setOwner:指定路径,设置UFS ALC的user和group;

supportsFlush:判断UFS是否支持文件Flush;

supportsActiveSync:判断UFS是否支持ActiveSync(访问内部文件共享),ActiveSync相关的接口包括:getActiveSyncInfo、startSync、stopSync、startActiveSyncPolling、stopActiveSyncPolling。

1.2.2.2. 最终一致性操作
createNonexistingFile:创建不存在的文件,若文件存在,则退出;

deleteExistingDirectory:删除指定目录;

deleteExistingFile:删除指定文件;

getExistingDirectoryStatus:获取UFS指定目录状态;

getExistingFileStatus:获取UFS指定文件状态;

getExistingStatus:获取UFS状态,可指定目录或文件;

isExistingDirectory:判断指定路径在UFS是否是目录;

openExistingFile:打开UFS上指定文件,可通过OpenOptions设置文件打开参数;

renameRenamableDirectory:UFS上指定目录重命名;

renameRenamableFile:UFS上指定文件重命名。

1.2.2.3. 其他操作
cleanup:当数据文件创建时没有正常的成功结束或被抛弃处理,则对底层UFS清理;

connectFromMaster:指定AlluxioMaster主机地址,建立指定Master与UFS连接;

connectFromWorker:指定AlluxioWorker主机地址,建立指定Worker与UFS连接;

resolveUri:给定Alluxio基础URI和路径,返回拼装后的Alluxio路径。

1.3 UfsManager

Alluxio中对底层UFS(Under FileSystem)管理操作的通用统一接口类定义,定义的接口方法包括:
addMount:UFS挂载到Alluxio,该方法仅针对Alluxio处理,不对底层UFS操作;

removeMount:移除Alluxio中的UFS挂载;

get:根据mountId获取挂载的UFS信息;

getRoot:获取Alluxio上挂载的根目录信息;

getJournal:获取Journal的Location地址;

其中AbstractUfsManager抽象类对UFS管理接口进行基本实现。

1.3.1. UfsClient
维护底层UFS的Client连接信息和其他相关UFS的描述信息,基于UfsClient实现Alluxio对UnderFileSystem的操作。
1.4 BlockClient

BlockClient抽象类定义调用方对Block基本的读写操作,其类图示意如下,主要包括:BlockWriter、BlockReader

读写Block的定义的方法类:
1.5 DefaultFileSystemMaster

Master服务维护所有FileSystem(文件系统)元数据变更的管理操作,DefaultFileSystemMaster内部基于InodeTree维护文件系统结构,并将InodeTree持久化到日志文件(journal);除此之外,其内部维护多个管理操作,如:InodeLockManager、MasterUfsManager、MountTable等;
备注:DefaultFil1.5. DefaultFileSystemMastereSystemMaster的启动start方法详情前面所述内容。

1.5.1. 接口方法

FileSystemMaster接口定义master中针对FS的操作方法,DefaultFileSystemMaster继承FileSystemMaster,其接口方法主要包括:
cleanupUfs:周期性清理底层UFS;

getFileId:基于Alluxio路径URI 获取文件ID,若文件不缓存Alluxio,则调用UFS获取;

getFileInfo:根据文件ID获取文件详情,该接口仅对内部服务开放,不对用户直接开放;

getPersistenceState:根据文件ID,获取该文件的持久化状态;

listStatus:指定Alluxio路径,获取文件状态列表;

checkAccess:校验指定Alluxio路径的权限;

checkConsistency:校验指定Alluxio路径的文件数据一致性;

completeFile:关闭/结束指定Alluxio路径,关闭后,则该文件不可写;

createFile:基于指定Alluxio文件路径,创建文件FileInfo;

getNewBlockIdForFile:指定Alluxio文件路径,获取下个待操作Block文件的Block ID;

getMountPointInfoSummary:获取Alluxio中mount(挂载)路径的快照信息;

getDisplayMountPointInfo:获取Alluxio用户展示的Mount信息;

delete:删除指定Alluxio路径的文件元信息;

getFileBlockInfoList:获取指定Alluxio路径下的所有Block列表信息;

getInAlluxioFiles:获取Alluxio中所有的文件列表路径;

getInMemoryFiles:获取Alluxio中所有缓存在内存的文件列表路径;

createDirectory:创建Alluxio对应的目录,并返回目录ID;

rename:Alluxio中文件重命名操作的元数据变更;

free:指定Alluxio目录下,释放所有alluxio缓存的block文件信息,支持目录下遍历的文件释放;

getPath:根据指定FileId获取Alluxio URI路径;

getPinIdList:获取被固定的inode id列表;

getUfsAddress:获取master所需的UFS地址;

getUfsInfo:根据挂载ID获取对应UFS信息;

getLostFiles:获取worker节点丢失的文件列表;

mount:核心操作,将UFS路径挂载在Alluxio指定路径;

unmount:取消指定Alluxio路径上的UFS挂载;

updateMount:更新指定Alluxio路径挂载信息;

setAcl:设置Alluxio路径ACL;

updateUfsMode:设置底层UFS Mode;

validateInodeBlocks:验证inode block信息是否具备完整性;

workerHeartbeat:指定worker ID,通知对应worker进行文件的存储持久化;

getWorkerInfoList:获取所有worker节点信息列表;

getTimeSeries:获取alluxio master中元数据存储的时间版本信息;

1.6 DefaultBlockWorker
1.6.1. 接口

Worker Server针对Block的管理操作,实现接口类:BlockWorker,其接口方法主要包括:
getWorkerId:获取worker id;

abortBlock:丢弃session中临时创建的block文件;

accessBlock:访问指定session和block id下的block信息,该方法可能会在block缓存释放被访问;

commitBlock:提交block到Alluxio的管理空间,待提交的block必须是临时的,当block提交成功之前,block是不支持读写访问;

commitBlockInUfs:将block提交到UFS持久化;

createBlock:在Alluxio管理空间创建block,基于BlockWriter类可对block进行写操作,在block commit提交之前都是临时的;

getTempBlockMeta:获取临时block元数据;

createBlockWriter:基于session和block id创建BlockWriter,用于block的写操作;

getReport:获取worker与master周期性心跳的报告;

getStoreMeta:获取整个block存储的元数据信息,包括block中每个存储目录映射和每层存储的容量情况;

getStoreMetaFull:与getStoreMeta相似,但包括完整的blockId列表,获取代价更高;

getVolatileBlockMeta:根据指定blockId获取block元数据信息;

lockBlock:对block进行加锁操作;

moveBlock:将block从当前存储Location移动到目标Location;当前仅支持分层存储移动;

moveBlockToMedium:block移动并指定对应的存储介质类型(MediumType);

createBlockReader:创建BlockReader进行Block读操作,可读取Alluxio Block和 UFS Block;

createUfsBlockReader:创建BlockReader进行UFS Block读操作;

removeBlock:从Allxuio管理空间移除Block;

requestSpace:为指定block获取存储空间,该block必须为临时block;

unlockBlock:对block去除锁操作;

asyncCache:提交异步缓存请求进行异步的缓存管理;

updatePinList:更新底层block存储占用的pin列表;

getFileInfo:基于指定file id获取文件信息。

1.6.2. TieredBlockStore

BlockStore定义block的存储接口,用于管理本地block存储,其接口核心目的:具体实现BlockWorker中定义的方法类,接口如下:
TieredBlockStore是BlockStore的实现类,实现了Alluxio中核心功能点:分层存储,使得对应的存储对象可基于block形式进行分层存储管理,并对外暴露提供API进行block管理。TieredBlockStore中内置分配算法确定新block的存取和旧block的释放,基于BlockMetadataManager维护分层存储状态、block读写锁管理等元数据信息。

TieredBlockStore是线程安全的,所有基于block级别的操作都需要调用BlockLockManager来获取对应的读写锁,保证该block下的元数据操作和I/O操作是线程安全的。任何block的元数据操作都需要基于BlockMetadataManager来获取元数据的ReentrantReadWriteLock 读写锁。

Allocator接口定义Alluxio中数据管理的分配策略,接口方法:allocateBlockWithView,目前内部有三种实现类:

RoundRobinAllocator:基于round-robin轮训分配,默认从最高层开始分配,当最高层存储空间不足则会到下一层,该分配策略不支持指定存储具体的分层。

MaxFreeAllocator:分配到存储中最大剩余空间,当没有指定具体存储分层,默认从最高层开始分配;

GreedyAllocator:返回满足存储block大小的第一层存储空间,是存储分配的示例类;

其中BlockStoreLocation定义存储block的location地址和分层信息,描述了三个存储维度:存储层别名、对应存储层目录地址,存储层介质信息。

1.6.2.1. createBlock
当存在可用空间(space)时,基于block分配算法创建临时block;特别的:创建block不会触发其他block的销毁释放,通过BlockMetadataAllocatorView 获取只读的Block元数据信息,为Allocator调度提供数据来源,Allocator分配调度后返回StorageDirView对象并创建TempBlockMeta 并通过BlockMetadataManager管理。存储分配后的元数据会基于createBlockFile方法持久化到Block元文件。
Allocator接口定义Alluxio中数据管理的分配策略,接口方法:allocateBlockWithView,目前内部有三种实现类:

RoundRobinAllocator:基于round-robin轮询分配,默认从最高层开始分配,当最高层存储空间不足则会到下一层,该分配策略不支持指定存储具体的分层。

MaxFreeAllocator:分配到存储中最大剩余空间,当没有指定具体存储分层,默认从最高层开始分配;

GreedyAllocator:返回满足存储block大小的第一层存储空间,是存储分配的示例类;

其中BlockStoreLocation定义存储block的location地址和分层信息,描述了三个存储维度:存储层别名、对应存储层目录地址,存储层介质信息。

1.6.2.2. freeSpace
同步方法执行Block缓存存储空间执行立刻删除释放,当所有存储分层的空间释放操作结束后才能支持新Block创建。根据BlockMetadataEvictorView 获取Block存储中可移除的Block信息。判断当前缓存存储中是否满足最小连续空间和最小可用空间,若同时满足,则不进行后续空间清理操作;若不满足,则遍历Block信息,判断是否可清理,若可以清理,则删除对应的Block文件及元数据,通过BlockStoreEventListener事件监听器同步Block释放操作。
BlockStoreEventListener 监听BlockStore中元数据变化成功结束的触发事件,主要包括的接口方法类:
onAccessBlock:访问Block 事件触发;

onAbortBlock:清理和释放临时Block 事件触发;

onCommitBlock:提交临时Block并关联Block的存储信息BlockStoreLocation 事件触发;

onMoveBlockByClient:基于Client移动Block的BlockStoreLocation 事件触发;

onMoveBlockByWorker:基于Worker移动Block的BlockStoreLocation 事件触发;

onRemoveBlockByClient:基于Client移除并释放Block的BlockStoreLocation 事件触发;

onRemoveBlock:移除并释放Block 事件触发;

onBlockLost:Block丢失 事件触发;

onStorageLost:存储目录丢失 事件触发。
1.7 PlanDefinition

Alluxio中内置轻量级角度系统的Job执行计划定义,有两个核心部分,1. PlanDefinition#selectExecutors:该方法在Master节点调用,用于选择执行任务的AlluxioJobWorker,2.PlanDefinition#runTask:在JobWorker中运行指定作业计划。PlanDefinition 主要包括的作业定义实现有:

MoveDefinition:在FileSystemMaster校验层级上触发Block的移动操作;
ReplicateDefinition:在FileSystemMaster校验层级上触发Block的复制操作;

EvictDefinition:在FileSystemMaster校验层级上触发Block释放操作;

PersistDefinition:将Alluxio Block缓存存储持久化到底层UFS;

CompactDefinition:在指定目录下降结构化表的数据文件进行压缩;

MigrateDefinition:Block移动,源和目标Block可以挂载在不同的UFS节点;

LoadDefinition:实现简单的Block文件的Load操作。
1.7.1. TaskExecutorManager

管理JobWorker Task执行器,真正的执行任务通过线程池调用TaskExecutor#run,而TaskExecutor#run底层通过PlanDefinition#runTask 实现;同时TaskExecutorManager内部也管理Task的执行容量和Task生命周期管理,如:获取执行的线程池,对任务执行限流/解除限流,任务启停。

Part 2 Block读写操作

2.1 读操作

BlockWorker RPC服务提供的客户端的读操作,大致流程如下:
BlockWorkerClientServiceHandler.readBlock方法定义Block读取,默认创建请求参数StreamObserverresponseObserver 创建 CallStreamObserver;若支持零拷贝,则使用DataMessageServerStreamObserver

基于CallStreamObserver 创建BlockReadHandler,并调用BlockReadHandler#onReady 开启数据读取,基于线程池提交创建DataReader线程执行;

DataReader是Alluxio用于I/O数据读取的线程类,封装了核心的Alluxio读操作逻辑,(1).获取Alluxio数据输入流DataBuffer;(2)调用CallStreamObserver.onNext触发和监听数据流读取;

DataReader获取DataBuffer是整个读取处理的核心逻辑,判断数据读取来源:Local、UFS,是否进行Block移动实现短路读;

  • 创建打开Block,若请求需要加速(promote=true)则操作BlockWorker.moveBlock,将Block移动到存储更高层;

  • 调用DefaultBlockWorker#createBlockReader 创建BlockReader,判断本地Worker是否可以直接访问,若支持则返回LocalFileBlockReader;若为UFS中,则调用UnderFileSystemBlockReader;

  • 调用BlockReader.transferTo 读取数据,并将I/O封装为NettyDataBuffer返回。

    2.1.1. UnderFileSystemBlockReader
    UnderFileSystemBlockReader 类实现直接从UFS读取并将读取的信息缓存到读取的Worker Block中,大致流程如下:

UfsInputStreamCache.acquire 根据ufs、路径、blockId获取输入流InputStream,若InputStream在缓存中直接获取,若不存在,则根据ufs.openExistingFile 获取底层UFS的文件输入流InputStream;

获取并更新BlockWriter,判断是否存在有对应Block存在,不存在则调用BlockStore.createBlock新建临时Block,并返回对应BlockWriter;

根据第一步骤获取的输入流InputStream和参数offset读取文件,读取的数据:(1).通过BlockWriter写入Block缓存对应Worker;(2).返回调用方读取信息。
备注:
LocalFileBlockReader:基于FileChannel.map方法的I/O操作读取文本文件信息

RemoteBlockReader:基于远端的Worker(非本地Worker)读取,暂不支持;

DelegatingBlockReader根据不同的使用场景,判断和选择使用的BlockReader实现类。

2.1.2. ShortCircuitBlockReadHandler

ShortCircuitBlockReadHandler类是RPC服务实现提供短路读能力,首先Grpc的StreamObserver(观察者模式),一次onNext调用说明一次消息读取,大致的执行流程:

根据OpenLocalBlockRequest获取是否进行加速读取,若加速(promote=true)则调用BlockWorker.moveBlock将存储移动更高层存储分层;

调用BlockWorker.lockBlock 获取Block的读写操作锁,最后BlockWorker.accessBlock获取访问Block

2.2 写操作
BlockWorker RPC服务提供的客户端的写操作,大致流程如下:
BlockWorkerClientServiceHandler.writeBlock方法定义Block写入,默认创建请求参数StreamObserverresponseObserver 创建 CallStreamObserver;若支持零拷贝,则使用 BlockWorkerClientServiceHandler;

基于CallStreamObserver 创建DelegationWriteHandler,并调用DelegationWriteHandler#onCancel关闭数据写操作;调用onNext方法进行数据流监听写操作;

DelegationWriteHandler 根据请求Command类型获取对应的AbstractWriteHandler 实现类:

  • ALLUXIO_BLOCK:BlockWriteHandler,数据仅写入Alluxio Block,基于BlockWriter实现写操作;

  • UFS_FILE:UfsFileWriteHandler,数据仅写入UFS,基于UFS Client创建目录文件并进行I/O操作;

  • UFS_FALLBACK_BLOCK:UfsFallbackBlockWriteHandler,先基于BlockWriteHandler写入Alluxio Block再写入UFS;
    AbstractWriteHandler 抽象类关系如下:
    2.2.1. LocalFileBlockWriter

基于本地Worker写入Block文件信息,调用FileChannel.map

2.2.2. ShortCircuitBlockWriteHandler

ShortCircuitBlockWriterHandler实现短路读的创建本地Block能力,基于onNext调用,大致执行流程:

若仅申请空间资源,则基于BlockWorker.requestSpace 获取Block创建的请求空间资源;

若需创建临时Block,则调用BlockWorker.createBlock创建Block并返回对应Block路径。

Part 3 Catalog管理

AlluxioCatalog进行Alluxio中Catalog管理对象,封装和维护了Alluxio中注册的DB信息及各个DB下的Table等元数据信息,其基本的方法操作如下,包括:获取数据库db信息,db元数据同步,db绑定/解绑等操作。
attachDatabase:将绑定的db元数据信息维护在内存中并同步持久化到Journal中;

syncDatabase:会基于底层udb获取最新元数据database信息,如Hive则调用HMS客户端接口方法IMetaStoreClient#getDatabase获取数据库信息。

Part 4 Client操作

4.1 Client

Client接口抽象定义Alluxio中Client操作,其继承和实现类如下所示,封装了对接各个组件的RPC接口:

FileSystemMasterClient:封装 FileSystemMasterClientServiceHandler 相关RPC调用,进行元数据管理操作

BlockMasterClient:封装 BlockMasterClientServiceHandler相关RPC调用,进行Block管理操作

TableMasterClient:封装 TableMasterClientServiceHandler 相关RPC调用,进行Alluxio Table Catalog管理操作

MetaMasterClient:封装 MetaMasterClientServiceHandler 相关RPC调用

MetaMasterConfigClient:封装 MetaMasterConfigurationServiceHandler 相关RPC调用

JobMasterClient:封装JobMasterClientServiceHandler 相关RPC调用,进行Alluxio Job的调用操作;

4.1.1. FileSystem

Client中定义的文件系统操作接口类,用于元数据管理和数据管理,用户可根据其实现类BaseFileSystem 扩展Client文件操作行为。
FileSystem 中定义的接口方法主要包括以下几类:
checkAccess:检查指定路径权限;

createDirectory:基于AlluxioURI 创建文件目录;

createFile:基于AlluxioURI 创建数据文件;

delete:基于AlluxioURI 删除指定文件/目录;

exists:基于AlluxioURI判断指定文件/目录是否存在;

free:基于AlluxioURI 释放Alluxio空间,但不删除UFS数据文件了;

listStatus:列出AlluxioURI文件/目录信息;

mount/updateMount/unmount:挂载/更新/取消挂载指定AlluxioURI目录;

openFile:打开并读取AlluxioURI文件输入流;

persist:将Alluxio中缓存的数据异步持久化底层UFS;

rename:Alluxio文件重命名。

4.1.2. FileSystemContext

维护Alluxio基于Client进行文件系统操作的上下文信息,通常的,一个Client JVM进程会使用同个FileSystem连接Alluxio,因此Client对象会在不同线程中共享。FileSystemContext 只有当用户需要个性化配置和认证时才被创建,线程共享的Client会针对FileSystemContext维护独立的线程空间,FileSystemContext 线程不共享(线程安全)会增加Client连接的资源使用,因此当用户停止Alluxio操作后,需要关闭FileSystemContext释放资源。

4.1.3. FileInStream/FileOutStream

Client中定义基于Alluxio文件操作的输入/输出流,如下所示:

输出流:AlluxioFileOutStream,Alluxio输出流写入,底层操作BlockOutStream

输入流:AlluxioFileInStream:Alluxio输入流读取,封装了本地/远端节点数据读取,或者直接基于底层UFS;底层操作BlockInStream,LocalCacheFileInStream,AlluxioHdfsInputStream

4.2 AbstractShell
Client的功能可以通过Shell对外提供操作,AbstractShell抽象类定义Alluxio中Shell命令操作,其继承子类包括:
FileSystemShell:Alluxio Shell文件操作入口类;

FileSystemAdminShell:Alluxio文件系统管理操作;

CollectInfo:Alluxio中从所有Woker节点采集信息命令;

TableShell:Alluxio表管理操作;

JobShell:Alluxio执行job管理操作。
4.2.1. CatCommand

以CatCommand为例,简述Alluxio Client进行文件读取的大致流程如:

FileSystemShell接收shell命令,执行"cat"打开文件操作,调用CatCommand.run命令,shell命令支持正则和多目录,对每个指定目录执行自定义实现的runPlainPath操作;

CatCommand#runPlainPath 方法通过getStatus判断文件类型,若为目录则退出,若为文件则基于FileSystem打开文件获取客户端输入流对象FileInStream(AlluxioFileInStream);

基于AlluxioFileInStream#read读取文件内容,URIStatus维护Alluxio中目录和文件元数据快照信息,基于URIStatus获取指定Alluxio文件对应Block信息,通过Client AlluxioBlockStore中维护的Block信息获取BlockInStream(Block输入流);

基于BlockInStream调用输入流读取操作,底层基于Block的数据读取接口DataReader实现,基于DataReader读取Block详情下述的Block读操作。
4.2.2. TouchCommand

以TouchCommand为例,简述Alluxio Client进行文件写入的大致流程如:

FileSystemShell接收shell命令,执行"touch"打开文件操作,调用TouchCommand.run命令,shell命令支持正则和多目录,对每个指定目录执行自定义实现的runPlainPath操作;

TouchCommand#runPlainPath 方法调用FileSystem.createFile 创建文件并在结束后关闭该连接;

FileSystem.createFile的方法详解如下:

  • 基于FileSystemMasterClient获取FileSystemMasterClientServiceHandler 远程的RPC连接信息;

  • 基于FileSystemMasterClient 调用RPC接口创建数据文件(createFile),将新建Alluxio文件元数据信息同步Alluxio Master;

  • FileSystem新建Client端的Alluxio文件输出流对象:AlluxioFileOutStream,其底层调用Block的DataWriter对象进行文件处理;

  • 输出流完成后,执行AlluxioFileOutStream#close方法,调用FileSystemMasterClient#completeFile 判断是否已执行完成,最终基于RPC接口实现completeFile;

Part 5 轻量级调度

Alluxio内部基于AlluxioJobMaster和AlluxioJobWoker实现轻量级内置的Alluxio操作调度,Master负责作业的调度管理,而Worker真正执行作业操作。

5.1调度管理

由前文AlluxioJobMaster启动流程可知,AlluxioJobMaster在启动时会触发JobMaster Server启动,JobMaster内部维护执行计划(plan)的管理追踪器:PlanTracker,用于创建、移除、访问任务作业集合,每个作业都有对应的PlanCoordinator用于分布式作业执行协调。外部服务可通过HTTP和RPC方式调用JobMaster.run 方法根据作业配置(JobConfig)启动并进行作业调度(同步/线程安全的)。JobConfig 定义作业配置接口,分为两类:PlanConfig(单作业执行)、WorkflowConfig(一组作业流执行)。

JobMaster中作业调度管理的大致流程如下:
外部接口可调用JobMaster.run方法触发作业执行,以Plan作业类型为例,调用PlanTracker执行run方法;

PlanTracker先校验并移除已完成的作业,并基于PlanCoordinator创建新的作业实例并启动该作业实例;

PlanCoordinator作业启动流程:

  • 基于JobConfig获取对应的PlanDefinition;

  • 根据可用的Worker列表和PlanDefinition,调用selectExecutors方法获取待执行作业Worker列表;

  • 调用CommandManager提交作业,将作业及待执行作业worker列表信息维护在内存队列中;

最后,Job Master和Job Worker节点通过RPC心跳检测,下发具体的作业信息给Worker执行。

5.2 作业执行
由前文AlluxioJobWorker启动流程可知,AlluxioJobWorker启动时会触发心跳检测线程CommandHandlingExecutor,对接收到的作业执行调度处理,每个作业启动一个线程执行,作业执行大致流程如下:
CommandHandlingExecutor线程启动与JobMaster进行心跳检测,基于JobMasterClient.heartbeat方法获取所有的待执行作业列表;

遍历待执行作业列表,从线程池调用CommandHandler.run线程类执行作业调度,包括的作业类型:启动、取消、注册作业;

CommandHandler启动作业会调用TaskExecutorManager 执行作业,以Future执行TaskExecutor 进行线程级别作业调度;

TaskExecutor真正执行作业调度:

  • 对应作业参数进行反序列化操作;

  • 根据PlanDefinitionRegistry 获取执行Job的PlanDefinition并调动runTask执行作业;

    以PersistDefinition为例,大致说明Job Executor操作,将Alluxio Block存储持久化到底层UFS:
    获取Alluxio的数据存储URI,读取对应的数据输入流in;

获取指定的UFS目标路径,根据UfsClient判断该路径是否存在,若不存在则创建,并基于UnderFileSystem创建输出流out;

根据I/O操作工具类,将数据从数据流拷贝输出流,持久化到UFS。

以上是关于Alluxio 源码完整解析 | 你不知道的开源数据编排系统(下篇)的主要内容,如果未能解决你的问题,请参考以下文章

有哪些你不知道的阅读源码的技巧

你不知道的 requestIdleCallback

Java开发大型互联网-Dubbo源码下你不知道如何实现应用容器内部署

开源大数据:Alluxio 云原生数据编排

深度解析人生重开模拟器?那些你不知道的事。

深度解析人生重开模拟器?那些你不知道的事。