dataX的流量控制
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了dataX的流量控制相关的知识,希望对你有一定的参考价值。
参考技术A 这个和流量控制貌似没有关系,其实理解Task的执行逻辑对理解流量控制有很大的帮助。TaskGroupContainer的架构:
TaskGroupContainer由JobContainer#Scheduler利用线程池启动,然后根据Channel的个数启动Task。一个Task会对应一个TaskExecutor,TaskExecutor包含一个Channel,一个ReaderRunner线程和一WriterRunner线程。
BufferedRecordExchanger本身也带有一定的缓存功能,BufferedRecordExchanger是ReaderRunner和WriterRunner是线程私有的,同一时刻只有一个线程操作BufferedRecordExchanger,所以BufferedRecordExchanger不需要同步机制。
如果一次TaskExecutor执行失败会尝试shutdown这个TaskExecutor,TaskExecutor#shutdown会触发ReaderRunner#shutdwon和WriterRunner#shutdown最终完成对BufferedRecordExchanger和Channel中资源的清理。当有一个Task执行失败之后会清除其对应的TaskExecutor,如果Task配置了Fialover(失败自动切换)机制下次重新运行的时候,会重新构建一个TaskExecutor来执行当前的任务。。。并没有复用之前的TaskExecutor。
可以看到做数据转换的地方是在Channel中,dataX中的限流是限制单个Task的对Channel的写入速度Channel的默认实现是MemoryChannel,将所有的数据保存在一个ArrayBlocking中。
Channel中相关数据指标:
可以看到dataX不仅配置了最大的Channel最大容纳的字节数和record数,还配置了写入速度 byteSpeed 的默认值是1MB/s, recordSpeed 的默认值是10000条/s。具体限流逻辑是在Channel#statPush中,每次ReaderRunner执行push之后都会触发这个逻辑。 currentCommunication 是当前Channel注册对应的Task注册在TaskGroup的communication, lastCommunication 是Channel的内部成员变量,用于保存上次push完之后所有数据的技术统计,包括读成功字节数、读成功record数、读失败字节数和读失败record数。每次push完数据之后,如果距上次时间超过了配置的flowControlInterval就会做流量监控,就是相关数据统计除以时间即可得到速度,如果当前的流量超过了配置的速度,ReaderRunner休眠下即可。
以上是关于dataX的流量控制的主要内容,如果未能解决你的问题,请参考以下文章