Amphp之事件循环API

Posted Flybeta

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Amphp之事件循环API相关的知识,希望对你有一定的参考价值。

目录

Amphp之事件循环API

本文档描述了 Amp\\Loop访问器。您可能还想阅读源文件中包含的文档,它有大量文档并且不包含太多令人分心的代码。

run()

应用程序与事件循环交互的主要方式是安排事件执行,然后简单地让程序运行。

一旦 Loop::run()被调用,事件循环将无限期地运行,直到没有可观察的计时器事件IO 流信号可供观察。长时间运行的程序通常完全在单个 Loop::run()调用的范围内执行。

Loop::run()接受一个可选的回调作为第一个参数。传递这样的回调相当于在之后调用 Loop::defer($callback)Loop::run()

stop()

事件循环可以在运行时随时停止。当 Loop::stop()被调用时,事件循环将在事件循环的当前滴答结束时将控制权返回给用户空间脚本。即使事件或可观察的 IO 流仍处于未决状态,此方法也可用于从事件循环中获得控制权。

now()

以毫秒为单位返回当前的“循环时间”。此方法返回的值不一定与挂钟时间相关,而是用于与此方法返回的先前值进行相对比较(例如:间隔计算、到期时间等)。此方法返回的值仅在每个循环滴答时更新一次。

计时器观察者(Timer Watchers)

Amp 公开了几种安排计时器观察者的方法。让我们看一下每个函数的一些细节。

defer() //主动推迟

  • 安排回调在事件循环的下一次迭代中执行
  • 此方法保证干净的调用堆栈,以避免循环的当前迭代中其他事件的饥饿。延迟回调总是在事件循环的下一个滴答声中执行。
  • defer计时器观察者执行后,它会被事件循环自动收集为垃圾,因此应用程序无需手动取消关联的观察者。
  • 像所有观察者一样,defer计时器可能会被禁用和重新启用。如果您在安排它的时间和它实际运行的时间之间禁用此观察程序,则事件循环将无法垃圾收集它,直到它执行。因此,如果defer观察者从未真正执行以释放任何相关资源,则您必须自己手动取消它。

示例

<?php // using Loop::defer()

use Amp\\Loop;

Loop::run(function () 
    echo "line 1\\n";
    Loop::defer(function () 
        echo "line 3\\n";
    );
    echo "line 2\\n";
);

Callback Signature

function (string $watcherId, mixed $cbData = null)

delay() //被动延迟

  • 安排回调在延迟 n 毫秒后执行
  • “延迟”观察者在执行后也会被反应堆自动收集垃圾,应用程序不应手动取消它,除非他们希望在执行之前完全丢弃观察者。
  • 禁用的“延迟”观察程序将其延迟时间重置,以便在重新启用后原始延迟时间再次从零开始。
  • 与 defer watchers 一样,如果由于在创建后被应用程序禁用而从未运行,则必须手动取消调度为一次性执行的计时器以释放资源。

示例

<?php // using delay()

use Amp\\Loop;

Loop::run(function () 
    // event loop will stop in three seconds
    Loop::delay($msDelay = 3000, "Amp\\\\Loop::stop");
);

Callback Signature

function (string $watcherId, mixed $cbData = null)

repeat()

  • 安排回调以每 n 毫秒重复执行一次。
  • 像所有其他观察者一样,重复计时器可以随时禁用/重新启用。
  • defer()delay()观察者不同,必须显式取消 repeat() 计时器以释放相关资源。一旦达到目的,未能通过 cancel()释放repeat观察者将导致应用程序中的内存泄漏。仅仅禁用repeat观察者是不够的,因为他们的数据只有在取消时才会被释放。

示例

<?php // using repeat()

use Amp\\Loop;

Loop::run(function () 
    Loop::repeat($msInterval = 100, function ($watcherId) 
        static $i = 0;
        if ($i++ < 3) 
            echo "tick\\n";
         else 
            Loop::cancel($watcherId);
        
    );
);

Callback Signature

function (string $watcherId, mixed $cbData = null)

流 IO 观察者(Stream IO Watchers)

流观察者是我们知道何时可以读取和写入套接字和其他流的方式。这些事件是我们能够使用事件循环实际创建诸如 HTTP 服务器和异步数据库库之类的东西的方式。因此,流 IO 观察器构成了任何有用的非阻塞 Amp 应用程序的主干。

IO 观察者有两种类型:

  • 可读性观察者(Readability watchers)
  • 可写性观察者(Writability watchers)

onReadable()

这是一个高级的低级 API。大多数用户应该改用 amphp/byte-stream

通过 Loop::onReadable()注册的观察者会在以下情况下触发它们的回调:

  • 当数据可以在被观察的流上读取时
  • 当流处于 EOF 时(对于套接字,这意味着连接断开)

对可读数据做出反应的常见使用模式类似于以下示例:

<?php

