创建异步任务队列 - 使用 BlockingCollection C#

Posted

技术标签:

【中文标题】创建异步任务队列 - 使用 BlockingCollection C#【英文标题】:Create a queue of async tasks - using BlockingCollection C# 【发布时间】:2019-02-07 17:50:48 【问题描述】:

我有一个 WPF C# 应用程序,它使用 API 向 Facebook 和 Twitter 发送消息。为此,我有一个主要的静态类,我可以在其中发送字符串消息和一些参数。当用户只间歇性地发送通知时,代码可以正常工作。但是当用户一次发送多个公告时,它就不起作用了。我想要的是:

按照先进先出的顺序一次发布一个公告 不要破坏 GUI 允许对请求进行排队。 允许在程序中的任何位置和任何时间提出请求

我已经研究过使用 BlockingCollection,但没有太多运气了解如何使其工作。

这是我当前的代码,我希望它尽可能接近:

class PublishAnnouncement 
 //This function is callled upon in many parts of the program and acts as a general publisher
 public static void PostAnnoucment(string message, string TwAccountKey, string FbAccountKey, string[] JourneyRefID, double latness, MainWindow mainWindow) 
  //First it is published to facebook
  BackgroundWorker FacebookWorker = new BackgroundWorker();
  FacebookWorker.DoWork += (obj, e) => FacebookDoWork(message, FbAccountKey);
  FacebookWorker.RunWorkerAsync();

  //Then it is published to twitter - This is where it appears to fail
  BackgroundWorker TwitterWorker = new BackgroundWorker();
  TwitterWorker.DoWork += (obj, e) => TwitterDoWork(message, TwAccountKey, JourneyRefID, latness, mainWindow, 0);
  TwitterWorker.RunWorkerAsync();
 

 private static void FacebookDoWork(string message, string FbAccountKey) 
  //STAGE 1 - Facebook
  //First the program will attempt to post a Facebook post.
  try 
   //If it is to be posted by one of the additional Facebook Pages and 
   //not by the default page.

   var client = new RestClient("https://graph.facebook.com/v3.0/");

   var request = new RestRequest("pageId/feed", Method.POST);
   request.AddParameter("message", message); // adds to POST or URL querystring based on Method
   request.AddParameter("access_token", Properties.Settings.Default.FBPageAccessToken);
   request.AddUrlSegment("pageId", Properties.Settings.Default.FBPageID); // replaces matching token in request.Resource
   IRestResponse response = client.Execute(request);

   if (response.IsSuccessful == false) 
    Console.WriteLine(response.Content);
    Console.WriteLine("");
   

   catch (Exception ex) 
   Console.WriteLine(ex.ToString());
  
 

 private static void TwitterDoWork(string message, string TwAccountKey, string[] JourneyRefID, double latness, MainWindow mainWindow, int Attempts) 
  //STAGE 2 - Twitter
  //Once a Facebook post has/has not been posted the program will attempt to send a tweet.
  try 

   Auth.SetUserCredentials(Properties.Settings.Default.TwConsumerKey, Properties.Settings.Default.TwConsumerSecret, Properties.Settings.Default.TwUserAccessToken, Properties.Settings.Default.TwUserAccessSecret);
   var tweet = Tweet.PublishTweet(message);
   foreach(var ID in JourneyRefID)
        AddTweetID(tweet.Id, ID, latness, mainWindow);


   catch (Exception ex) 
   foreach(var ID in JourneyRefID)
        AddTweetID(0, ID, latness, mainWindow);

   Console.WriteLine(ex.Message);
  
 

【问题讨论】:

【参考方案1】:

我建议在 PostAnnouncement 中使用 Mutex。 请参阅此处接受的答案 - 使用锁定示例: Usage of Mutex in c#

class PublishAnnouncement 
private static readonly object syncLock = new object();

public static void PostAnnoucment(string message, string TwAccountKey, string FbAccountKey, string[] JourneyRefID, double latness, MainWindow mainWindow) 
    lock(syncLock) 
        //First it is published to facebook
        BackgroundWorker FacebookWorker = new BackgroundWorker();
        FacebookWorker.DoWork += (obj, e) => FacebookDoWork(message, FbAccountKey);
        FacebookWorker.RunWorkerAsync();

        //Then it is published to twitter - This is where it appears to fail
        BackgroundWorker TwitterWorker = new BackgroundWorker();
        TwitterWorker.DoWork += (obj, e) => TwitterDoWork(message, TwAccountKey, JourneyRefID, latness, mainWindow, 0);
        TwitterWorker.RunWorkerAsync();

        //etc
    

