C# 多线程 大量数据实时接收\解析\存储 问题
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C# 多线程 大量数据实时接收\解析\存储 问题相关的知识,希望对你有一定的参考价值。
RT,给定情景:有一数据通道以每条3ms的速率向软件传输数据,每条数据可认识是一个BYTE。现需要将收到的数据进行解析,解析后显示在UI,解析的同时需要对原始数据及解析后的数据进行实时存储。
设想思路:设置接收线程A、解析线程 B、存储线程C、调度线程D
要实现的目标:由接收线程A实时接收数据,线程B对A收到的数据进行解析并传给UI,同时C存储A、B的数据。线程D去调度协调A\B\C线程。
具体实现感觉比较有难度,如何能解决多线程问题的同时保证效率?求大牛讲解,最好能举几个小例子,有大神愿意传道授业解惑者可加QQ:69000597.
不吝惜分,能解决问题的,分都给你。
2、线程A循环读取数据并储存到队列a中。
3、线程B循环从队列a中读取数据。
3.1、如果读取到数据
3.1.1、将解析前的数据跟解析后的数据赋值给专门储存它们的类c。
3.1.2、将类c添加到队列b中。
3.1.3、将解析后的数据显示到UI线程中。
3.2、如果没有读取到数据,则sleep一定时间。
4、线程C循环从队列b中读取数据。
4.1、如果读取到数据,则储存数据。
4.2、如果没有读取到数据,则sleep一定时间。
5、线程D可以不要,不过假设数据的处理时间过长,将导致队列长度不断增长,所以线程D可以循环判断队列a跟队列b的长度。假设队列中的对象数量超过特定阀值,则进行一定处理。比如终止程序,比如跳过部分数据,比如停止接收数据等。追问
可否举一个小例子呢?对线程安全这块了解的不深。是否需要用到线程锁呢,用那种锁更合适一些?再考虑线程A\B\C等于都在调用同一个数组,怕产生死锁。接收到的数据时数据采集设备发来的连续实时数据。可否加下QQ解答下呢?
追答好我加你。
如果按照上面的方法写,并且没有引入其它变量的话,不需要加锁。
ConcurrentQueue本身是线程安全的队列。
只是存储线程只有一个的话,貌似是不够的,而且数据库很有可能是一个瓶颈追问
不涉及到数据库,所谓的数据通道是通过硬件进行的数据采集得到的高速大量数据。
在c#中多线程处理大量Web请求
【中文标题】在c#中多线程处理大量Web请求【英文标题】:Multithreading a large number of web requests in c# 【发布时间】:2010-11-25 13:59:15 【问题描述】:我有一个程序,我需要在其中创建大量文件夹到外部共享点站点(外部意味着我不能使用共享点对象模型)。 Web 请求在这方面工作得很好,但一次只做一个(发送请求、等待响应、重复)相当慢。我决定对请求进行多线程处理,以尝试加快速度。程序已经大大加快了速度,但经过一段时间(1-2 分钟左右)后,开始抛出并发异常。
代码如下,这是最好的方法吗?
Semaphore Lock = new Semaphore(10, 10);
List<string> folderPathList = new List<string>();
//folderPathList populated
foreach (string folderPath in folderPathList)
Lock.WaitOne();
new Thread(delegate()
WebRequest request = WebRequest.Create(folderPath);
request.Credentials = DefaultCredentials;
request.Method = "MKCOL";
WebResponse response = request.GetResponse();
response.Close();
Lock.Release();
).Start();
for(int i = 1;i <= 10;i++)
Lock.WaitOne();
例外情况类似于
未处理的异常:System.Net.WebException:无法连接到远程服务器 ---> System.Net.Sockets.SocketException:每个套接字地址通常只允许使用一次 192.0.0.1:81 在 System.Net.Sockets.Socket.DoConnect(端点 endPointSnapshot,SocketAddre ss socketAddress) 在 System.Net.Sockets.Socket.InternalConnect(EndPoint remoteEP) 在 System.Net.ServicePoint.ConnectSocketInternal(布尔连接失败,Socket s4,Socket s6,Socket& 套接字,IPAddress& 地址,ConnectSocketState 状态, IAsyncResult asyncResult, Int32 timeout, Exception&异常)
【问题讨论】:
您应该发布异常消息,以便这里的人可以帮助您 【参考方案1】:您可能会创建过多的连接,从而耗尽所有可以使用的本地端口。
关闭端口后可以重用端口有一个超时期限。
WebRequest
为您隐藏了所有低级套接字处理,但我猜它最终会耗尽端口,或者尝试(重新)绑定到已经处于 TIME_WAIT 状态的套接字。
您应该确保阅读响应流,即使您不关心响应。这应该有助于不产生过多的延迟连接。
WebResponse response = request.GetResponse();
new StreamReader(response.GetResponseStream()).ReadToEnd();
我会从here粘贴一些相关信息:
当连接关闭时,在关闭连接的一侧,5 元组 Protocol, Local IP, Local Port, Remote IP, Remote Port 默认进入 TIME_WAIT 状态 240 秒。 在这种情况下,协议是固定的——TCP 本地 IP、远程 IP 和远程端口通常也是固定的。所以变量是本地端口。 发生的情况是,当您不绑定时,将使用 1024-5000 范围内的端口。 所以大致你有4000个端口。如果您在 4 分钟内使用所有这些 - 大致意思是您 每秒进行 16 次 Web 服务调用,持续 4 分钟,您将耗尽所有端口。这就是这个异常的原因。
好的,现在如何解决这个问题?
其中一种方法是增加动态端口范围。默认最大值为 5000。您可以将其设置为 65534。
HKLM\System\CurrentControlSet\Services\Tcpip\Parameters\MaxUserPort
是使用的关键。
您可以做的第二件事是,一旦连接确实进入 TIME_WAIT 状态,您可以减少它的时间
在该状态下,默认为 4 分钟,但您可以将其设置为 30 秒
HKLM\System\CurrentControlSet\Services\Tcpip\Parameters\TCPTimedWaitDelay
是使用的关键。
将此设置为 30 秒
【讨论】:
【参考方案2】:您没有关闭 Web 请求,这可能会导致连接打开的时间过长。这听起来像是 Parallel.Net 的 Parallel.Foreach 的完美工作,只需确保指明您希望它在多少个线程上运行
ParallelOptions parallelOptions = new ParallelOptions();
parallelOptions.MaxDegreeOfParallelism = 10;
Parallel.ForEach(folderPathList, parallelOptions, folderPathList =>
using(WebRequest request = WebRequest.Create(folderPath))
request.Credentials = DefaultCredentials;
request.Method = "MKCOL";
GetResponse request = WebRequest.Create(folderPath);
request.Credentials = DefaultCredentials;
request.Method = "MKCOL";
using (WebResponse response = request.GetResponse());
);
另外要记住的是 maxConnections,一定要在你的 app.config 中设置它:
<configuration>
<system.net>
<connectionManagement>
<add address = "*" maxconnection = "100" />
</connectionManagement>
</system.net>
</configuration>
当然,在现实世界的场景中,您必须添加 try-catch 并重试连接,这可能会超时导致更复杂的代码
【讨论】:
啊,这确实是一个很好的解决方案,但不幸的是我使用的是 .NET 3.5 框架。 太糟糕了,WebRequest 不是一次性的 我很困惑,因为我正在关闭请求,但是在 App.config 中设置maxconnection
为我做到了。谢谢!【参考方案3】:
对于这种 IO 密集型任务,asynchronous programming model 非常有用。但是,在 C# 中使用有点困难。C# 现在也有语言级别的异步支持,你可以试试CTP release。
【讨论】:
【参考方案4】:试试这个
folderPathList.ToList().ForEach(p =>
ThreadPool.QueueUserWorkItem((o) =>
WebRequest request = WebRequest.Create(p);
request.Credentials = DefaultCredentials;
request.Method = "MKCOL";
WebResponse response = request.GetResponse();
response.Close();
);
编辑 - 不同的 webrequest 方法
folderPathList.ToList().ForEach(p =>
ThreadPool.QueueUserWorkItem((o) =>
using (WebClient client = new WebClient())
client.Credentials = DefaultCredentials;
client.UploadString(p, "MKCOL", "");
);
);
【讨论】:
比我当前的解决方案更简洁的代码方式,但这确实会引发相同的错误。 不应该有并发问题,所以我修改了网络请求方法,看看它是否更适合你以上是关于C# 多线程 大量数据实时接收\解析\存储 问题的主要内容,如果未能解决你的问题,请参考以下文章