use Amp\\Loop;

const IO_GRANULARITY = 32768;

function isStreamDead($socket) 
    return !is_resource($socket) || @feof($socket);


Loop::onReadable($socket, function ($watcherId, $socket) 
    $socketId = (int) $socket;
    $newData = @fread($socket, IO_GRANULARITY);
    if ($newData != "") 
        // There was actually data and not an EOF notification. Let's consume it!
        parseIncrementalData($socketId, $newData);
     elseif (isStreamDead($socket)) 
        Loop::cancel($watcherId);
    
);

在上面的例子中,我们做了一些非常简单的事情:

  • 为套接字注册一个可读性观察器,当有数据可供读取时触发我们的回调。
  • 当我们在触发的回调中从流中读取数据时,我们将其传递给状态解析器,当满足某些条件时,该解析器会执行特定于域的操作。
  • 如果 fread()调用表明套接字连接已失效,我们会清理为存储该流分配的所有资源。此过程应始终包括在我们注册的与流相关的任何事件循环观察者上调用 Loop::cancel()

您应该始终读取配置的块大小的倍数(默认值:8192),否则您的代码可能无法在 stream_select() 以外的循环后端按预期工作,有关更多信息,请参阅 amphp/amp#65

onWritable()

这是一个高级的低级 API。大多数用户应该改用 amphp/byte-stream

流本质上是“总是”可写的。唯一没有出现的情况是它们各自的写入缓冲区已满。

对可写性做出反应的一种常见使用模式包括在客户端首次连接到服务器时初始化可写性观察程序而不启用它。一旦发生不完整的写入,我们就可以使用 Loop::enable()来“取消暂停”写入观察器,直到数据完全发送,而无需在同一流上多次创建和取消新的观察器资源。

暂停、恢复和取消观察者

除了通过 Loop::cancel()清除之外,所有观察者,无论类型如何,都可以暂时禁用和启用。这允许高级功能,例如在达到同时限制时禁用服务器应用程序中新套接字客户端的接受。一般来说,与重复取消和重新注册观察者相比,通过暂停/恢复重用观察者的性能特征是有利的。

disable()

一个简单的禁用示例:

<?php

use Amp\\Loop;

// Register a watcher we'll disable
$watcherIdToDisable = Loop::delay($msDelay = 1000, function () 
    echo "I'll never execute in one second because: disable()\\n";
);

// Register a watcher to perform the disable() operation
Loop::delay($msDelay = 500, function () use ($watcherIdToDisable) 
    echo "Disabling WatcherId: ", $watcherIdToDisable, "\\n";
    Loop::disable($watcherIdToDisable);
);

Loop::run();

在我们的第二个观察者回调执行后,事件循环退出,因为不再有任何已启用的观察者注册处理。

enable()

enable() 是上面演示的 disable() 示例的直径模拟:

<?php

use Amp\\Loop;

// Register a watcher
$myWatcherId = Loop::repeat($msInterval = 1000, function() 
    echo "tick\\n";
);

// Disable the watcher
Loop::disable($myWatcherId);

// Remember, nothing happens until the event loop runs, so it doesn't matter that we
// previously created and disabled $myWatcherId
Loop::run(function () use ($myWatcherId) 
    // Immediately enable the watcher when the reactor starts
    Loop::enable($myWatcherId);
    // Now that it's enabled we'll see tick output in our console every 1000ms.
);

对于稍微复杂一点的用例,让我们看一个常见的场景,服务器可能会创建一个最初禁用但随后根据需要启用的写入观察器:

<?php

use Amp\\Loop;

class Server

    private $clients = [];

    public function startServer()
    
        // ... server bind and accept logic would exist here
        Loop::run();
    

    private function onNewClient($sock)
    
        $socketId = (int) $sock;
        $client = new ClientStruct;
        $client->socket = $sock;
        $readWatcher = Loop::onReadable($sock, function () use ($client) 
            $this->onReadable($client);
        );
        $writeWatcher = Loop::onWritable($sock, function () use ($client) 
            $this->doWrite($client);
        );

        Loop::disable($writeWatcher); // <-- let's initialize the watcher as "disabled"

        $client->readWatcher = $readWatcher;
        $client->writeWatcher = $writeWatcher;

        $this->clients[$socketId] = $client;
    

    // ... other class implementation details here ...

    private function writeToClient($client, $data)
    
        $client->writeBuffer .= $data;
        $this->doWrite($client);
    

    private function doWrite(ClientStruct $client)
    
        $bytesToWrite = strlen($client->writeBuffer);
        $bytesWritten = @fwrite($client->socket, $client->writeBuffer);

        if ($bytesToWrite === $bytesWritten) 
            Loop::disable($client->writeWatcher);
         elseif ($bytesWritten >= 0) 
            $client->writeBuffer = substr($client->writeBuffer, $bytesWritten);
            Loop::enable($client->writeWatcher);
         elseif ($this->isSocketDead($client->socket)) 
            $this->unloadClient($client);
        
    

    // ... other class implementation details here ...

