用于向 Web 服务发出 http post 请求的多线程

Posted

技术标签:

【中文标题】用于向 Web 服务发出 http post 请求的多线程【英文标题】:Multithreading for making http post requests to web service 【发布时间】:2015-08-25 07:59:06 【问题描述】:

我想在 C# 中向 Web 服务发送多个 HTTP 发布请求。例如,如果 n=3,则应发出来自 3 个 xml 文件的 HTTP 发布请求,并且响应应写入文件中。第一次发出 3 个请求,然后将发出接下来的 3 个请求。 所以我做了下面的代码,但我一开始得到的是随机输出。但是现在我在内部 for 循环中出现超出索引范围异常或内部服务器错误(500)。请建议适当的更改。我正在使用 .NET4.0

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;
using System.Threading;
using System.Xml;
using System.Net;
using System.Threading.Tasks;
namespace ConsoleApplication5

class Program

    static void Main(string[] args)
    
        int n = 0;
        Console.WriteLine("Enter the number");
        string s = Console.ReadLine();
        int.TryParse(s, out n);
        string path = "C:\\";
        string[] files = null;
        files = Directory.GetFiles(path, "*.xml", SearchOption.TopDirectoryOnly);


        List<Task> tasks = new List<Task>(files.Length);

        for (int i = 0; i < files.Length; i += n)
        
            for (int j = 0; j < n; j++)
            
                int x = i + j;

                if (x < files.Length && files[x] != null)
                
                    Task t = new Task(() => function(files[x]));
                    t.Start();
                    tasks.Add(t);
                
            

            if (tasks.Count > 0)
            
                Task.WaitAll(tasks.ToArray(), Timeout.Infinite); // or less than infinite
                tasks.Clear();
            
        
    
    public static void function(string temp)
    
        XmlDocument doc = new XmlDocument();
        doc.Load(temp);
        HttpWebRequest request = (HttpWebRequest)WebRequest.Create("http://10.76.22.135/wpaADws/ADService.asmx");

        request.ContentType = "text/xml;charset=\"utf-8\"";
        request.Accept = "text/xml";
        request.Method = "POST";
        Stream stream = request.GetRequestStream();
        doc.Save(stream);
        stream.Close();
        HttpWebResponse response = (HttpWebResponse)request.GetResponse();
        using (StreamReader rd = new StreamReader(response.GetResponseStream()))
        
            string soapResult = rd.ReadToEnd();
            doc.LoadXml(soapResult);
            File.WriteAllText(temp, doc.DocumentElement.InnerText);

            //XmlTextWriter xml=new XmlTextWriter(
            Console.WriteLine(soapResult);
            Console.ReadKey();
        

    


【问题讨论】:

【参考方案1】:

此代码有效。 说明:

首先,用户提供 .xml 文件的源路径和目标路径。

Directory.getFiles() 帮助我们获取字符串数组中的 .xml 文件。 (我们必须将 .xml 作为参数传递)。

所以现在基本上发生的是对于我们在源 pat 处获得的每个文件,都会创建一个线程。

但是假设用户想一次发送“n”个请求,那么一次会创建 n 个线程。 除非前面的线程完成执行,否则不会创建下一组线程。 thread.Join() 确保了这一点。

在向 Web 服务发出请求后,我们通过 getResponse() 获得响应,并且响应写入存储在目标路径的 .xml 文件中。

 using System;
 using System.Collections.Generic;
 using System.Linq;
 using System.Text;
 using System.IO;
 using System.Threading;
 using System.Xml;
 using System.Net;
 namespace ConsoleApplication4
 
     class Program
     
      int flag = 1;
      string destination;
      string source;
      static void Main(string[] args)
    
    Console.ForegroundColor = ConsoleColor.Red;

    Console.WriteLine("**************************** Send HTTP Post Requests **************************");
    int n = 0;
    Program p = new Program();
    Console.WriteLine("Enter the number of requests you want to send at a time");
    string s = Console.ReadLine();
    int.TryParse(s, out n);
    Console.WriteLine("Enter Source");
    p.source = Console.ReadLine();
    Console.WriteLine("Enter Destination");
    p.destination = Console.ReadLine();

    string[] files = null;
    files = Directory.GetFiles(p.source, "*.xml", SearchOption.TopDirectoryOnly);

    Thread[] thread = new Thread[files.Length];

    int len = files.Length;
    for (int i = 0; i<len; i+=n)
    
        int x = i;
        //Thread.Sleep(5000);
        for (int j = 0; j < n && x < len; j++)
        

            var localx = x;
            thread[x] = new Thread(() => function(files[localx], p));
            thread[x].Start();
            Thread.Sleep(50);
            //thread[x].Join();
            x++;
        
        int y = x - n;
        for (; y < x; y++)
        
            int t = y;
            thread[t].Join();

        

    

    // thread[0] = new Thread(() => function(files[0]));
    //thread[0].Start();
    Console.ReadKey();


