C# 多线程 大量数据实时接收\解析\存储 问题

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C# 多线程 大量数据实时接收\解析\存储 问题相关的知识,希望对你有一定的参考价值。

RT,给定情景:有一数据通道以每条3ms的速率向软件传输数据,每条数据可认识是一个BYTE。现需要将收到的数据进行解析,解析后显示在UI,解析的同时需要对原始数据及解析后的数据进行实时存储。
设想思路:设置接收线程A、解析线程 B、存储线程C、调度线程D
要实现的目标:由接收线程A实时接收数据,线程B对A收到的数据进行解析并传给UI,同时C存储A、B的数据。线程D去调度协调A\B\C线程。
具体实现感觉比较有难度,如何能解决多线程问题的同时保证效率?求大牛讲解,最好能举几个小例子,有大神愿意传道授业解惑者可加QQ:69000597.
不吝惜分,能解决问题的,分都给你。

参考技术A 1、定义两个线程安全的队列(System.Collections.Concurrent.ConcurrentQueue<T>)a跟b,其中a用于储存接受的数据,b用于储存要持久化的数据。
2、线程A循环读取数据并储存到队列a中。
3、线程B循环从队列a中读取数据。
3.1、如果读取到数据
3.1.1、将解析前的数据跟解析后的数据赋值给专门储存它们的类c。
3.1.2、将类c添加到队列b中。
3.1.3、将解析后的数据显示到UI线程中。
3.2、如果没有读取到数据,则sleep一定时间。
4、线程C循环从队列b中读取数据。
4.1、如果读取到数据,则储存数据。
4.2、如果没有读取到数据,则sleep一定时间。
5、线程D可以不要,不过假设数据的处理时间过长,将导致队列长度不断增长,所以线程D可以循环判断队列a跟队列b的长度。假设队列中的对象数量超过特定阀值,则进行一定处理。比如终止程序,比如跳过部分数据,比如停止接收数据等。追问

可否举一个小例子呢?对线程安全这块了解的不深。是否需要用到线程锁呢,用那种锁更合适一些?再考虑线程A\B\C等于都在调用同一个数组,怕产生死锁。接收到的数据时数据采集设备发来的连续实时数据。可否加下QQ解答下呢?

追答

好我加你。
如果按照上面的方法写,并且没有引入其它变量的话,不需要加锁。
ConcurrentQueue本身是线程安全的队列。

本回答被提问者采纳
参考技术B 不明白你要解决什么问题。。。
只是存储线程只有一个的话,貌似是不够的,而且数据库很有可能是一个瓶颈追问

不涉及到数据库,所谓的数据通道是通过硬件进行的数据采集得到的高速大量数据。

在c#中多线程处理大量Web请求

【中文标题】在c#中多线程处理大量Web请求【英文标题】:Multithreading a large number of web requests in c# 【发布时间】:2010-11-25 13:59:15 【问题描述】:

我有一个程序,我需要在其中创建大量文件夹到外部共享点站点(外部意味着我不能使用共享点对象模型)。 Web 请求在这方面工作得很好,但一次只做一个(发送请求、等待响应、重复)相当慢。我决定对请求进行多线程处理,以尝试加快速度。程序已经大大加快了速度,但经过一段时间(1-2 分钟左右)后,开始抛出并发异常。

代码如下,这是最好的方法吗?

Semaphore Lock = new Semaphore(10, 10);
List<string> folderPathList = new List<string>();
//folderPathList populated

foreach (string folderPath in folderPathList)

    Lock.WaitOne();
    new Thread(delegate()
    
        WebRequest request = WebRequest.Create(folderPath);
        request.Credentials = DefaultCredentials;
        request.Method = "MKCOL";

        WebResponse response = request.GetResponse();
        response.Close();
        Lock.Release();
    ).Start();

for(int i = 1;i <= 10;i++)

    Lock.WaitOne();

例外情况类似于

未处理的异常:System.Net.WebException:无法连接到远程服务器 ---> System.Net.Sockets.SocketException:每个套接字地址通常只允许使用一次 192.0.0.1:81 在 System.Net.Sockets.Socket.DoConnect(端点 endPointSnapshot,SocketAddre ss socketAddress) 在 System.Net.Sockets.Socket.InternalConnect(EndPoint remoteEP) 在 System.Net.ServicePoint.ConnectSocketInternal(布尔连接失败,Socket s4,Socket s6,Socket& 套接字,IPAddress& 地址,ConnectSocketState 状态, IAsyncResult asyncResult, Int32 timeout, Exception&异常)

【问题讨论】:

您应该发布异常消息,以便这里的人可以帮助您 【参考方案1】:

