从 Laravel(NodeJS)外部推送到 Laravel 队列

Posted

技术标签:

【中文标题】从 Laravel(NodeJS)外部推送到 Laravel 队列【英文标题】:Push to Laravel queue from outside Laravel (NodeJS) 【发布时间】:2017-07-16 20:50:28 【问题描述】:

我有一个作为纯 API 应用程序运行的 Laravel 5.3 安装,需要从多个不同的应用程序连接。

一切正常(毕竟我们谈论的是 Laravel :P),除了我无法弄清楚一件事:

我有一个 MQTT 服务器,它正在侦听来自多个设备的消息(不管是什么)。这些消息包含有关需要在后端调用的作业类和方法的信息。

我不能直接调用 API,设备根本不支持这个(他们支持,但比使用 MQTT 传输数据要努力得多)。我的想法是将新作业推送到定义要调用哪个 Laravel 作业类(以及哪个方法)的队列中。问题在于 JSON 序列化...

MQTT 服务器在 NodeJS 上运行,我的队列在 Redis 上运行。我记得 Taylor 的一条推文,他提到理论上可以序列化所需的 JSON 并从 Laravel 外部推送到队列,并让 Laravel 处理工作。

有人知道如何解决这个问题吗?是否有关于 JSON 结构的文档?

我还应该提到这个解决方案NodeJS push queue, consumed by Laravel worker 对我不起作用。与上面的结果相同,作业被放入队列但没有被处理或抛出任何错误就被丢弃了。

Redis 中排队事件的示例数据结构如下所示:

"\"job\":\"Illuminate\\\\Broadcasting\\\\BroadcastEvent\",\"data\":\"event\":\"O:28:\\\"App\\\\Events\\\\NotificationEvent\\\":5:s:7:\\\"\\u0000*\\u0000name\\\";s:12:\\\"notification\\\";s:4:\\\"data\\\";a:4:s:4:\\\"testkey\\\";s:14:\\\"testval\\\";s:9:\\\"timestamp\\\";s:19:\\\"2017-02-24 11:07:48\\\";s:5:\\\"event\\\";s:12:\\\"notification\\\";s:5:\\\"class\\\";s:28:\\\"App\\\\Events\\\\NotificationEvent\\\";s:10:\\\"\\u0000*\\u0000channel\\\";N;s:7:\\\"\\u0000*\\u0000user\\\";O:45:\\\"Illuminate\\\\Contracts\\\\Database\\\\ModelIdentifier\\\":2:s:5:\\\"class\\\";s:8:\\\"App\\\\User\\\";s:2:\\\"id\\\";i:2;s:6:\\\"socket\\\";N;\",\"id\":\"XuUKRTf8CTSdzaVgp2gRcvmxQqLcpBUG\",\"attempts\":1"

基于该结构,我认为对象(需要序列化)应该类似于:


"job":"EventClass@method", //<-- Just a name
"data":
    "event":"EventClass", //<-- Just a name
    "name":"EventName", //<-- Just a name
    "data":
    "key":"value"
    "event":"EventName" //<-- Same as data.name
    "class":"EventClass@method" //<-- This is actually being called
    

Laravel 实际放入队列的内容中包含其他信息(如时间戳、用户模型标识符等),但我认为这不是触发工作所必需的。

数据需要在 JS 中进行序列化,以实现与 php serialize() 类似的输出(或者更好,以获得可以通过 php 的 unserialize() 进行反序列化的字符串。

我通过 php-serialization NPM 模块实现了这一点(感谢 Simon Svensson),但 Laravel 仍然没有使用该作业(已丢弃但未执行)

提前感谢您的帮助:)

编辑解决方案

感谢 Simon 的回答,这是关于如何在 javascript 中序列化作业数据并推送到 Laravel 队列(并让 Laravel 自动处理整个事情)的解决方案。

请注意,这是在 Redis 中使用队列的示例。当使用 Beanstalkd 或基于数据库的队列时,这可能看起来不同(或不同)。

这是我成功使用的代码:

var serialize,Class,job,jobUser,jobData,serialized,result;

serialize = require('php-serialization').serialize;
Class = require('php-serialization').Class;

job = new Class("App\\Events\\NotificationEvent");

job.__addAttr__("name","string","notification","string","protected");

jobData = new Class();
jobData.__addAttr__("testkey","string","testval","string");
jobData.__addAttr__("timestamp","string","2017-02-24 11:07:48","string");
jobData.__addAttr__("event","string","notification","string");
jobData.__addAttr__("class","string","App\\Events\\NotificationEvent","string");
job.__addAttr__("data","string",jobData,"array","public");