public static void function(string temp,Program p)


    XmlDocument doc = new XmlDocument();
    doc.Load(temp);

    string final_d=p.destination + "response " + p.flag + ".xml";
    p.flag++;
    HttpWebRequest request = (HttpWebRequest)WebRequest.Create("http://10.76.22.135/wpaADws/ADService.asmx");
    request.ContentType = "text/xml;charset=\"utf-8\"";
    request.Accept = "text/xml";
    request.Method = "POST";
    Stream stream = request.GetRequestStream();
    doc.Save(stream);
    stream.Close();

    HttpWebResponse response = (HttpWebResponse)request.GetResponse();
    using (StreamReader rd = new StreamReader(response.GetResponseStream()))
    
        string soapResult = rd.ReadToEnd();
        doc.LoadXml(soapResult);
        File.WriteAllText(final_d, doc.DocumentElement.InnerText);

        //XmlTextWriter xml=new XmlTextWriter(
        Console.WriteLine(soapResult);
        //Console.ReadKey();
    

【讨论】:

【参考方案2】:

您在原始帖子中遇到的IndexOutOfRangeException 是由于您处理的最后一批文件的索引处理不当。最后一批可能不完整,您将其视为常规批次的集合大小

(您的帖子中有 n=3)

由于您要迁移到 TPL 和 Tasks,我建议您使用 Parallel Programming with Microsoft .NET 和 pipeline pattern,这似乎非常适合您的场景。您可以将并发集合和生产者/消费者模式与管道一起利用,如下所示。 BlockingCollection 确保同时添加项目,BlockingCollection.GetConsumingEnumerable 调用会为您的集合生成一个消耗性阻塞枚举器。

const int BUFFER_SIZE = 3; // no concurrent items to process
const string XML_FOLDER_PATH = "<whatever>";


public static void Pipeline()

  var bufferXmlFileNames = new BlockingCollection<string>(BUFFER_SIZE);
  var bufferInputXmlDocuments = new BlockingCollection<XmlDocument>(BUFFER_SIZE);
  var bufferWebRequests = new BlockingCollection<HttpWebRequest>(BUFFER_SIZE);
  var bufferSoapResults = new BlockingCollection<string>(BUFFER_SIZE);

  var f = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);

  // Stage 1: get xml file paths
  var stage1 = f.StartNew(() => 
  try
  
    foreach (var phrase in Directory.GetFiles(XML_FOLDER_PATH, "*.xml", SearchOption.TopDirectoryOnly))
     // build concurrent collection
      bufferXmlFileNames.Add(phrase);
    
  
  finally
   // no more additions acceptedin
    bufferXmlFileNames.CompleteAdding();
  
);

  // Stage 2: ProduceInputXmlDocuments(bufferXmlFileNames, bufferInputXmlDocuments)
  var stage2 = f.StartNew(() =>  
  try
  
    foreach (var xmlFileName in bufferXmlFileNames.GetConsumingEnumerable())
    
      XmlDocument doc = new XmlDocument();
      doc.Load(xmlFileName);
      bufferInputXmlDocuments.Add(doc);          
    
  
  finally
  
    bufferInputXmlDocuments.CompleteAdding();
  
);

  // Stage 3:  PostRequests(BlockingCollection<XmlDocument> xmlDocs, BlockingCollection<HttpWebRequest> posts)
  var stage3 = f.StartNew(() =>  
  try
  
    foreach (var xmlDoc in bufferInputXmlDocuments.GetConsumingEnumerable())
    
      HttpWebRequest request = (HttpWebRequest)WebRequest.Create("http://10.76.22.135/wpaADws/ADService.asmx");
      request.ContentType = "text/xml;charset=\"utf-8\"";
      request.Accept = "text/xml";
      request.Method = "POST";
      //
      Stream stream = request.GetRequestStream();
      xmlDoc.Save(stream);
      stream.Close();
      //
      bufferWebRequests.Add(request);
    
  
  finally
  
    bufferWebRequests.CompleteAdding();
  
);

  // Stage 4: ProcessResponses(bufferWebRequests, bufferSoapResults)
  var stage4 = f.StartNew(() =>
  
    try
    
      foreach (var postRequest in bufferWebRequests.GetConsumingEnumerable())
      
        HttpWebResponse response = (HttpWebResponse)postRequest.GetResponse();
        using (StreamReader rd = new StreamReader(response.GetResponseStream()))
        
          string soapResult = rd.ReadToEnd();
          bufferSoapResults.Add(soapResult);
        
      
    
    finally
    
      bufferSoapResults.CompleteAdding();
    
  );

  // stage 5: update UI
  var stage5 = f.StartNew(() =>
  
    foreach (var soapResult in bufferSoapResults.GetConsumingEnumerable())
    
      Console.WriteLine(soapResult);
    
  );

  // display blocking collection load state, 
  // the number of elements in each blocking collection of the pipeline stages
  // you can supress this call completely, because it is informational only
  var stageDisplay = f.StartNew(
    () =>
    
      while (true)
      
        Console.WriteLine("0,10 1,10 2,10 3,10", bufferXmlFileNames.Count, bufferInputXmlDocuments.Count, bufferWebRequests.Count, bufferSoapResults.Count);
        //check last stage completion
        if (stage5.IsCompleted)
          return;
      
    
      );
  Task.WaitAll(stage1, stage2, stage3, stage4, stage5); //or
  //Task.WaitAll(stage1, stage2, stage3, stage4, stage5, stageDisplay);