您可能会创建过多的连接,从而耗尽所有可以使用的本地端口。 关闭端口后可以重用端口有一个超时期限。 WebRequest 为您隐藏了所有低级套接字处理,但我猜它最终会耗尽端口,或者尝试(重新)绑定到已经处于 TIME_WAIT 状态的套接字。

您应该确保阅读响应流,即使您不关心响应。这应该有助于不产生过多的延迟连接。

WebResponse response = request.GetResponse();
new StreamReader(response.GetResponseStream()).ReadToEnd(); 

我会从here粘贴一些相关信息:

当连接关闭时,在关闭连接的一侧,5 元组 Protocol, Local IP, Local Port, Remote IP, Remote Port 默认进入 TIME_WAIT 状态 240 秒。 在这种情况下,协议是固定的——TCP 本地 IP、远程 IP 和远程端口通常也是固定的。所以变量是本地端口。 发生的情况是,当您不绑定时,将使用 1024-5000 范围内的端口。 所以大致你有4000个端口。如果您在 4 分钟内使用所有这些 - 大致意思是您 每秒进行 16 次 Web 服务调用,持续 4 分钟,您将耗尽所有端口。这就是这个异常的原因。

好的,现在如何解决这个问题?

    其中一种方法是增加动态端口范围。默认最大值为 5000。您可以将其设置为 65534。 HKLM\System\CurrentControlSet\Services\Tcpip\Parameters\MaxUserPort是使用的关键。

    您可以做的第二件事是,一旦连接确实进入 TIME_WAIT 状态,您可以减少它的时间 在该状态下,默认为 4 分钟,但您可以将其设置为 30 秒 HKLM\System\CurrentControlSet\Services\Tcpip\Parameters\TCPTimedWaitDelay 是使用的关键。 将此设置为 30 秒

【讨论】:

【参考方案2】:

您没有关闭 Web 请求,这可能会导致连接打开的时间过长。这听起来像是 Parallel.Net 的 Parallel.Foreach 的完美工作,只需确保指明您希望它在多少个线程上运行

  ParallelOptions parallelOptions = new ParallelOptions();

        parallelOptions.MaxDegreeOfParallelism = 10;
        Parallel.ForEach(folderPathList, parallelOptions, folderPathList =>
        
            using(WebRequest request = WebRequest.Create(folderPath))
            
               request.Credentials = DefaultCredentials;
               request.Method = "MKCOL";

               GetResponse request = WebRequest.Create(folderPath);
               request.Credentials = DefaultCredentials;
               request.Method = "MKCOL";
               using (WebResponse response = request.GetResponse());
            
        );

另外要记住的是 maxConnections,一定要在你的 app.config 中设置它:

<configuration>
  <system.net>
    <connectionManagement>
      <add address = "*" maxconnection = "100" />
    </connectionManagement>
  </system.net>
</configuration>

当然,在现实世界的场景中,您必须添加 try-catch 并重试连接,这可能会超时导致更复杂的代码

【讨论】:

啊,这确实是一个很好的解决方案,但不幸的是我使用的是 .NET 3.5 框架。 太糟糕了,WebRequest 不是一次性的 我很困惑,因为我正在关闭请求,但是在 App.config 中设置 maxconnection 为我做到了。谢谢!【参考方案3】:

对于这种 IO 密集型任务,asynchronous programming model 非常有用。但是,在 C# 中使用有点困难。C# 现在也有语言级别的异步支持,你可以试试CTP release。

【讨论】:

【参考方案4】:

试试这个

folderPathList.ToList().ForEach(p =>
        
            ThreadPool.QueueUserWorkItem((o) =>
                 
                     WebRequest request = WebRequest.Create(p);
                     request.Credentials = DefaultCredentials;
                     request.Method = "MKCOL";
                     WebResponse response = request.GetResponse();
                     response.Close(); 
                 );

编辑 - 不同的 webrequest 方法

folderPathList.ToList().ForEach(p =>
        
            ThreadPool.QueueUserWorkItem((o) =>
                 
                     using (WebClient client = new WebClient())
                     
                         client.Credentials = DefaultCredentials;
                         client.UploadString(p, "MKCOL", "");
                     
                 );
        );

【讨论】:

比我当前的解决方案更简洁的代码方式,但这确实会引发相同的错误。 不应该有并发问题,所以我修改了网络请求方法,看看它是否更适合你

以上是关于C# 多线程 大量数据实时接收\解析\存储 问题的主要内容,如果未能解决你的问题,请参考以下文章

asp.net中大量数据并发

C# 事件不会为新对象触发

QT5 关于object基类实现的多线程操作,movetothread方法的使用

foreach循环中的多线程C#

C# Winform 多线程异步委托进度条

C# 到 C++ 多线程,有啥问题吗?