job.__addAttr__("channel","string",null,"null","protected");

jobUser = new Class("Illuminate\\Contracts\\Database\\ModelIdentifier")
jobUser.__addAttr__("class","string","App\\User","string","public");
jobUser.__addAttr__("id","string",2,"integer","public");
job.__addAttr__("user","string",jobUser,"object","protected");

job.__addAttr__("socket","string",null,"null","public");

serialized = serialize(job,"object");

result = 
    job:"Illuminate\\Broadcasting\\BroadcastEvent",
    data:
        event:serialized
    ,
    id:"XuUKRTf8CTSdzaVgp2gRcvmxQqLcpBUG",
    attempts:1
;

queue.rpush('queues:default',JSON.stringify(result));

我还没有弄清楚 ID 的确切用途,我成功地将作业推送到队列中,并且始终具有相同的 ID。我想如果你正在快速推动工作并且它们同时被存储,这可能是一个问题。因为它是一个字符串,你可以用任何你喜欢的随机 ID 替换它(Laravel 生成的随机 ID 是 32 个字符,我认为保持这个长度是个好主意)。

最初推送作业时应将尝试次数设置为 1。如果 Laravel 无法处理该作业,它会将其推回队列并增加尝试次数。

【问题讨论】:

如果我的最新更新没有解决您的问题,并且您仍然需要帮助调试队列(这可能涉及将调试语句添加到 vendor/laravel/framework 文件夹中),请尝试在 IRC(@ 987654325@) 或 Slack (larachat.co)。 反对票是为了什么?解决在其他任何地方都没有答案的问题? 我赞成你的问题 Jan - 我很高兴我不是唯一一个尝试这样做的人 :) 【参考方案1】:

首先,请注意这是 Laravel 5.3 中基于数据库的队列中作业的格式。较新版本的 Laravel 包含更改。

有效载荷列应包含以下格式的 json 对象。在这种情况下,可以对作业 (...\\CallQueuedHandler@call) 进行硬编码。我相信 commandName 键仅用于显示目的。然而,命令键是更难的部分,它应该是unserialize() 支持的有效对象。看起来 npm 上有用于此目的的包,快速搜索出现了 php-serialization。


    "job": "Illuminate\\Queue\\CallQueuedHandler@call",
    "data": 
        "commandName": "App\\Jobs\\MyJobClass",
        "command": "O:19:\"App\\Jobs\\MyJobClass\"... /* stuff */"
    


您提供的 json 有效负载会生成以下对象。作业和数据键都很重要。


  "job": "Illuminate\\Broadcasting\\BroadcastEvent",
  "data": 
    "event": "O:28:\"App\\Events\\NotificationEvent\":5:s:7:\"\u0000*\u0000name\";s:12:\"notification\";s:4:\"data\";a:4:s:4:\"testkey\";s:14:\"testval\";s:9:\"timestamp\";s:19:\"2017-02-24 11:07:48\";s:5:\"event\";s:12:\"notification\";s:5:\"class\";s:28:\"App\\Events\\NotificationEvent\";s:10:\"\u0000*\u0000channel\";N;s:7:\"\u0000*\u0000user\";O:45:\"Illuminate\\Contracts\\Database\\ModelIdentifier\":2:s:5:\"class\";s:8:\"App\\User\";s:2:\"id\";i:2;s:6:\"socket\";N;"
  ,
  "id": "XuUKRTf8CTSdzaVgp2gRcvmxQqLcpBUG",
  "attempts": 1

我认为有问题的部分是序列化对象。以更易于阅读的方式重新格式化(但完全破坏了它)......

O:28:"App\Events\NotificationEvent":5:
    // protected $name = 'notification'
    s:7:" * name";s:12:"notification";

    // public $data = array(...)
    s:4:"data";a:4:
        // 'testkey => 'testval'
        s:4:"testkey";s:14:"testval";

        // 'timestamp' => '2017-02-24 11:07:48';
        s:9:"timestamp";s:19:"2017-02-24 11:07:48";

        // 'event' => 'notification';
        s:5:"event";s:12:"notification";

        // 'class' => App\Events\NotificationEvent::class;
        s:5:"class";s:28:"App\Events\NotificationEvent";
    

    // protected $channel = null;
    s:10:"\0*\0channel";N;

    // protected $user = (instance of ModelIdentifier)
    s:7:"\0*\0user";O:45:"Illuminate\Contracts\Database\ModelIdentifier":2:
        // public $class = App\User::class;
        s:5:"class";s:8:"App\User";

        // public $id = 2;
        s:2:"id";i:2;
    

    // public $socket = null;
    s:6:"socket";N;