【讨论】:

你能告诉我应该在你的代码中放置的所有命名空间吗?因为三个对我来说是许多新概念。 好的,我得到了命名空间......你能解释一下输出......我没明白。 prntscr.com/7fbpnaprntscr.com/7fbp4f这里是截图。【参考方案3】:

如何使用这样的任务:

    List<Task> tasks = new List<Task>(n);

    for (int i = 0; i < files.Length; i += n)
    
        for (int j = 0; j < n; j++)
        
            int x = i + j;

            if (x < files.Length && files[x] != null)
            
                Task t = new Task(() => function(files[x]));
                t.Start();
                tasks.Add(t);
            
        

        if (tasks.Count > 0)
        
            Task.WaitAll(tasks.ToArray(), Timeout.Infinite); // or less than infinite
            tasks.Clear();
        
    

我试图在索引上更加整洁...

另外,请注意,由于 C# 如何为 lambda 捕获变量,因此内部循环中的 int x = i + j; 很重要。

如果问题在于追踪索引算法,是否可以使用具有有意义名称的索引变量?

    List<Task> tasks = new List<Task>(taskCount);

    for (int filesIdx = 0; filesIdx < files.Length; filesIdx += taskCount)
    
        for (int tasksIdx = 0; tasksIdx < taskCount; tasksIdx++)
        
            int index = filesIdx + tasksIdx;

            if (index < files.Length && files[index] != null)
            
                Task task = new Task(() => function(files[index]));
                task.Start();
                tasks.Add(task);
            
        

        if (tasks.Count > 0)
        
            Task.WaitAll(tasks.ToArray(), Timeout.Infinite); // or less than infinite
            tasks.Clear();
        
    

【讨论】:

错误 1 ​​'System.Threading.Tasks.Task' 不包含 'Run' 的定义我收到此错误。延迟也一样。 哦,我看到您使用的是 dotnet 4... 抱歉。我会在一分钟内编辑。 我编辑了代码以支持 dotnet 4(我最初是为 dotnet 4.5 编写的)。 我在Task.WaitAll(tasks, Timeout.Infinite)语句中得到“任务数组至少包含一个空元素。参数名称:任务” 这肯定需要测试并且可以进一步改进,但是这个版本怎么样?

以上是关于用于向 Web 服务发出 http post 请求的多线程的主要内容,如果未能解决你的问题,请参考以下文章

HTTP请求方法

浏览器向 django 发出 CORS 请求的 OPTIONS 请求,但没有 POST

通过 Expo 向 localhost 发出 HTTP 请求

复习的内容

Flutter dio不适用于flutter web中的post请求

HTTP——概述请求和响应GET和POST请求