如何在Delphi中使用Pipeline模式

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何在Delphi中使用Pipeline模式相关的知识,希望对你有一定的参考价值。

我正在尝试在我的测试项目(How to make a Mutlithreded idhttp calls to do work on a StringList)中实现Pipeline模式,但我正在努力使TThread代码适应Pipeline模式代码。关于如何使用它的资源不多。

我在下面尽力了,请不要downvote,我知道我的代码很乱,但如果需要我会编辑我的问题。

type
  TForm2 = class(TForm)
    ...
  private
    procedure Retriever(const input: TOmniValue; var output: TOmniValue);
    procedure Inserter(const input, output: IOmniBlockingCollection);
    function HttpGet(url: string; var page: string): boolean;
  end;

procedure TForm2.startButton1Click(Sender: TObject);
var
  pipeline: IOmniPipeline;
  i       : Integer;
  v       : TOmniValue;
  s       : string;
  urlList : TStringList;
begin
  pipeline := Parallel.Pipeline;
  pipeline.Stage(Retriever);
  pipeline.Stage(Inserter).NumTasks(10);
  pipeline.Run;
  for s in urlList do
    pipeline.Input.Add(s);
  pipeline.Input.CompleteAdding;
  // wait for pipeline to complete
  pipeline.WaitFor(INFINITE);
end;

function TForm2.HttpGet(url: string; var page: string): boolean;
var
  lHTTP: TIdHTTP;
  i : integer;
  X : Tstrings;
  S,M,fPath : String;
begin
  lHTTP := TIdHTTP.Create(nil);
  X := TStringList.Create;
  try
    X.Text := lHTTP.Get('https://instagram.com/'+fPath);
    S:= ExtractDelimitedString(X.Text);
    X.Clear;
    Memo2.Lines.Add(fPath+ ' :     '+ M ); //how to pass the result to Inserter
  finally
    lHttp.Free;
  end;
end;

procedure TForm2.Inserter(const input, output: IOmniBlockingCollection);
var
  result   : TOmniValue;
  lpage     : string;
begin
  for result in input do begin
    Memo2.Lines.Add(lpage);
    FreeAndNil(lpage);
  end;
  // correect?
end;

procedure TForm2.Retriever(const input: TOmniValue; var output: TOmniValue);
var
  pageContents: string;
begin
  if HttpGet(input.AsString, pageContents) then
    output := //???
end;
答案

首先 - 描述您的具体问题。没有人可以站在你的背后,看着你的电脑,看看你在做什么。 http://www.catb.org/esr/faqs/smart-questions.html#beprecise

你确实暗示你的程序行为不端。但是你没有描述如何以及为什么。我们不知道。

作为一般性评论,您过度使用了管道。

  1. 您传递给OTL的所有工作程序 - 在您的情况下,这些是InserterRetriever在随机线程中工作。这意味着如果没有synchronizing,它们都不应该触摸GUI - VCL不是多线程的。使用TThread.Synchronize也是一个糟糕的选择,正如我在链接问题中向您解释的那样。它使程序变慢,并使表单不可读。要更新表单,请使用固定帧率的轮询。不要从OTL工作者内部更新您的表单。

换句话说,Inserter不是你需要的。这里你需要的只是它的输入集合,一个下载程序和输出集合。是的,管道复杂的事情是非常简单的任务,这就是为什么我之前提到了另外两个更简单的模式。

您需要在表单上使用TTimer,以固定帧速率每秒2-3次轮询输出集合,并检查集合是否尚未最终确定(如果是 - 管道已停止)并且应该从主线程更新GUI 。

  1. 您不应该等待管道在主VCL线程内完成。相反,您应该拆下pipeleine并让它完全在后台运行。将对已创建管道的引用保存到Form的成员变量中,以便您可以从TTimer事件访问其Output集合,并且还可以在其进程运行后释放管道。

您应该将该变量保持链接到管道对象,直到下载完成并在此之后设置为nil(释放对象),而不是之前。您了解Delphi中的接口和引用计数,对吧?

对于其他OTL模式,如parallel-FOR读取OTL文档关于他们的.NoWait()调用。

  1. 您应该将此表单设为双模式,以便在下载运行时和不运行时具有不同的启用控件集。我通常使用特殊的布尔属性来做,就像我在你链接的主题中向你展示的那样。管道正在进行时,您的用户不应该更改列表和设置(除非您实现实时任务更改,但您还没有)。当切换从工作模式进入空闲模式时,该模式切换器也是释放完成的管道对象的好地方。
  2. 如果你想玩管道工作者链接,那么你可以放入输入集合而不是URL字符串本身,但是那些数组 - Memo1.Lines.ToArray(),那么你可以从Unpacker阶段开始,从输入集合中获取字符串数组(实际上只有一个)并枚举它并将字符串放入舞台输出集合中。然而这几乎没有什么实际价值,它甚至会使你的程序减慢一点,因为Memo1.Lines.ToArray()函数仍然可以在主VCL线程中工作。但只是为了试验管道,这可能很有趣。

