FlinkFlink Sort-Shuffle写流程简析
Posted 九师兄
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FlinkFlink Sort-Shuffle写流程简析相关的知识,希望对你有一定的参考价值。
1.概述
转载:Flink Sort-Shuffle写流程简析 转载并且补充。
2.配置
taskmanager.network.sort-shuffle.min-parallelism
核心配置。设置Hash Shuffle和Sort Shuffle的分界。并发数大于该值时,使用Sort Shuffle。默认是int最大值,即使用Hash Shuffle。
taskmanager.network.sort-shuffle.min-buffers
每个Sort Shuffle的Result Partition使用的最小buffer数,默认64,推荐生产使用2048,但可能需要增大网络内存配置。
taskmanager.network.blocking-shuffle.compression.enabled
是否启用压缩
3.初始创建
实现类:SortMergeResultPartition
类的创建在ResultPartitionFactory
的create()
方法中,根据不同的配置,会选择使用不同的ResultPartition
,总共三种:PipelinedResultPartition
、SortMergeResultPartition
、BoundedBlockingResultPartition
。其中PipelinedResultPartition
用于流模式,其他的用于批模式。
创建SortMergeResultPartition的分支条件如下
else if (type == ResultPartitionType.BLOCKING
|| type == ResultPartitionType.BLOCKING_PERSISTENT)
/**
* 在ResultPartitionFactory的create()方法中,根据不同的配置,会选择使用不同的
* ResultPartition,总共三种:PipelinedResultPartition、SortMergeResultPartition、
* BoundedBlockingResultPartition。其中PipelinedResultPartition用于流模式,
* 其他的用于批模式。
*/
if (numberOfSubpartitions >= sortShuffleMinParallelism)
partition =
new SortMergeResultPartition(
taskNameWithSubtaskAndId,
partitionIndex,
id,
type,
subpartitions.length,
maxParallelism,
batchShuffleReadBufferPool,
batchShuffleReadIOExecutor,
partitionManager,
channelManager.createChannel().getPath(),
bufferCompressor,
bufferPoolFactory);
4、成员变量
NUM_WRITE_BUFFER_BYTES
int类型的数值,表示为数据写入设置的buffer大小,目前固定16M,不可配置。
resultFile
PartitionedFile类型,是Sort-Merge Shuffle的持久化文件代表,包含两个文件:.shuffle.data、.shuffle.index。文件根目录在tmp。
数据文件内分为多个区域,每个区域内,相同的子分区的数据相邻存储。索引条目是(long,int)类型,long代表文件偏移量,int代表buffer数量。
writeSegments
List<MemorySegment>
类型,从网络buffer切出来的用于数据写入的buffer。其中,numRequiredBuffer来源在ResultPartitionFactory中,根据shuffle类型,选择不同的值,其值来源配置:taskmanager.network.sort-shuffle.min-buffers
int expectedWriteBuffers = NUM_WRITE_BUFFER_BYTES / networkBufferSize;
if (networkBufferSize > NUM_WRITE_BUFFER_BYTES)
expectedWriteBuffers = 1;
int numRequiredBuffer = bufferPool.getNumberOfRequiredMemorySegments();
int numWriteBuffers = Math.min(numRequiredBuffer / 2, expectedWriteBuffers);
int numRequiredBuffers =
!type.isPipelined() && numberOfSubpartitions >= sortShuffleMinParallelism
? sortShuffleMinBuffers
: numberOfSubpartitions + 1;
networkBufferSize
int类型的数值,网络缓冲区和写缓冲区的大小(buffer的大小),其值来源pageSize,由taskmanager.memory.segment-size设定
fileWriter
PartitionedFileWriter类型,此ResultPartition的文件输出器。
subpartitionOrder
int[]类型,分区的顺序,用于写入数据文件时的分区顺序。
readScheduler
SortMergeResultPartitionReadScheduler类型,分区数据读取调度器。
numBuffersForSort
int类型的数值,unicastSortBuffer和broadcastSortBuffer可使用的buffer数。
broadcastSortBuffer
SortBuffer类型,用于broadcastRecord使用的buffer
unicastSortBuffer
SortBuffer类型,用于飞broadcastRecord使用的buffer
5.写shuffle文件
基于数据收发的内容,数据发送按RecordWriteOutput的collect方法开始
RecordWriteOutput.collect()
->pushToRecordWriter()
->RecordWriter.emit()
->ResultPartitionWriter.emitRecord()
->SortMergeResultPartition.emitRecord()
5.1 获取SortBuffer
首先判断是否是Broadcast数据,然后根据条件,创建新的buffer并返回
private void emit(
ByteBuffer record, int targetSubpartition, DataType dataType, boolean isBroadcast)
throws IOException
checkInProduceState();
SortBuffer sortBuffer = isBroadcast ? getBroadcastSortBuffer() : getUnicastSortBuffer();
if (sortBuffer.append(record, targetSubpartition, dataType))
return;
getUnicastSortBuffer()方法中,主要做两件事:1、flush Broadcast的buffer;2、创建新的buffer并返回。
private SortBuffer getBroadcastSortBuffer() throws IOException
flushUnicastSortBuffer();
if (broadcastSortBuffer != null && !broadcastSortBuffer.isFinished())
return broadcastSortBuffer;
broadcastSortBuffer =
new PartitionSortedBuffer(
lock,
bufferPool,
numSubpartitions,
networkBufferSize,
numBuffersForSort,
subpartitionOrder);
return broadcastSortBuffer;
5.2、追加数据
此步骤将产生的数据写入上一节产生的SortBuffer当中。注意这边的判断条件,当数据过大没有足够buffer写入时才会向下执行,否则写入完成后退出方法
private void emit(
ByteBuffer record, int targetSubpartition, DataType dataType, boolean isBroadcast)
throws IOException
checkInProduceState();
SortBuffer sortBuffer = isBroadcast ? getBroadcastSortBuffer() : getUnicastSortBuffer();
// 这一句
if (sortBuffer.append(record, targetSubpartition, dataType))
return;
这里调用org.apache.flink.runtime.io.network.partition.PartitionSortedBuffer#append
@Override
public boolean append(ByteBuffer source, int targetChannel, DataType dataType)
throws IOException
checkArgument(source.hasRemaining(), "Cannot append empty data.");
checkState(!isFinished, "Sort buffer is already finished.");
checkState(!isReleased, "Sort buffer is already released.");
int totalBytes = source.remaining();
// return false directly if it can not allocate enough buffers for the given record
if (!allocateBuffersForRecord(totalBytes))
return false;
写入数据的时候会在前部先写入一个元数据信息
// return false directly if it can not allocate enough buffers for the given record
if (!allocateBuffersForRecord(totalBytes))
return false;
// write the index entry and record or event data
// 写入数据的时候会在前部先写入一个元数据信息
writeIndex(targetChannel, totalBytes, dataType);
writeRecord(source);
++numTotalRecords;
numTotalBytes += totalBytes;
5.3、buffer不足的处理
此步骤是4.2步骤buffer不足的后续处理,如果数据已经全部读出,则释放该buffer并采用其他方式写入过大的数据
private void emit(
ByteBuffer record, int targetSubpartition, DataType dataType, boolean isBroadcast)
throws IOException
checkInProduceState();
SortBuffer sortBuffer = isBroadcast ? getBroadcastSortBuffer() : getUnicastSortBuffer();
if (sortBuffer.append(record, targetSubpartition, dataType))
return;
// 此步骤是4.2步骤buffer不足的后续处理,如果数据已经全部读出,则释放该buffer并采用其他方式写入过大的数据
if (!sortBuffer.hasRemaining())
// the record can not be appended to the free sort buffer because it is too large
sortBuffer.finish();
sortBuffer.release();
writeLargeRecord(record, targetSubpartition, dataType, isBroadcast);
return;
5.4、buffer不足数据未读完
此步骤接续4.3,当buffer不足以写入新数据且数据未被写入shuffle文件时,增加shuffle出文件的操作并重新调用写数据方法
// 此步骤接续4.3,当buffer不足以写入新数据且数据未被写入shuffle文件时,增加shuffle出文件的操作并重新调用写数据方法
flushSortBuffer(sortBuffer, isBroadcast);
emit(record, targetSubpartition, dataType, isBroadcast);
6.关于排序
PartitionSortedBuffer
是会进行排序的buffer,依赖于内部的MemorySegment
列表作为缓冲。相关的一些成员变量如下,index和segment使用的是同一份MemorySegment列表
/** A segment list as a joint buffer which stores all records and index entries. */
@GuardedBy("lock")
private final ArrayList<MemorySegment> segments = new ArrayList<>();
/** Addresses of the first record's index entry for each subpartition. */
private final long[] firstIndexEntryAddresses;
/** Addresses of the last record's index entry for each subpartition. */
private final long[] lastIndexEntryAddresses;
/** Array index in the segment list of the current available buffer for writing. */
private int writeSegmentIndex;
/** Next position in the current available buffer for writing. */
private int writeSegmentOffset;
6.1、segment申请
根据第四章内容,添加数据有如下调用链:emit()->append()->allocateBuffersForRecord()
allocateBuffersForRecord是申请segment用来存储数据的。当segment不足时,向bufferPool申请新资源。注意初始的时候,segment的列表是空的,所以最初必然是会申请的。
注意,一个segment是可能写多个数据的,如下,writeSegmentOffset是当前segment的写入位置,如果剩余量充足,是会继续写入数据的。
int availableBytes =
writeSegmentIndex == segments.size() ? 0 : bufferSize - writeSegmentOffset;
// return directly if current available bytes is adequate
if (availableBytes >= numBytesRequired)
return true;
6.2、writeIndex
在落地文件层,index和数据是分文件的,在PartitionedFile的定义如下
public static final String DATA_FILE_SUFFIX = ".shuffle.data";
public static final String INDEX_FILE_SUFFIX = ".shuffle.index";
PartitionSortedBuffer的writeIndex方法完成index向segment的写入,详细如下
6.2.1.获取当前可用segment
获取当前可用的segment,内部使用writeSegmentIndex记录segments列表当中segment的下表
MemorySegment segment = segments.get(writeSegmentIndex);
6.2.2、写入index到segment
写入index到segment,一个index是一个long数据,占64位。其中,高32位记录数据长度,低32位记录数据类型。此处用到了long64位、int32位、位运算相关知识。<<是左移符号
// record length takes the high 32 bits and data type takes the low 32 bits
segment.putLong(writeSegmentOffset, ((long) numRecordBytes << 32) | dataType.ordinal());
6.2.3、更新partition最后数据的索引
更新对应partition的最后数据的索引。
lastIndexEntryAddresses是一个列表,大小与分区数对应,每一项记录对应分区的最新数据的索引地址。
索引地址即indexEntryAddress,也是一个long类型的数据,高32位只想segments列表中对应segment的下标,低32位指向segment内部的偏移量。此结构式后续排序的一个基础。
// segment index takes the high 32 bits and segment offset takes the low 32 bits
long indexEntryAddress = ((long) writeSegmentIndex << 32) | writeSegmentOffset;
long lastIndexEntryAddress = lastIndexEntryAddresses[channelIndex];
lastIndexEntryAddresses[channelIndex] = indexEntryAddress;
6…2.4、分区前后数据关联
此步骤是将新数据的索引附加在上一个数据索引的后面,如果没有上一个数据,直接放入firstIndexEntryAddresses,表示当前数据是此分区最早的数据
if (lastIndexEntryAddress >= 0)
// link the previous index entry of the given channel to the new index entry
segment = segments.get(getSegmentIndexFromPointer(lastIndexEntryAddress));
segment.putLong(getSegmentOffsetFromPointer(lastIndexEntryAddress) + 8, indexEntryAddress);
else
firstIndexEntryAddresses[channelIndex] = indexEntryAddress;
以上,getSegmentIndexFromPointer和getSegmentOffsetFromPointer分别获取segment在列表中的下标以及segment内部的偏移量
private int getSegmentIndexFromPointer(long value)
return (int) (value >>> 32);
private int getSegmentOffsetFromPointer(long value)
return (int) (value);
getSegmentOffsetFromPointer(lastIndexEntryAddress) + 8
的意思是:8即8 bytes,也就是64 bit,这是一个数据的索引的长度,也就是在前一个数据的索引后面加入添加上当前数据的索引。
关于segment索引后面预留一个index空间的来源。
成员变量INDEX_ENTRY_SIZE,这是一个4+4+8的值,也就是一个当前索引的长度+预留下一个索引的长度。
6.2.5、更新公共变量的值
此步骤主要更新writeSegmentOffset的值,也就是segment的内部偏移量,可以看到,一次性偏移了两个64位的量,也就是两个索引的位置
// move the write position forward so as to write the corresponding record
updateWriteSegmentIndexAndOffset(INDEX_ENTRY_SIZE);
private void updateWriteSegmentIndexAndOffset(int numBytes)
writeSegmentOffset += numBytes;
// using the next available free buffer if the current is full
if (writeSegmentOffset == bufferSize)
++writeSegmentIndex;
writeSegmentOffset = 0;
6.3、writeRecord
此步骤用于写数据进segment。写数据步骤相对写index简单很多,就是直接将数据不断追加进segment
private void writeRecord(ByteBuffer source)
while (source.hasRemaining())
MemorySegment segment = segments.get(writeSegmentIndex);
int toCopy = Math.min(bufferSize - writeSegmentOffset, source.remaining());
segment.put(writeSegmentOffset, source, toCopy);
// move the write position forward so as to write the remaining bytes or next record
updateWriteSegmentIndexAndOffset(toCopy);
6.4、flushSortBuffer
此步骤用于将buffer中的数据写出到shuffle文件当中
6.4.1、启动新region
shuffle文件是按region存储的,每个region内,相同分区的数据写在一起,不同的region之间不保证。向shuffl文件输出的写操作的实现类是PartitionedFileWriter
fileWriter.startNewRegion(isBroadcast);
此处会调用到PartitionedFileWriter的writeRegionIndex方法,这个方法初次进入不做操作,开启第二个region开始才会进行执行。
private void writeRegionIndex() throws IOException
if (Arrays.stream(subpartitionBuffers).sum() > 0)
for (int channel = 0; channel < numSubpartitions; ++channel)
writeIndexEntry(subpartitionOffsets[channel], subpartitionBuffers[channel]);
currentSubpartition = -1;
++numRegions;
Arrays.fill(subpartitionBuffers, 0);
private void writeIndexEntry(long subpartitionOffset, int numBuffers) throws IOException
if (!indexBuffer.hasRemaining())
if (!extendIndexBufferIfPossible())
flushIndexBuffer();
indexBuffer.clear();
allIndexEntriesCached = false;
indexBuffer.putLong(subpartitionOffset);
indexBuffer.putInt(numBuffers);
相关的PartitionedFileWriter的成员如下
- subpartitionBuffers,分区写入的buffer数
- subpartitionOffsets,分区写入的偏移,也就是记录写入的数据量(bytes)
- indexBuffer,用于写入index的buffer,满了会溢出写到文件,此处写入index文件
- subpartitionBuffers是一个数组,每一项记录了对应分区写出的buffer数,写出数据的时候会增加。此处
Arrays.stream(subpartitionBuffers).sum() > 0
就是判断已经存在文件输出了 - subpartitionOffsets代表数据在文件中的偏移量,写数据的时候会更新,就是统计输出到文件的bytes数
6.4.2、构建基础对象
这一步构建两个基础对象List<BufferWithChannel> toWrite、Queue<MemorySegment> segments
。其中toWrite用于后续向文件输出,segments是基于writeSegments列表克隆出来的一个队列。
private Queue<MemorySegment> getWriteSegments()
synchronized (lock)
checkState(!writeSegments.isEmpty(), "Task has been canceled.");
return new ArrayDeque<>(writeSegments);
6.4.3、copyIntoSegment
这一步是将segment的数据封装进buffer形成一个BufferWithChannel用于后续写出到文件。
6.4.3.1、获取分区号
subpartitionReadOrder列表设置了分区读取顺序,可以自定义;readOrderIndex设置了当前读取的分区
// 获取
int channelIndex = subpartitionReadOrder[readOrderIndex];
// subpartitionReadOrder定义
this.subpartitionReadOrder = new int[numSubpartitions];
if (customReadOrder != null)
checkArgument(customReadOrder.length == numSubpartitions, "Illegal data read order.");
System.arraycopy(customReadOrder, 0, this.subpartitionReadOrder, 0, numSubpartitions);
else
for (int channel = 0; channel < numSubpartitions; ++channel)
this.subpartitionReadOrder[channel] = channel;
6…4.3.2、获取元数据信息
根据其中的数据,反向解析出对应的index信息。此步开始是一个循环调用的操作,注意如果已经读取部分数据并且下一个读的数据是event事件类型,则跳出循环
int sourceSegmentIndex = getSegmentIndexFromPointer(readIndexEntryAddress);
int sourceSegmentOffset = getSegmentOffsetFromPointer(readIndexEntryAddress);
MemorySegment sourceSegment = segments.get(sourceSegmentIndex);
long lengthAndDataType = sourceSegment.getLong(sourceSegmentOffset);
int length = getSegmentIndexFromPointer(lengthAndDataType);
DataType dataType = DataType.values()[getSegmentOffsetFromPointer(lengthAndDataType)];
// return the data read directly if the next to read is an event
if (dataType.isEvent() && numBytesCopied > 0)
break;
bufferDataType = dataType;
// get the next index entry address and move the read position forward
long nextReadIndexEntryAddress = sourceSegment.getLong(sourceSegmentOffset + 8);
sourceSegmentOffset += INDEX_ENTRY_SIZE;
6.4.3.3、copyRecordOrEvent
这一步就是将数据拷贝进克隆出来的segment中,注意这里只拷贝了数据。
6.4.3.4、读同分区下一个数据
前面读出了下一个数据的地址,此处如果当前读取的数据不是分区的最后一个数据,则继续读下一个数据。基于这一步的操作,完成了同分区写在一起的目的。
if (recordRemainingBytes == 0)
// move to next channel if the current channel has been finished
if (readIndexEntryAddress == lastIndexEntryAddresses[channelIndex])
updateReadChannelAndIndexEntryAddress();
break;
readIndexEntryAddress = nextReadIndexEntryAddress;
6.4.3.5、封装buffer
这一步将segment封装成Buffer,再进一步添加分区号封装成BufferWithChannel
numTotalBytesRead += numBytesCopied;
Buffer buffer = new NetworkBuffer(target, (buf) -> , bufferDataType, numBytesCopied);
return new BufferWithChannel(buffer, channelIndex);
6.4.4、更新统计数据
这一步是更新统计相关的数据
private void updateStatistics(Buffer buffer, boolean isBroadcast)
numBuffersOut.inc(isBroadcast ? numSubpartitions : 1);
long readableBytes = buffer.readableBytes();
numBytesOut.inc(isBroadcast ? readableBytes * numSubpartitions : readableBytes);
6.4.5、compressBuffer
这一步根据情况,对buffer做压缩
private BufferWithChannel compressBufferIfPossible(BufferWithChannel bufferWithChannel)
Bu以上是关于FlinkFlink Sort-Shuffle写流程简析的主要内容,如果未能解决你的问题,请参考以下文章