锁一次只允许一个线程通过,将所有线程堆叠起来。请注意,我已将锁设为静态。 (无论有多少类实例在任何时候都在使用,只有一个锁对象被所有调用线程创建和引用)。

如果你想把锁移到后台线程中:

class   PublishAnnouncement 
private static  readonly    object  syncLockForTwitter  =   new object();
private static  readonly    object  syncLockForFacebook =   new object();
//This  function    is  callled upon    in  many    parts   of  the program and acts    as  a   general publisher
public  static  void    PostAnnoucment(string   message,    string  TwAccountKey,   string  FbAccountKey,   string[]    JourneyRefID,   double  latness,    MainWindow  mainWindow) 
    //First it  is  published   to  facebook
    BackgroundWorker    FacebookWorker  =   new BackgroundWorker();
    FacebookWorker.DoWork   +=  (obj,   e)  =>  FacebookDoWork(message, FbAccountKey);
    FacebookWorker.RunWorkerAsync();

    //Then  it  is  published   to  twitter -   This    is  where   it  appears to  fail
    BackgroundWorker    TwitterWorker   =   new BackgroundWorker();
    TwitterWorker.DoWork    +=  (obj,   e)  =>  TwitterDoWork(message,  TwAccountKey,   JourneyRefID,   latness,    mainWindow, 0);
    TwitterWorker.RunWorkerAsync();


private static  void    FacebookDoWork(string   message,    string  FbAccountKey)   
    lock(syncLockForFacebook)   
    //STAGE 1   -   Facebook
    //First the program will    attempt to  post    a   Facebook    post.
        try 
            //If    it  is  to  be  posted  by  one of  the additional  Facebook    Pages   and 
            //not   by  the default page.

            var client  =   new RestClient("https://graph.facebook.com/v3.0/");

            var request =   new RestRequest("pageId/feed",    Method.POST);
            request.AddParameter("message", message);   //  adds    to  POST    or  URL querystring based   on  Method
            request.AddParameter("access_token",    Properties.Settings.Default.FBPageAccessToken);
            request.AddUrlSegment("pageId", Properties.Settings.Default.FBPageID);  //  replaces    matching    token   in  request.Resource
            IRestResponse   response    =   client.Execute(request);

            if  (response.IsSuccessful  ==  false)  
                Console.WriteLine(response.Content);
                Console.WriteLine("");
            

           catch   (Exception  ex) 
            Console.WriteLine(ex.ToString());
        
    


private static  void    TwitterDoWork(string    message,    string  TwAccountKey,   string[]    JourneyRefID,   double  latness,    MainWindow  mainWindow, int Attempts)   
    lock(syncLockForTwitter)    
    //STAGE 2   -   Twitter
    //Once  a   Facebook    post    has/has not been    posted  the program will    attempt to  send    a   tweet.
        try 

            Auth.SetUserCredentials(Properties.Settings.Default.TwConsumerKey,  Properties.Settings.Default.TwConsumerSecret,   Properties.Settings.Default.TwUserAccessToken,  Properties.Settings.Default.TwUserAccessSecret);
            var tweet   =   Tweet.PublishTweet(message);
            foreach(var ID  in  JourneyRefID)
                                AddTweetID(tweet.Id,    ID, latness,    mainWindow);


           catch   (Exception  ex) 
            foreach(var ID  in  JourneyRefID)
                                AddTweetID(0,   ID, latness,    mainWindow);

            Console.WriteLine(ex.Message);
        
    

【讨论】:

您好,非常感谢您的回答,我已经实现了,但是,我应该删除我的后台工作人员并让他们成为任务吗?正如我现在担心的那样,在后台工作人员完成之前不会开始尝试发送新消息吗?如果是这样,我将如何将它们从后台工作人员更改为任务并使用等待而不冻结 GUI。抱歉,我对线程很陌生。 你可以做任何你想做的事。一种方法是将静态锁移到后台工作人员中。

以上是关于创建异步任务队列 - 使用 BlockingCollection C#的主要内容,如果未能解决你的问题,请参考以下文章

如何使用hhxsv5/laravel-s的异步任务队列

如何使用hhxsv5/laravel-s的异步任务队列

GCD使用 串行并行队列 与 同步异步执行的各种组合 及要点分析

Django使用Celery异步任务队列

异步任务队列Celery在Django中的使用

Celery异步任务队列/周期任务+ RabbitMQ + Django