cancel()

完成后始终取消持久观察者很重要,否则会在应用程序中造成内存泄漏。此功能的工作方式与上述启用/禁用示例完全相同:

<?php

use Amp\\Loop;

Loop::run(function() 
    $myWatcherId = Loop::repeat($msInterval = 1000, function () 
        echo "tick\\n";
    );

    // Cancel $myWatcherId in five seconds and exit the event loop
    Loop::delay($msDelay = 5000, function () use ($myWatcherId) 
        Loop::cancel($myWatcherId);
    );
);

onSignal()

Loop::onSignal() 可用于对发送到进程的信号作出反应。

<?php

use Amp\\Loop;

Loop::run(function () 
    // Let's tick off output once per second so we can see activity.
    Loop::repeat($msInterval = 1000, function () 
            echo "tick: ", date('c'), "\\n";
    );

    // What to do when a SIGINT signal is received
    $watcherId = Loop::onSignal(UV::SIGINT, function () 
        echo "Caught SIGINT! exiting ...\\n";
        exit;
    );
);

从上面的例子可以清楚地看出,信号观察器可以像任何其他事件一样被启用、禁用和取消。

引用观察者(Referencing Watchers)

观察者可以被引用或不被引用。未引用的观察者不会使循环保持活动状态。默认情况下会引用所有观察者。
使用未引用的观察者的一个例子是使用信号观察者。一般来说,如果所有的观察者都消失了,而只有信号观察者仍然存在,你想退出循环,因为你没有积极地等待那个事件的发生。

reference()

将观察者标记为已引用。将 $watcherId 作为第一个也是唯一的参数。

unreference()

将观察者标记为未引用。将 $watcherId 作为第一个也是唯一的参数。

驱动程序绑定状态(Driver Bound State)

有时拥有全局状态非常方便。虽然通常应该注入依赖项,但将 DnsResolver 传递给需要网络连接的所有内容是不切实际的。Loop 访问器因此提供了两个方法 getState 和 setState 来将状态全局存储到当前事件循环驱动程序。
这些应该小心使用!它们可用于存储循环绑定的单例,例如 DNS 解析器、文件系统驱动程序或全局 ReactAdapter。应用程序通常不应使用这些方法。

事件循环附录(Event Loop Addenda)

Watcher Callback Parameters

使用以下标准化参数顺序调用观察者回调:

Watcher TypeCallback Signature
defer()function(string $watcherId, $callbackData)
delay()function(string $watcherId, $callbackData)
repeat()function(string $watcherId, $callbackData)
onReadable()function(string $watcherId, $stream, $callbackData)
onWritable()function(string $watcherId, $stream, $callbackData)
onSignal()function(string $watcherId, $signo, $callbackData)

Watcher Cancellation Safety

在其自己的回调中取消观察者总是安全的。例如:

<?php

use Amp\\Loop;

$increment = 0;

Loop::repeat($msDelay = 50, function ($watcherId) use (&$increment) 
    echo "tick\\n";
    if (++$increment >= 3) 
        Loop::cancel($watcherId); // <-- cancel myself!
    
);

从多个地方取消观察者也总是安全的。双重取消将被简单地忽略。

An Important Note on Writability

因为流本质上是“总是”可写的,所以你应该只在有数据要发送时启用可写性观察者。如果您在应用程序没有任何内容可写时启用这些观察程序,则观察程序将无休止地触发,直到禁用或取消。这将最大限度地利用你的 CPU。如果您在应用程序中看到莫名其妙的高 CPU 使用率,那么可以肯定您有一个可写性观察器,但您在完成后未能禁用或取消它。
此区域的标准模式是在稍后启用可写性观察程序之前将其初始化为禁用状态,如下所示:

<?php

use Amp\\Loop;

$watcherId = Loop::onWritable(STDOUT, function () );
Loop::disable($watcherId);
// ...
Loop::enable($watcherId);
// ...
Loop::disable($watcherId);

过程信号编号可用性(Process Signal Number Availability)

php-uv公开了可观察信号的 UV::SIG*常量。使用 EventDriver 的应用程序在注册信号观察器时需要手动指定适当的整数信号编号。

计时器漂移(Timer Drift)

重复计时器基本上是简单的延迟计时器,在触发相应的处理程序之前会自动重新安排。它们会受到定时器漂移的影响。多个计时器可能会堆积起来,以防它们作为协程执行。

以上是关于Amphp之事件循环API的主要内容,如果未能解决你的问题,请参考以下文章

Amphp之事件循环

Amphp之事件循环

PHP Amp & Amphp中文文档

PHP Amp & Amphp中文文档

Amphp之Promises(承诺约定)

Amphp之Promises(承诺约定)