这种格式暴露了这样一个事实,即您的作业使用 SerializesModels 特征将模型的引用替换为包含类+标识符的简单条目,并将在 __wakeup 期间恢复它们。

我相信您的问题在于对 json 的心理解析和序列化格式;你猜测的结构是……错误的。

接下来的步骤不会是猜测任何事情。 1. 复制您已经拥有有效负载的确切测试通知。只需复制粘贴即可。 (您可能需要更改 id,我猜它是用于重复数据删除的。) 2. 使用 php-serialization 构建事件数据,旨在构建与原始事件有效负载相同的东西。完全没有变化。 3. 如果到此为止,请随意更改序列化的事件数据,看看会发生什么。

【讨论】:

根据您的建议更新了我的问题,但仍然无法正常工作 您的回答绝对引导我走向正确的方向。我想我的错误是我没有看到不需要序列化整个工作,而只需要序列化“数据”部分。稍后我会将您的答案标记为已接受。将首先发布问题的解决方案。【参考方案2】:

对于需要从事“经典”工作的人,这里是代码,基于 Jan Schuermanns 原始问题和解决方案:

const php = require('php-serialization');

var job_class = 'App\\Jobs\\TestJob';
var data = 

job = new php.Class(job_class);

job.__addAttr__("data","string", JSON.stringify(data),"string","private");

result = 
    id: 1,
    type: 'job',
    displayName: job_class,
    job: "Illuminate\\Queue\\CallQueuedHandler@call",
    maxTries: null,
    delay: null,
    timeout: null,
    timeoutAt: null,
    data: 
        commandName: job_class,
        command: php.serialize(job, "object")
    
;

示例作业类:

<?php

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;

class TestJob implements ShouldQueue

    private $data;

    public function __construct($data)
    
        $this->data = $data;
    

    public function handle() 
        var_dump(json_decode($this->data));
        return;
    

导致:

[2019-08-17 ...] Processing: App\Jobs\TestJob
object(stdClass)#1870 (0) 

[2019-08-17 ...] Processed:  App\Jobs\TestJob

编辑: 如果您必须处理变音符号或其他特殊字符,您可能需要将 latin1 与 iconv-lite 一起使用:

iconv.decode(Buffer.from(JSON.stringify(data)), 'latin1')

job.__addAttr__("data","string", JSON.stringify(data),"string","private");

...
queueChannel.sendToQueue(queuePublish, Buffer.from(JSON.stringify(result), 'latin1'));

【讨论】:

【参考方案3】:

此解决方案相当肮脏,将来可能无法使用,但是...

您可以复制将作业发送到队列的“旧方式”:

\Queue::push('\Namespace\Of\Class@handlerMethod', ['foo' => 'bar']);

在处理程序中:

public function handlerMethods($job, $data)

    $job->delete;
    // do whatever with data

在 Node.js 中,您可以执行以下操作:

SQS.sendMessage(
    MessageBody: JSON.stringify(
        job: '\Namespace\Of\Class@handlerMethod',
        data: 
            foo: 'bar',
        
    ),
    QueueUrl: yourQueueUrl
 , err =>  /* handle */ );

缺点

您必须手动删除作业

我不确定死信队列是否适用于这种方法(可能)

不能以这种方式序列化模型 - 如果您需要在处理程序中从数据中获取模型,则需要传递主键并以这种方式检索它

没有在实际工作中尝试过这个,我的处理程序类相当“实用”,但很可能,它可能不适用于实际的 laravel 工作

这种方法来自 Laravel 4,因此它可能已经被弃用,即使没有,将来也可能会被删除/更改

这样做需要您自担风险,但这里有证据证明这是可能的。

【讨论】:

【参考方案4】:

只需公开一个编写作业处理器的 API。 如果我们谈论的是节点 js 和 laravel。需要公开一个 API,它将接受 HTTP 请求并放置作业并返回到节点 js 服务器。

稍后,如果您必须使用安装了不同语言的不同服务器执行作业,则其他人提供的解决方案将不起作用,并且将仅限于特定框架。

【讨论】:

以上是关于从 Laravel(NodeJS)外部推送到 Laravel 队列的主要内容,如果未能解决你的问题,请参考以下文章

使用 OneSingnal 从 NodeJS 发送通知推送到 React Native

成功登录后将用户数据从 Nodejs 服务器推送到 Angular

如何在nodeJS的mysql查询中将值推送到数组?

从外部附件输入流中读取并推送到 s3 的最有效方法?

Pivotal HDB - 如何将数据从 HAWQ 内部/外部可读表推送到 Greenplum

将元素推送到嵌套数组猫鼬 nodejs