[TOC]
简单实例
- 配置驱动
假如使用database这个队列驱动,首先要创建数据表进行记录
//以下命令会在数据库中生成jobs表
php artisan queue:table
php artisan migrate
然后更改驱动配置,可以修改.env 中的配置
QUEUE_DRIVER=database
- 创建任务类
php artisan make:job InsertData
以上命令生成app/Jobs/InsertData.php,然后修改该文件的handle方法
public function handle()
{
// 可以在这里定义你想做的任何事情。。。
// 以向数据库写入文件为例
\DB::table('orders')->insert(
array(
'user_id' => 1,
'remark' => time(),
)
);
}
- 分发任务
<?php
namespace App\Http\Controllers\Home;
use App\Jobs\InsertData;
class TestController extends CommonController
{
public function testQueue()
{
InsertData::dispatch();
}
}
定义一条路由,方便访问
Route::get('/test/queue','Home\[email protected]');
访问该条路由后,
- 查看队列数据表jobs,发现多了一条数据,证明分发数据成功,已经写入队列。
- 查看数据表orders,并没有任何变化,说明队列并没有运行
监听队列
php artisan queue:work
- 查看队列数据表jobs,发现上面写入的数据消失
- 查看数据表orders,写入一条新的数据
总结: 实际上应该先运行队列监听,上面3 和 4 颠倒是为了便于观察过程,以上就是队列运行的最基本过程,先往jobs数据表中写入任务信息, 然后通过queue排队读取该任务,运行成功后清除该条数据。
1. 简介和配置
1.1 好处
将耗时的任务延时处理,比如发送邮件,从而大幅度缩短 Web 请求和相应的时间
1.2 配置文件
队列配置文件存放在 config/queue.php
// 设置使用哪种队列驱动
'default' => env('QUEUE_DRIVER', 'sync'),
// 队列驱动方式可选项的详细配置
'connections' => [
'sync' => [
'driver' => 'sync',
],
'database' => [
'driver' => 'database',
'table' => 'jobs',
'queue' => 'default',
'retry_after' => 90,
],
// 其他一些选项 。。。
'redis' => [
'driver' => 'redis',
'connection' => 'default', # config/database.php 中redis连接选项
'queue' => 'default', # 默认队列任务被发给指定连接的时候会被分发到这个队列中
'retry_after' => 90,
],
],
1.3 队列驱动的必要配置
- 如果使用database队列驱动
首先要创建数据表
//以下命令会在数据库中生成jobs表
php artisan queue:table
php artisan migrate
然后更改驱动配置,可以修改.env 中的配置
QUEUE_DRIVER=database
- 如果使用redis队列驱动
2. 创建任务
2.1 生成任务类
php artisan make:job InsertData
2.2 修改任务类
一般来说,最基本的就是补充handle方法
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use App\Http\Model\Order;
class InsertData implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
// 最大尝试次数
public $tries = 5;
// 超时时间
public $timeout = 120;
// 上面两个属性也可以通过命令行指定,但是优先级低于上面的属性定义
// php artisan queue:work --tries=3
// php artisan queue:work --timeout=30
public $order = '';
/**
* Create a new job instance.
*
* @return void
*/
public function __construct(Order $order)
{
$this->order = $order;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
\DB::table('orders')->insert(
array(
'user_id' => 1,
'remark' => $this->order->remark,
)
);
}
}
这里有一点需要注意,官方文档说
在任务类的构造器中直接传递了一个 Eloquent 模型。因为我们在任务类里引用了 SerializesModels 这个 trait,使得 Eloquent 模型在处理任务时可以被优雅地序列化和反序列化。如果你的队列任务类在构造器中接收了一个 Eloquent 模型,那么只有可识别出该模型的属性会被序列化到队列里。当任务被实际运行时,队列系统便会自动从数据库中重新取回完整的模型。
这句话怎么理解呢,何为优雅的序列化和反序列化?
简单说,当使用Eloquent模型的时候,存入队列的时候只存了 关键数据 ,实际运行的时候 重新 去数据库取完成模型。
我们用上面的例子测试下
<?php
namespace App\Http\Controllers\Home;
use App\Jobs\InsertData;
use App\Http\Model\Order;
class TestController extends CommonController
{
public function testQueue()
{
$order = Order::find(1);
$order->remark = 'modify';
InsertData::dispatch($order);
}
}
最终的结果是,尽管我们修改了order的属性remark,但是最终存入到数据库中的数据仍旧是我们使用Order::ind(1)对象的remark的值
2.3 分发任务
- 使用dispatch方法
// 这个任务将被分发到默认队列...
YourJob::dispatch();
YourJob::dispatch($order);
// 这个任务将被发送到「emails」队列...
YourJob::dispatch()->onQueue('emails');
延迟分发
//我们指定一个任务在分配后 10 分钟内不可被处理: InsertData::dispatch($order) ->delay(Carbon::now()->addMinutes(10));
工作链
工作链允许你指定应该按顺序运行的队列列表。如果一个任务失败了,则其余任务将不会运行。你可以在分发任务的时候使用 withChain 方法来执行具有工作链的队列任务。
Job1::withChain([
new otherJob2,
new otherJob3
])->dispatch();
$order = Order::find(1);
$post = Post::find(1);
InsertData::withChain([
new InsertDataToPost($post)
])->dispatch($order);
2.4 自定义队列 & 连接
指定队列,使用onQueue()
ProcessPodcast::dispatch($podcast)->onQueue('processing');
指定连接,使用onConnection()
ProcessPodcast::dispatch($podcast)->onConnection('redis');
连起来使用
ProcessPodcast::dispatch($podcast)
->onConnection('sqs')
->onQueue('processing');
3. 运行队列处理器
3.1 运行
命令:
php artisan queue:work
队列处理器是长时间运行的进程,如果你修改代码那这些改变是不会应用到处理器中的。所以在你重新部署过程中,一定要 重启
3.2 处理单一任务
每次只执行一个队列任务
php artisan queue:work --once
3.3 指定连接 & 队列
指定需要监听的连接
//只启动了redis队列的驱动,如果你将数据推到其它连接比如database,则不会自动处理
php artisan queue:work redis
指定队列
//启动一个只处理那个特定队列的队列处理器
php artisan queue:work redis --queue=emails
3.4 资源注意事项
守护程序队列不会在处理每个作业之前 「重新启动」 框架。因此,在每个任务完成后,您应该释放任何占用过大的资源。例如,如果你使用 GD 库进行图像处理,你应该在完成后用 imagedestroy 释放内存。
3.5 队列优先级
把一个任务推到 high 优先级的队列中
dispatch((new Job)->onQueue('high'));
要保证high队列的任务在low队列任务之前处理
// 队列名称先后排列,中间用逗号隔开
php artisan queue:work --queue=high,low
3.6 队列重启
php artisan queue:restart
3.7 任务过期 & 超时
- 任务过期
config/queue.php 配置文件里,每一个队列连接都定义了一个 retry_after 选项, 比如 ‘retry_after‘ => 90, 那么当任务运行超过90s之后,该任务会 重新回到队列中
- 队列处理超时
指定了 Laravel 队列处理器最多执行多长时间后就应该被 关闭掉
php artisan queue:work --timeout=60
--timeout 应该永远都要比 retry_after 短至少几秒钟的时间。这样就能保证任务进程总能在失败重试前就被杀死了。如果你的 --timeout 选项大于 retry_after 配置选项,你的任务可能被执行两次,甚至更多次。
3.8 队列进程睡眠时间
当队列需要处理任务时,进程将继续处理任务,它们之间没有延迟。但是,如果没有新的工作可用,sleep 参数决定了工作进程将 「睡眠」 多长时间:
php artisan queue:work --sleep=3
4. 配置Supervisor
4.1 下载程序并安装
yum install python-setuptools
easy_install supervisor
// 使用root身份创建一个全局配置文件
#echo_supervisord_conf > /etc/supervisord.conf
4.2 编辑配置文件
vim /etc/supervisord.conf
加入以下内容,directory 指向工程所在的根目录
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
directory = /data/zhengde
command=php artisan queue:work database --sleep=3 --tries=3
autostart=true
autorestart=true
user=root
numprocs=8
redirect_stderr=true
stdout_logfile=/var/log/worker.log
4.3 使用
这个工具主要就两个命令:
- supervisord : supervisor的服务器端部分,启动supervisor就是运行这个命令
supervisorctl:启动supervisor的命令行窗口。
```3.1. 启动supervisord
$supervisord -c /etc/supervisord.conf
3.2. 关闭supervisord
$supervisorctl shutdown
3.3. 重新载入配置
$supervisorctl reload
3.4.更新新的配置到supervisord
$supervisorctl update
3.5.启动某个进程(program_name=你配置中写的程序名称)
$supervisorctl start program_name
3.6.查看正在守候的进程
$supervisorctl
3.7.停止某一进程 (program_name=你配置中写的程序名称)
$pervisorctl stop program_name
3.8.重启某一进程 (program_name=你配置中写的程序名称)
$supervisorctl restart program_name
3.9.停止全部进程
$supervisorctl stop all
注意:显示用stop停止掉的进程,用reload或者update都不会自动重启 。
# 5. 处理失败的任务
## 5.1 生成队列失败数据表
php artisan queue:failed-table
php artisan migrate
在调用 queue worker,命令时你应该通过 --tries 参数指定任务的最大重试次数。如果不指定,任务就会永久重试:
php artisan queue:work redis --tries=3
## 5.2 处理失败任务
你可以在任务类里直接定义 failed 方法,它能在任务失败时运行任务的清除逻辑。这个地方用来发一条警告给用户或者重置任务执行的操作等再好不过了。导致任务失败的异常信息会被传递到 failed 方法:
<?php
namespace App\Jobs;
use Exception;
use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
class ProcessPodcast implements ShouldQueue
{
use InteractsWithQueue, Queueable, SerializesModels;
protected $podcast;
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}
public function handle(AudioProcessor $processor)
{
// 处理上传播客...
}
public function failed(Exception $exception)
{
// 给用户发送失败通知,等等...
}
}
## 5.3 任务失败事件
如果你想注册一个当队列任务失败时会被调用的事件,则可以用 Queue::failing 方法。这样你就有机会通过这个事件来用 e-mail 或 HipChat 通知你的团队。
例如我们可以在 Laravel 内置的 AppServiceProvider 中对这个事件附加一个回调函数:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Support\ServiceProvider;
class AppServiceProvider extends ServiceProvider
{
public function boot()
{
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}
public function register()
{
//
}
}
## 5.4 重试失败任务
// 查看所有任务
php artisan queue:failed
// 重试id为5的任务
php artisan queue:retry 5
// 重试所有失败任务
php artisan queue:retry all
// 删除一条失败任务
php artisan queue:forget 5
// 删除所有失败任务
php artisan queue:flush
## 5.5 任务事件
使用队列的 before 和 after 方法,你能指定任务处理前和处理后的回调处理。在这些回调里正是实现额外的日志记录或者增加统计数据的好时机。
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
class AppServiceProvider extends ServiceProvider
{
/**
* 启动任意服务。
@return void
*/
public function boot()
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}
/**
* 注册服务提供者。
*
* @return void
*/
public function register()
{
//
}
}
```