草案就这样了,

 TfrmMain = class(TForm)
  private
    var pipeline: IOmniPipeline;

    property inProcess: Boolean read ... write SetInProcess;
...
  end.

procedure Retriever(const input: TOmniValue; var output: TOmniValue);
var
  pageContents, URL: string;
  lHTTP: TIdHTTP;
begin
  URL := input.AsString;

  lHTTP := TIdHTTP.Create(nil);
  try
    lHTTP.ReadTimeout := 30000;
    lHTTP.HandleRedirects := True;

    pageContents := ExtractDelimitedString( lHTTP.Get('https://instagram.com/' + URL) );

    if pageContents > '' then
       Output := pageContents;
  finally
    lHTTP.Destroy;
  end;
end;

procedure TfrmMain.FormCloseQuery(Sender: TObject; var CanClose: Boolean);
begin
  if InProgress then begin
     CanClose := False;
     ShowMessage( 'You cannot close this window now.'^M^J+
                  'Wait for downloads to complete first.' ); 
  end;
end;

procedure TfrmMain.SetInProcess(const Value: Boolean);
begin
  if Value = InProcess then exit; // form already is in this mode

  FInProcess := Value;

  memo1.ReadOnly := Value;
  StartButton.Enabled := not Value;
  if Value then 
     Memo2.Lines.Clear;

  Timer1.Delay := 500; // twice per second
  Timer1.Enabled := Value;

  If not Value then  // for future optimisation - make immediate mode change 
     FlushData;      // when last worker thread quits, no waiting for timer event

  If not Value then
     pipeline := nil; // free the pipeline object

  If not Value then
     ShowMessage('Work complete');
end;

procedure TfrmMain.Timer1Timer(const Sender: TObject);
begin
  If not InProcess then exit;

  FlushData;

  if Pipeline.Output.IsFinalized then
     InProcess := False;
end;

procedure TForm2.startButton1Click(Sender: TObject);
var
  s       : string;
  urlList : TStringList;
begin
  urlList := Memo1.Lines;

  pipeline := Parallel.Pipeline;

  pipeline.Stage(Retriever).NumTasks(10).Run;

  InProcess := True; // Lock the input data GUI - user no more can edit it
  for s in urlList do
    pipeline.Input.Add(s);
  pipeline.Input.CompleteAdding;
end;

procedure TfrmMain.FlushData;
var v: TOmniValue;
begin
  if pipeline = nil then exit;
  if pipeline.Output = nil then exit;
  if pipeline.Output.IsFinalized then
  begin
    InProcess := False;  
    exit;
  end;

  Memo2.Lines.BeginUpdate;
  try
    while pipeline.Output.TryTake(v) do
      Memo2.Lines.Add( v.AsString );
  finally
    Memo2.Lines.EndUpdate;
  end;

  // optionally - scroll output memo2 to the last line 
end;

注意一些细节,考虑它们并理解它们的本质:

  1. 只有FlushData正在更新输出备忘录。从TTimer事件或表单模式属性设置器调用FlushData。它们都只是从主VCL线程中调用过来的。因此,FlushData永远不会被称为形式背景线程。
  2. Retriever是一个免费的独立函数,它不是表单的成员,它对表单一无所知,也没有对表单实例的引用。这样你就可以实现这两个目标:避免“紧耦合”并避免错误地从后台线程访问表单控件,这在VCL中是不允许的。 Retriever函数在后台线程中工作,它们会加载数据,它们会存储数据,但它们从不接触GUI。这就是主意。

经验法则 - 表单的所有方法仅从主VCL线程调用。所有管道阶段子例程 - 后台线程的主体 - 都被声明并在任何VCL表单之外工作,并且不能访问任何这些子程序。这些领域之间不应该有任何混合。

  1. 您将GUI更新限制为固定刷新率。而且这个比率应该不会太频繁。 Windows GUI和用户眼睛应该有时间赶上。
  2. 您的表单以两种明确描述的模式运行 - InProcessnot InProcess。在这些模式中,用户可以使用不同的功能和控件集。它还管理模式到模式的转换,例如清除输出备忘录文本,警告用户状态更改,释放使用过的线程的内存 - 管理对象(此处为:管道)等。因此,此属性仅更改(调用setter)从主要的VCL线程,从不从后台工作者。而#2也有助于此。
  3. 可能的未来增强功能是使用pipeline.OnStop事件向表单发出带有自定义Windows消息的PostMessage,因此它会在工作完成后立即切换模式,而不是等待下一个计时器olling事件。这可能是管道知道表单任何内容并且有任何引用的唯一地方。但这打开了Windows消息传递,HWND娱乐和其他微妙的事情,我不想放在这里。

以上是关于如何在Delphi中使用Pipeline模式的主要内容,如果未能解决你的问题,请参考以下文章

RedisCluster如何实现Pipeline批量模式?

Azure Pipeline 错误:找不到与模板文件模式匹配的任何文件

在覆盖模式下如何更改 Delphi 的光标形状?

delphi程序运行时别的功能无法使用

让delphi2010操作界面回到delphi7模式

UVA 690 Pipeline Scheduling