如何兼顾性能+实时性处理缓冲数据?
Posted tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何兼顾性能+实时性处理缓冲数据?相关的知识,希望对你有一定的参考价值。
我们经常会遇到这样的数据处理应用场景:我们利用一个组件实时收集外部交付给它的数据,并由它转发给一个外部处理程序进行处理。考虑到性能,它会将数据存储在本地缓冲区,等累积到指定的数量后打包发送;考虑到实时性,数据不能在缓冲区存太长的时间,必须设置一个延时时间,一旦超过这个时间,缓冲的数据必须立即发出去。看似简单的需求,如果需要综合考虑性能、线程安全、内存分配,要实现起来还真有点麻烦。这个问题有不同的解法,本文提供一种实现方案。
一、实例演示
二、待处理的批量数据:Batch<T>
三、感知数据处理的时机:BatchChangeToken
四、接收、缓冲、打包和处理数据:Batcher<T>
一、实例演示
我们先来看看最终达成的效果。在如下这段代码中,我们使用一个Batcher<string>对象来接收应用分发给它的数据,该对象最终会在适当的时机处理它们。 调用Batcher<string>构造函数的三个参数分别表示:
- processor:批量处理数据的委托对象,它指向的Process方法会将当前时间和处理的数据量输出到控制台上;
- batchSize:单次处理的数据量,当缓冲的数据累积到这个阈值时会触发数据的自动处理。我们将这个阈值设置为10;
- interval:两次处理处理的最长间隔,我们设置为5秒;
var batcher = new Batcher<string>( processor:Process, batchSize:10, interval: TimeSpan.FromSeconds(5)); var random = new Random(); while (true) var count = random.Next(1, 4); for (var i = 0; i < count; i++) batcher.Add(Guid.NewGuid().ToString()); await Task.Delay(1000); static void Process(Batch<string> batch) using (batch) Console.WriteLine($"[DateTimeOffset.Now]batch.Count items are delivered.");
如上面的代码片段所示,在一个循环中,我们每隔1秒钟随机添加1-3个数据项。从下图中可以看出,Process方法的调用具有两种触发条件,一是累积的数据量达到设置的阈值10,另一个则是当前时间与上一次处理时间间隔超过5秒。
二、待处理的批量数据:Batch<T>
除了上面实例涉及的Batcher<T>,该解决方案还涉及两个额外的类型,如下这个Batch<T>类型表示最终发送的批量数据。为了避免缓冲数据带来的内存分配,我们使用了一个单独的ArrayPool<T>对象来创建池化的数组,这个功能体现在静态方法CreatePooledArray方法上。由于构建Batch<T>对象提供的数组来源于对象池,在处理完毕后必须回归对象池,所以我们让这个类型实现了IDisposable接口,并将这一操作实现在Dispose方法种。在调用ArrayPool<T>对象的Return方法时,我们特意将数组清空。由于提供的数组来源于对象池,所以并不能保证每个数据元素都承载了有效的数据,实现的迭代器和返回数量的Count属性对此作了相应的处理。
public sealed class Batch<T> : IEnumerable<T>, IDisposable where T : class private bool _isDisposed; private int? _count; private readonly T[] _data; private static readonly ArrayPool<T> _pool = ArrayPool<T>.Create(); public int Count get if (_isDisposed) throw new ObjectDisposedException(nameof(Batch<T>)); if(_count.HasValue) return _count.Value; var count = 0; for (int index = 0; index < _data.Length; index++) if (_data[index] is null) break; count++; return (_count = count).Value; public Batch(T[] data) => _data = data ?? throw new ArgumentNullException(nameof(data)); public void Dispose() _pool.Return(_data, clearArray: true); _isDisposed = true; public IEnumerator<T> GetEnumerator() => new Enumerator(this); IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); public static T[] CreatePooledArray(int batchSize) => _pool.Rent(batchSize); private void EnsureNotDisposed() if (_isDisposed) throw new ObjectDisposedException(nameof(Batch<T>)); private sealed class Enumerator : IEnumerator<T> private readonly Batch<T> _batch; private readonly T[] _data; private int _index = -1; public Enumerator(Batch<T> batch) _batch = batch; _data = batch._data; public T Current get _batch.EnsureNotDisposed(); return _data[_index]; object IEnumerator.Current => Current; public void Dispose() public bool MoveNext() _batch.EnsureNotDisposed(); return ++_index < _data.Length && _data[_index] is not null; public void Reset() _batch.EnsureNotDisposed(); _index = -1;
三、感知数据处理的时机:BatchChangeToken
Batcher具有两个触发数据处理的设置:缓冲的数据量和两次数据处理之间的最长间隔。当累积的数据量或者当前时间与上一次处理的间隔达到阈值,缓冲的数据将自动被处理。.NET Core经常利用一个IChangeToken作为通知的令牌,为此我们定义了如下这个实现了该接口的BatchChangeToken类型。如下面的代码片段所示,上述两个触发条件体现在两个CancellationToken对象上,我们利用它们创建了对应的CancellationChangeToken对象,最后利用这两个CancellationChangeToken创建了一个CompositeChangeToken对象。这个CompositeChangeToken对象最终被用来实现了IChangeToken接口的三个成员。
internal sealed class BatchChangeToken : IChangeToken private readonly IChangeToken _innerToken; private readonly int _countThreshold; private readonly CancellationTokenSource _expirationTokenSource; private readonly CancellationTokenSource _countTokenSource; private int _counter; public BatchChangeToken(int countThreshold, TimeSpan timeThreshold) _countThreshold = countThreshold; _countTokenSource = new CancellationTokenSource(); _expirationTokenSource = new CancellationTokenSource(timeThreshold); var countToken = new CancellationChangeToken(_countTokenSource.Token); var expirationToken = new CancellationChangeToken(_expirationTokenSource.Token); _innerToken = new CompositeChangeToken(new IChangeToken[] countToken, expirationToken ); public bool HasChanged => _innerToken.HasChanged; public bool ActiveChangeCallbacks => _innerToken.ActiveChangeCallbacks; public IDisposable RegisterChangeCallback(Action<object?> callback, object? state) => _innerToken.RegisterChangeCallback(s => callback(s); _countTokenSource.Dispose(); _expirationTokenSource.Dispose(); , state); public void Increase() Interlocked.Increment(ref _counter); if (_counter >= _countThreshold) _countTokenSource.Cancel();
上述两个CancellationToken来源于对应的CancellationTokenSource,对应的字段为_countTokenSource和_expirationTokenSource。_expirationTokenSource根据设置的数据处理时间间隔创建而成。为了确定缓冲的数据量,我们提供了一个计数器,并利用Increase方法进行计数。在超过设置的数据量时,该方法会调用_expirationTokenSource的Cancel方法。在实现的ActiveChangeCallbacks方法种,我们将针对这两个CancellationTokenSource的释放放在注册的回调中。
四、接收、缓冲、打包和处理数据:Batcher<T>
最终用于打包的Batcher类型定义如下。在构造函数中,我们除了提供上述两个阈值外,还提供了一个Action<Batch<T>>委托完成针对打包数据的处理。通过Add方法接收的数据存储在_data字段返回的数组上,它时通过Batch<T>的静态方法CreatePooledArray提供的。我们使用字段_index表示添加数据在_data数组中存储的位置,并使用InterLocked.Increase方法解决并发问题。
public sealed class Batcher<T> : IDisposable where T : class private readonly Action<Batch<T>> _processor; private T[] _data; private BatchChangeToken _changeToken = default!; private readonly int _batchSize; private int _index = -1; private readonly IDisposable _scheduler; public Batcher(Action<Batch<T>> processor, int batchSize, TimeSpan interval) _processor = processor ?? throw new ArgumentNullException(nameof(processor)); _batchSize = batchSize; _data = Batch<T>.CreatePooledArray(batchSize); _scheduler = ChangeToken.OnChange(() => _changeToken = new BatchChangeToken(_batchSize, interval), OnChange); void OnChange() var data = Interlocked.Exchange(ref _data, Batch<T>.CreatePooledArray(batchSize)); if (data[0] is not null) Interlocked.Exchange(ref _index, -1); _ = Task.Run(() => _processor.Invoke(new Batch<T>(data))); public void Add(T item) if (item is null) throw new ArgumentNullException(nameof(item)); var index = Interlocked.Increment(ref _index); if (index >= _batchSize) SpinWait.SpinUntil(() => _index < _batchSize - 1); Add(item); _data[index] = item; _changeToken.Increase(); public void Dispose() => _scheduler.Dispose();
在构造函数中,我们调用了ChangeToken的静态方法OnChange将数据处理操作绑定到创建的BatchChangeToken对象上,并确保每次发送“数据处理”后将重新创建的BatchChangeToken对象赋值到_changeToken字段上,因为Add放到需要调用它的Increase增加计数。当接收到数据处理通知后,我们会调用Batch<T>的静态方法CreatePooledArray构建一个数组将字段 _data引用的数组替换下来,并将其封装成Batch<T>对象进行处理(如果数据存在)。于此同时,表示添加数据存储索引的_index恢复成-1。Add方法在对_index做自增操作后,如果发现累积的数据量达到阈值,需要等待数据处理完毕。由于数据处理以异步的方式处理,这里的耗时时很低的,所以我们这里选择了自旋的方式等待它完成。
Java基础学习总结(199)—— NacosApolloConfig 配置中心如何选型?
为什么需要配置中心
配置实时生效:
传统的静态配置方式要想修改某个配置只能修改之后重新发布应用,要实现动态性,可以选择使用数据库,通过定时轮询访问数据库来感知配置的变化。轮询频率低感知配置变化的延时就长,轮询频率高,感知配置变化的延时就短,但比较损耗性能,需要在实时性和性能之间做折中。配置中心专门针对这个业务场景,兼顾实时性和一致性来管理动态配置。
配置管理流程:
配置的权限管控、灰度发布、版本管理、格式检验和安全配置等一系列的配置管理相关的特性也是配置中心不可获取的一部分。
开源配置中心基本介绍
目前市面上用的比较多的配置中心有:(按开源时间排序)
Disconf
2014 年 7 月百度开源的配置管理中心,同样具备配置的管理能力,不过目前已经不维护了,最近的一次提交是两年前了。
Spring Cloud Config
2014 年 9 月开源,Spring Cloud 生态组件,可以和 Spring Cloud 体系无缝整合。
Apollo
2016 年 5 月,携程开源的配置管理中心,具备规范的权限、流程治理等特性。
Nacos
2018 年 6 月,阿里开源的配置中心,也可以做 DNS 和 RPC 的服务发现。
配置中心核心概念的对比
由于 Disconf 不再维护,下面对比一下 Spring Cloud Config、Apollo 和 Nacos。Spring Cloud Config、Apollo 和 Nacos 在配置管理领域的概念基本相同,但是也存在一些不同的点,使用配置的过程中会涉及到一些比较重要的概念。
以上是关于如何兼顾性能+实时性处理缓冲数据?的主要内容,如果未能解决你的问题,请参考以下文章
Java基础学习总结(199)—— NacosApolloConfig 配置中心如何选型?