TPL 数据流使用旧数据而不是最新数据

Posted

技术标签:

【中文标题】TPL 数据流使用旧数据而不是最新数据【英文标题】:TPL Dataflow uses old data instead of the newest 【发布时间】:2021-06-05 13:59:37 【问题描述】:

我正在使用 TPL 数据流,以便一次对每个符号执行一项任务。前两个Operation taking... 消息是正确的,但接下来的消息使用的是旧数据。换句话说,它使用下面屏幕截图中标记为绿色的旧数据,而不是最新数据(标记为蓝色的数据)。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Binance.Net;
using Binance.Net.Enums;
using Binance.Net.Interfaces;
using Binance.Net.Objects.Spot;
using CryptoExchange.Net.Authentication;
using CryptoExchange.Net.Logging;

namespace SubscribeToCandlesEventFixTest

    public class BinanceSocketHandler
    
        private readonly IBinanceClient _client;
        private readonly IBinanceSocketClient _socketClient;

        public BinanceSocketHandler()
        
            _client = new BinanceClient(new BinanceClientOptions
            
                ApiCredentials = new ApiCredentials("not required", "not required"),
                AutoTimestamp = true,
                AutoTimestampRecalculationInterval = TimeSpan.FromMinutes(30),
#if DEBUG
                LogVerbosity = LogVerbosity.Debug
#endif
            );

            _socketClient = new BinanceSocketClient(new BinanceSocketClientOptions
            
                ApiCredentials = new ApiCredentials("not required", "not required"),
                AutoReconnect = true,
                ReconnectInterval = TimeSpan.FromSeconds(15),
#if DEBUG
                LogVerbosity = LogVerbosity.Debug
#endif
            );
        

        private Dictionary<string, ActionBlock<IBinanceStreamKlineData>> _ab = new();

        public async Task StartAsync(CancellationToken cancellationToken)
        
            var symbols = new[]  "TRXUSDT", "BTCUSDT" ;
            var interval = KlineInterval.OneMinute;
            
            foreach (var symbol in symbols)
            
                _ab[symbol] = new ActionBlock<IBinanceStreamKlineData>(
                    async data =>
                    
                        Console.WriteLine($"Operation taking 10 seconds to execute... | Symbol: data.Symbol | Timestamp: data.Data.OpenTime | Price: data.Data.Close");

                        await Task.Delay(10000, cancellationToken).ConfigureAwait(false);
                    ,
                    new ExecutionDataflowBlockOptions
                    
                        MaxDegreeOfParallelism = 1
                    );

                await _socketClient.Spot.SubscribeToKlineUpdatesAsync(symbol, interval,
                    async data =>
                    
                        if (data.Data.Final)
                        
                            Console.WriteLine(
                                $"[DateTime.UtcNow] [data.Symbol] New final candle | Timestamp: data.Data.OpenTime | Price: data.Data.Close");
                        
                        else
                        
                            Console.WriteLine(
                                $"[DateTime.UtcNow] [data.Symbol] Candle update | Timestamp: data.Data.OpenTime | Price: data.Data.Close");

                            // TODO: Use the most up-to-date value
                            await _ab[symbol].SendAsync(data, cancellationToken).ConfigureAwait(false);
                        
                    ).ConfigureAwait(false);
            
        

        public async Task StopAsync()
        
            foreach (var symbol in _ab.Keys)
            
                _ab[symbol].Complete();
                await _ab[symbol].Completion.ConfigureAwait(false);
            
        
    

    class Program
    
        static async Task Main(string[] args)
        
            var test = new BinanceSocketHandler();
            await test.StartAsync(new CancellationToken()).ConfigureAwait(false);

            Console.ReadLine();
        
    


【问题讨论】:

【参考方案1】:

TPL Dataflow 将按顺序处理所有项目;这就是它的目的。您可以尝试使用BroadcastBlock 执行最新的方法,但由于该块链接到另一个块,您可能最终会得到一个正在处理、一个等待处理和第三个真正被覆盖。

如果您希望它比这更严格(即,一个正在处理,一个等待也被覆盖),那么我建议使用 Channels。具体来说,使用BoundedChannelFullMode.DropOldest 的有界通道。

【讨论】:

感谢您的回答!我听从了你关于频道的建议。用频道代码更新了我的问题。为什么它适用于TRX/USDT,却忽略了BTC/USDT? foreach (var item in _channels.Values) 告诉它一次处理一个通道。你可能想要异步并发:var tasks = _channels.Values.Select(async item =&gt; ... ); await Task.WhenAll(tasks); 谢谢!有效!您还有什么可以作为代码提示的吗?

以上是关于TPL 数据流使用旧数据而不是最新数据的主要内容,如果未能解决你的问题,请参考以下文章

TPL数据流处理N条最新消息

用 TPL 数据流中的最新值替换缓冲值

TPL 数据流是并行的或有序的,但不是两者

NSManagedContext 返回旧数据而不是新数据

如何从Kafka中的旧偏移点获取数据?

为网站抓取工具实施的 TPL 数据流