队列
引言
在构建你的 web 应用程序时,你可能有一些任务,比如解析和存储一个上传的 CSV 文件,这些任务在典型的 web 请求期间执行时间过长。幸运的是,Laravel 允许你轻松创建可在后台处理的排队作业。通过将耗时任务转移到队列中,你的应用程序可以以惊人的速度响应 web 请求,并为你的客户提供更好的用户体验。
Laravel 队列为各种不同的队列后端提供了统一的队列 API,例如 Amazon SQS、Redis,甚至是一个关系型数据库。
Laravel 的队列配置选项存储在你的应用程序的 config/queue.php 配置文件中。在这个文件中,你将找到框架附带的每个队列驱动程序的连接配置,包括 database、Amazon SQS、Redis 和 Beanstalkd 驱动程序,以及一个将立即执行作业的同步驱动程序(用于开发或测试)。还包括一个 null 队列驱动程序,它会丢弃排队的作业。
Laravel Horizon 是一个漂亮的仪表板和配置系统,用于你由 Redis 驱动的队列。查看完整的 Horizon 文档以获取更多信息。
连接 vs. 队列
在开始使用 Laravel 队列之前,了解“连接”和“队列”之间的区别非常重要。在你的 config/queue.php 配置文件中,有一个 connections 配置数组。此选项定义了与后端队列服务(如 Amazon SQS、Beanstalk 或 Redis)的连接。然而,任何给定的队列连接都可能拥有多个“队列”,这些队列可以被认为是不同的堆栈或堆积的排队作业。
请注意,queue 配置文件中的每个连接配置示例都包含一个 queue 属性。这是作业被发送到给定连接时将被分派到的默认队列。换句话说,如果你分派一个作业而没有明确定义它应该被分派到哪个队列,该作业将被放置在连接配置的 queue 属性中定义的队列上:
use App\Jobs\ProcessPodcast;
// This job is sent to the default connection's default queue...
ProcessPodcast::dispatch();
// This job is sent to the default connection's "emails" queue...
ProcessPodcast::dispatch()->onQueue('emails');
一些应用程序可能不需要将作业推送到多个队列,而是倾向于只有一个简单的队列。然而,将作业推送到多个队列对于希望优先处理或分段处理作业的应用程序来说特别有用,因为 Laravel 队列工作程序允许你按优先级指定它应该处理的队列。例如,如果你将作业推送到一个 high 队列,你可以运行一个为其提供更高处理优先级的工作程序:
php artisan queue:work --queue=high,default
驱动程序注意事项和先决条件
数据库
为了使用 database 队列驱动程序,你需要一个数据库表来保存作业。通常,这包含在 Laravel 的默认 0001_01_01_000002_create_jobs_table.php 数据库迁移中;但是,如果你的应用程序不包含此迁移,你可以使用 make:queue-table Artisan 命令来创建它:
php artisan make:queue-table
php artisan migrate
Redis
为了使用 redis 队列驱动程序,你需要在 config/database.php 配置文件中配置一个 Redis 数据库连接。
serializer 和 compression Redis 选项不受 redis 队列驱动程序支持。
Redis Cluster
如果你的 Redis 队列连接使用 Redis Cluster,你的队列名称必须包含一个键哈希标签。这是为了确保给定队列的所有 Redis 键都放置在同一个哈希槽中:
'redis' => [
'driver' => 'redis',
'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
'queue' => env('REDIS_QUEUE', '{default}'),
'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
'block_for' => null,
'after_commit' => false,
],
阻塞
在使用 Redis 队列时,你可以使用 block_for 配置选项来指定驱动程序在遍历工作程序循环并重新轮询 Redis 数据库之前,应该等待作业可用的时间。
根据你的队列负载调整这个值比持续轮询 Redis 数据库以获取新作业更高效。例如,你可以将该值设置为 5,以表明驱动程序在等待作业可用时应该阻塞五秒钟:
'redis' => [
'driver' => 'redis',
'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
'queue' => env('REDIS_QUEUE', 'default'),
'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
'block_for' => 5,
'after_commit' => false,
],
将 block_for 设置为 0 将导致队列工作程序无限期地阻塞,直到有作业可用。这也将阻止在下一个作业被处理之前处理诸如 SIGTERM 之类的信号。
其他驱动程序先决条件
以下依赖项是所列队列驱动程序所必需的。这些依赖项可以通过 Composer 包管理器安装:
- Amazon SQS:
aws/aws-sdk-php ~3.0 - Beanstalkd:
pda/pheanstalk ~5.0 - Redis:
predis/predis ~2.0或 phpredis PHP 扩展 - MongoDB:
mongodb/laravel-mongodb
创建作业
生成作业类
默认情况下,你的应用程序的所有可排队作业都存储在 app/Jobs 目录中。如果 app/Jobs 目录不存在,当你运行 make:job Artisan 命令时,它将被创建:
php artisan make:job ProcessPodcast
生成的类将实现 Illuminate\Contracts\Queue\ShouldQueue 接口,向 Laravel 表明该作业应该被推送到队列以异步运行。
作业存根可以使用存根发布进行自定义。
类结构
作业类非常简单,通常只包含一个 handle 方法,该方法在队列处理作业时被调用。为了开始,让我们看看一个示例作业类。在这个例子中,我们假装管理一个播客发布服务,需要在上传的播客文件发布之前进行处理:
<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* Create a new job instance.
*/
public function __construct(
public Podcast $podcast,
) {}
/**
* Execute the job.
*/
public function handle(AudioProcessor $processor): void
{
// Process uploaded podcast...
}
}
在这个例子中,请注意我们能够将一个 Eloquent 模型直接传递到排队作业的构造函数中。由于作业使用的 Queueable 特性,当作业处理时,Eloquent 模型及其加载的关系将被优雅地序列化和反序列化。
如果你的排队作业在其构造函数中接受一个 Eloquent 模型,只有模型的标识符将被序列化到队列上。当作业实际被处理时,队列系统将自动从数据库中重新检索完整的模型实例及其加载的关系。这种模型序列化方法允许将更小的作业有效载荷发送到你的队列驱动程序。
handle 方法依赖注入
handle 方法在队列处理作业时被调用。请注意,我们可以在作业的 handle 方法上类型提示依赖项。Laravel 服务容器会自动注入这些依赖项。
如果你想完全控制容器如何将依赖项注入到 handle 方法中,你可以使用容器的 bindMethod 方法。bindMethod 方法接受一个回调,该回调接收作业和容器。在回调中,你可以按照自己的意愿调用 handle 方法。通常,你应该从你的 App\Providers\AppServiceProvider 服务提供者的 boot 方法中调用此方法:
use App\Jobs\ProcessPodcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Foundation\Application;
$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {
return $job->handle($app->make(AudioProcessor::class));
});
二进制数据,如原始图像内容,在传递给排队作业之前应通过 base64_encode 函数进行编码。否则,作业在被放置到队列上时可能无法正确地序列化为 JSON。
排队关系
因为所有加载的 Eloquent 模型关系在作业排队时也会被序列化,所以序列化的作业字符串有时会变得相当大。此外,当作业被反序列化并从数据库中重新检索模型关系时,它们将以其全部形式被检索。在模型在作业排队过程中被序列化之前应用的任何先前关系约束将不会在作业反序列化时应用。因此,如果你希望使用给定关系的子集,你应该在你的排队作业中重新约束该关系。
或者,为了防止关系被序列化,你可以在设置属性值时在模型上调用 withoutRelations 方法。此方法将返回一个不带其加载关系的模型实例:
/**
* Create a new job instance.
*/
public function __construct(
Podcast $podcast,
) {
$this->podcast = $podcast->withoutRelations();
}
如果你正在使用 PHP 构造函数属性提升并希望指示 Eloquent 模型不应序列化其关系,你可以使用 WithoutRelations 属性:
use Illuminate\Queue\Attributes\WithoutRelations;
/**
* Create a new job instance.
*/
public function __construct(
#[WithoutRelations]
public Podcast $podcast,
) {}
为了方便,如果你希望序列化所有没有关系的模型,你可以将 WithoutRelations 属性应用于整个类,而不是将属性应用于每个模型:
<?php
namespace App\Jobs;
use App\Models\DistributionPlatform;
use App\Models\Podcast;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Illuminate\Queue\Attributes\WithoutRelations;
#[WithoutRelations]
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* Create a new job instance.
*/
public function __construct(
public Podcast $podcast,
public DistributionPlatform $platform,
) {}
}
如果作业接收的是 Eloquent 模型的集合或数组而不是单个模型,则当作业被反序列化和执行时,该集合中的模型将不会恢复其关系。这是为了防止处理大量模型的作业出现过度的资源使用。
唯一作业
唯一作业需要一个支持锁的缓存驱动程序。目前,memcached、redis、dynamodb、database、file 和 array 缓存驱动程序支持原子锁。此外,唯一作业约束不适用于批次中的作业。
有时,你可能希望确保在任何给定时间点,队列中只有一个特定作业的实例。你可以通过在你的作业类上实现 ShouldBeUnique 接口来做到这一点。此接口不需要你在类上定义任何额外的方法:
<?php
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
// ...
}
在上面的例子中,UpdateSearchIndex 作业是唯一的。因此,如果该作业的另一个实例已经在队列中并且尚未完成处理,则该作业将不会被分派。
在某些情况下,你可能希望定义一个使作业唯一的特定“键”,或者你可能希望指定一个超时,超过该超时后作业不再保持唯一。为此,你可以在你的作业类上定义 uniqueId 和 uniqueFor 属性或方法:
<?php
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
/**
* The product instance.
*
* @var \App\Models\Product
*/
public $product;
/**
* The number of seconds after which the job's unique lock will be released.
*
* @var int
*/
public $uniqueFor = 3600;
/**
* Get the unique ID for the job.
*/
public function uniqueId(): string
{
return $this->product->id;
}
}
在上面的例子中,UpdateSearchIndex 作业按产品 ID 保持唯一。因此,任何具有相同产品 ID 的新作业分派都将被忽略,直到现有作业完成处理。此外,如果现有作业在一小时内未被处理,唯一锁将被释放,并且另一个具有相同唯一键的作业可以被分派到队列中。
如果你的应用程序从多个 web 服务器或容器分派作业,你应该确保所有服务器都与同一个中央缓存服务器通信,以便 Laravel 能够准确地确定作业是否唯一。
在处理开始前保持作业唯一
默认情况下,唯一作业在作业完成处理或所有重试尝试失败后才被“解锁”。然而,在某些情况下,你可能希望你的作业在它被处理之前立即解锁。为此,你的作业应该实现 ShouldBeUniqueUntilProcessing 契约而不是 ShouldBeUnique 契约:
<?php
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
// ...
}
唯一作业锁
在幕后,当一个 ShouldBeUnique 作业被分派时,Laravel 尝试获取一个带有 uniqueId 键的锁。如果该锁已被持有,则不分派该作业。当作业完成处理或所有重试尝试失败时,该锁将被释放。默认情况下,Laravel 将使用默认缓存驱动程序来获取此锁。但是,如果你希望使用另一个驱动程序来获取锁,你可以定义一个 uniqueVia 方法,该方法返回应使用的缓存驱动程序:
use Illuminate\Contracts\Cache\Repository;
use Illuminate\Support\Facades\Cache;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
// ...
/**
* Get the cache driver for the unique job lock.
*/
public function uniqueVia(): Repository
{
return Cache::driver('redis');
}
}
如果你只需要限制作业的并发处理,请改用 WithoutOverlapping 作业中间件。
加密作业
Laravel 允许你通过加密来确保作业数据的隐私和完整性。要开始,只需将 ShouldBeEncrypted 接口添加到作业类中。一旦此接口被添加到类中,Laravel 将在将其推送到队列之前自动加密你的作业:
<?php
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueue;
class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted
{
// ...
}
作业中间件
作业中间件允许你在排队作业的执行周围包装自定义逻辑,从而减少作业本身的样板代码。例如,考虑以下 handle 方法,它利用了 Laravel 的 Redis 速率限制功能,允许每五秒钟只处理一个作业:
use Illuminate\Support\Facades\Redis;
/**
* Execute the job.
*/
public function handle(): void
{
Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
info('Lock obtained...');
// Handle job...
}, function () {
// Could not obtain lock...
return $this->release(5);
});
}
虽然这段代码是有效的,但 handle 方法的实现变得嘈杂,因为它充满了 Redis 速率限制逻辑。此外,对于我们想要进行速率限制的任何其他作业,这种速率限制逻辑都必须重复。与其在 handle 方法中进行速率限制,我们可以定义一个处理速率限制的作业中间件:
<?php
namespace App\Jobs\Middleware;
use Closure;
use Illuminate\Support\Facades\Redis;
class RateLimited
{
/**
* Process the queued job.
*
* @param \Closure(object): void $next
*/
public function handle(object $job, Closure $next): void
{
Redis::throttle('key')
->block(0)->allow(1)->every(5)
->then(function () use ($job, $next) {
// Lock obtained...
$next($job);
}, function () use ($job) {
// Could not obtain lock...
$job->release(5);
});
}
}
正如你所看到的,像路由中间件一样,作业中间件接收正在处理的作业和一个回调,该回调应被调用以继续处理作业。
你可以使用 make:job-middleware Artisan 命令生成一个新的作业中间件类。创建作业中间件后,可以通过从作业的 middleware 方法返回它们来将它们附加到作业。make:job Artisan 命令脚手架的作业上不存在此方法,因此你需要手动将其添加到你的作业类中:
use App\Jobs\Middleware\RateLimited;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new RateLimited];
}
作业中间件也可以分配给可排队事件监听器、可邮寄类和通知。
速率限制
尽管我们刚刚演示了如何编写你自己的速率限制作业中间件,但 Laravel 实际上包含一个你可以利用的速率限制中间件来对作业进行速率限制。像路由速率限制器一样,作业速率限制器是使用 RateLimiter facade 的 for 方法定义的。
例如,你可能希望允许用户每小时备份一次他们的数据,同时不对高级客户施加此类限制。为此,你可以在你的 AppServiceProvider 的 boot 方法中定义一个 RateLimiter:
use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;
/**
* Bootstrap any application services.
*/
public function boot(): void
{
RateLimiter::for('backups', function (object $job) {
return $job->user->vipCustomer()
? Limit::none()
: Limit::perHour(1)->by($job->user->id);
});
}
在上面的例子中,我们定义了一个每小时的速率限制;但是,你可以使用 perMinute 方法轻松定义基于分钟的速率限制。此外,你可以将任何你希望的值传递给速率限制器的 by 方法;然而,这个值最常用于按客户分段速率限制:
return Limit::perMinute(50)->by($job->user->id);
一旦你定义了你的速率限制器,你就可以使用 Illuminate\Queue\Middleware\RateLimited 中间件将速率限制器附加到你的作业上。每次作业超过速率限制时,此中间件将根据速率限制持续时间将作业释放回队列,并带有适当的延迟:
use Illuminate\Queue\Middleware\RateLimited;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new RateLimited('backups')];
}
将速率受限的作业释放回队列仍然会增加作业的总尝试次数。你可能希望相应地调整你的作业类上的 tries 和 maxExceptions 属性。或者,你可能希望使用 retryUntil 方法来定义作业不应再被尝试的时间量。
使用 releaseAfter 方法,你还可以指定在再次尝试释放的作业之前必须经过的秒数:
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new RateLimited('backups'))->releaseAfter(60)];
}
如果你不希望作业在速率受限时被重试,你可以使用 dontRelease 方法:
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new RateLimited('backups'))->dontRelease()];
}
如果你正在使用 Redis,你可以使用 Illuminate\Queue\Middleware\RateLimitedWithRedis 中间件,它经过微调,更高效。
防止作业重叠
Laravel 包含一个 Illuminate\Queue\Middleware\WithoutOverlapping 中间件,它允许你根据任意键防止作业重叠。当一个排队作业正在修改一个应该一次只由一个作业修改的资源时,这会很有帮助。
例如,让我们想象你有一个排队作业,它更新用户的信用评分,并且你希望防止同一用户 ID 的信用评分更新作业重叠。为此,你可以从你的作业的 middleware 方法中返回 WithoutOverlapping 中间件:
use Illuminate\Queue\Middleware\WithoutOverlapping;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new WithoutOverlapping($this->user->id)];
}
将重叠的作业释放回队列仍然会增加作业的总尝试次数。你可能希望相应地调整你的作业类上的 tries 和 maxExceptions 属性。例如,将 tries 属性保持默认值 1 将阻止任何重叠的作业稍后被重试。
任何相同类型的重叠作业都将被释放回队列。你还可以指定在再次尝试释放的作业之前必须经过的秒数:
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
}
如果你希望立即删除任何重叠的作业,以便它们不会被重试,你可以使用 dontRelease 方法:
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->dontRelease()];
}
WithoutOverlapping 中间件由 Laravel 的原子锁功能提供支持。有时,你的作业可能会意外失败或超时,从而导致锁未被释放。因此,你可以使用 expireAfter 方法显式定义锁的过期时间。例如,下面的示例将指示 Laravel 在作业开始处理后三分钟释放 WithoutOverlapping 锁:
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];
}
WithoutOverlapping 中间件需要一个支持锁的缓存驱动程序。目前,memcached、redis、dynamodb、database、file 和 array 缓存驱动程序支持原子锁。
跨作业类共享锁键
默认情况下,WithoutOverlapping 中间件只会阻止相同类的重叠作业。因此,尽管两个不同的作业类可以使用相同的锁键,但它们不会被阻止重叠。但是,你可以使用 shared 方法指示 Laravel 在作业类之间应用该键:
use Illuminate\Queue\Middleware\WithoutOverlapping;
class ProviderIsDown
{
// ...
public function middleware(): array
{
return [
(new WithoutOverlapping("status:{$this->provider}"))->shared(),
];
}
}
class ProviderIsUp
{
// ...
public function middleware(): array
{
return [
(new WithoutOverlapping("status:{$this->provider}"))->shared(),
];
}
}
节流异常
Laravel 包含一个 Illuminate\Queue\Middleware\ThrottlesExceptions 中间件,它允许你节流异常。一旦作业抛出给定数量的异常,所有进一步尝试执行该作业都将被延迟,直到指定的时间间隔过去。这个中间件对于与不稳定的第三方服务交互的作业特别有用。
例如,让我们想象一个与开始抛出异常的第三方 API 交互的排队作业。要节流异常,你可以从你的作业的 middleware 方法中返回 ThrottlesExceptions 中间件。通常,这个中间件应该与一个实现基于时间的尝试的作业配对:
use DateTime;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new ThrottlesExceptions(10, 5 * 60)];
}
/**
* Determine the time at which the job should timeout.
*/
public function retryUntil(): DateTime
{
return now()->addMinutes(30);
}
中间件接受的第一个构造函数参数是作业在被节流之前可以抛出的异常数量,而第二个构造函数参数是作业在被节流后再次尝试之前应该经过的秒数。在上面的代码示例中,如果作业连续抛出 10 个异常,我们将等待 5 分钟,然后再尝试该作业,受 30 分钟的时间限制。
当作业抛出异常但尚未达到异常阈值时,该作业通常会立即重试。但是,你可以通过在将中间件附加到作业时调用 backoff 方法来指定此类作业应延迟的分钟数:
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 5 * 60))->backoff(5)];
}
在内部,此中间件使用 Laravel 的缓存系统来实现速率限制,并且作业的类名被用作缓存“键”。你可以在将中间件附加到作业时通过调用 by 方法来覆盖此键。如果你有多个作业与同一个第三方服务交互,并且你希望它们共享一个共同的节流“桶”,以确保它们遵守单个共享限制,这可能会很有用:
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10 * 60))->by('key')];
}
默认情况下,此中间件将节流每个异常。你可以通过在将中间件附加到作业时调用 when 方法来修改此行为。只有当提供给 when 方法的闭包返回 true 时,异常才会被节流:
use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10 * 60))->when(
fn (Throwable $throwable) => $throwable instanceof HttpClientException
)];
}
与 when 方法不同,deleteWhen 方法将作业释放回队列或抛出异常,而 deleteWhen 方法允许你在发生给定异常时完全删除作业:
use App\Exceptions\CustomerDeletedException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(2, 10 * 60))->deleteWhen(CustomerDeletedException::class)];
}
如果你希望将节流的异常报告给你的应用程序的异常处理程序,你可以通过在将中间件附加到作业时调用 report 方法来做到这一点。你可以选择向 report 方法提供一个闭包,只有当给定的闭包返回 true 时,异常才会被报告:
use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10 * 60))->report(
fn (Throwable $throwable) => $throwable instanceof HttpClientException
)];
}
如果你正在使用 Redis,你可以使用 Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis 中间件,它经过微调,比基本的异常节流中间件更高效。
跳过作业
Skip 中间件允许你指定一个作业应该被跳过/删除,而无需修改作业的逻辑。如果给定的条件评估为 true,Skip::when 方法将删除作业,而 Skip::unless 方法将在条件评估为 false 时删除作业:
use Illuminate\Queue\Middleware\Skip;
/**
* Get the middleware the job should pass through.
*/
public function middleware(): array
{
return [
Skip::when($condition),
];
}
你也可以将一个 Closure 传递给 when 和 unless 方法以进行更复杂的条件评估:
use Illuminate\Queue\Middleware\Skip;
/**
* Get the middleware the job should pass through.
*/
public function middleware(): array
{
return [
Skip::when(function (): bool {
return $this->shouldSkip();
}),
];
}
分派作业
一旦你编写了你的作业类,你就可以使用作业本身的 dispatch 方法来分派它。传递给 dispatch 方法的参数将被提供给作业的构造函数:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// ...
ProcessPodcast::dispatch($podcast);
return redirect('/podcasts');
}
}
如果你想有条件地分派一个作业,你可以使用 dispatchIf 和 dispatchUnless 方法:
ProcessPodcast::dispatchIf($accountActive, $podcast);
ProcessPodcast::dispatchUnless($accountSuspended, $podcast);
在新的 Laravel 应用程序中,database 驱动程序是默认的队列驱动程序。你可以在你的应用程序的 config/queue.php 配置文件中指定一个不同的队列驱动程序。
延迟分派
如果你想指定一个作业不应立即可供队列工作程序处理,你可以在分派作业时使用 delay 方法。例如,让我们指定一个作业在分派后 10 分钟内不可用:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// ...
ProcessPodcast::dispatch($podcast)
->delay(now()->addMinutes(10));
return redirect('/podcasts');
}
}
在某些情况下,作业可能配置了默认延迟。如果你需要绕过此延迟并分派一个作业以立即处理,你可以使用 withoutDelay 方法:
ProcessPodcast::dispatch($podcast)->withoutDelay();
Amazon SQS
Amazon SQS 队列服务的最大延迟时间为 15 分钟。
响应发送到浏览器后分派
或者,如果你的 web 服务器使用 FastCGI,dispatchAfterResponse 方法会延迟分派作业,直到 HTTP 响应发送到用户的浏览器之后。这仍然允许用户开始使用应用程序,即使排队的作业仍在执行中。这通常只用于需要大约一秒钟的作业,例如发送电子邮件。由于它们在当前的 HTTP 请求中被处理,以这种方式分派的作业不需要队列工作程序运行即可被处理:
use App\Jobs\SendNotification;
SendNotification::dispatchAfterResponse();
你也可以分派一个闭包,并将 afterResponse 方法链接到 dispatch 助手上,以在 HTTP 响应发送到浏览器后执行闭包:
use App\Mail\WelcomeMessage;
use Illuminate\Support\Facades\Mail;
dispatch(function () {
Mail::to('taylor@example.com')->send(new WelcomeMessage);
})->afterResponse();
同步分派
如果你想立即(同步)分派一个作业,你可以使用 dispatchSync 方法。使用此方法时,作业将不会被排队,并将在当前进程中立即执行:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// Create podcast...
ProcessPodcast::dispatchSync($podcast);
return redirect('/podcasts');
}
}
作业和数据库事务
虽然在数据库事务中分派作业是完全可以的,但你应该特别注意确保你的作业实际上能够成功执行。在事务中分派作业时,作业有可能在父事务提交之前就被工作程序处理。当这种情况发生时,你在数据库事务期间对模型或数据库记录所做的任何更新可能尚未反映在数据库中。此外,在事务中创建的任何模型或数据库记录可能不存在于数据库中。
幸运的是,Laravel 提供了几种解决此问题的方法。首先,你可以在你的队列连接的配置数组中设置 after_commit 连接选项:
'redis' => [
'driver' => 'redis',
// ...
'after_commit' => true,
],
当 after_commit 选项为 true 时,你可以在数据库事务中分派作业;但是,Laravel 会等到打开的父数据库事务已提交后,才会实际分派作业。当然,如果没有数据库事务当前打开,作业将立即分派。
如果由于事务期间发生的异常而回滚事务,则在该事务期间分派的作业将被丢弃。
将 after_commit 配置选项设置为 true 也会导致任何排队的事件监听器、可邮寄类、通知和广播事件在所有打开的数据库事务提交后分派。
内联指定提交分派行为
如果你没有将 after_commit 队列连接配置选项设置为 true,你仍然可以指示特定作业在所有打开的数据库事务提交后分派。为此,你可以将 afterCommit 方法链接到你的分派操作上:
use App\Jobs\ProcessPodcast;
ProcessPodcast::dispatch($podcast)->afterCommit();
同样,如果 after_commit 配置选项设置为 true,你可以指示特定作业应立即分派,而无需等待任何打开的数据库事务提交:
ProcessPodcast::dispatch($podcast)->beforeCommit();
作业链
作业链允许你指定一个排队作业列表,这些作业应在主作业成功执行后按顺序运行。如果序列中的一个作业失败,其余的作业将不会运行。要执行排队的作业链,你可以使用 Bus facade 提供的 chain 方法。Laravel 的命令总线是排队作业分派所基于的更低级组件:
use App\Jobs\OptimizePodcast;
use App\Jobs\ProcessPodcast;
use App\Jobs\ReleasePodcast;
use Illuminate\Support\Facades\Bus;
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->dispatch();
除了链接作业类实例之外,你还可以链接闭包:
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
function () {
Podcast::update(/* ... */);
},
])->dispatch();
在作业内部使用 $this->delete() 方法删除作业不会阻止链式作业被处理。链只有在链中的作业失败时才会停止执行。
链连接和队列
如果你想指定链式作业应使用的连接和队列,你可以使用 onConnection 和 onQueue 方法。这些方法指定应使用的队列连接和队列名称,除非排队的作业被明确分配了不同的连接/队列:
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->onConnection('redis')->onQueue('podcasts')->dispatch();
向链中添加作业
有时,你可能需要从链中的另一个作业内部将一个作业添加到现有作业链的前面或后面。你可以使用 prependToChain 和 appendToChain 方法来完成此操作:
/**
* Execute the job.
*/
public function handle(): void
{
// ...
// Prepend to the current chain, run job immediately after current job...
$this->prependToChain(new TranscribePodcast);
// Append to the current chain, run job at end of chain...
$this->appendToChain(new TranscribePodcast);
}
链式失败
当链接作业时,你可以使用 catch 方法来指定一个闭包,如果链中的作业失败,该闭包应该被调用。给定的回调将接收导致作业失败的 Throwable 实例:
use Illuminate\Support\Facades\Bus;
use Throwable;
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->catch(function (Throwable $e) {
// A job within the chain has failed...
})->dispatch();
由于链式回调在稍后由 Laravel 队列序列化和执行,因此你不应在链式回调中使用 $this 变量。
自定义队列和连接
分派到特定队列
通过将作业推送到不同的队列,你可以“分类”你的排队作业,甚至可以优先考虑你分配给各种队列的工作程序数量。请记住,这不会将作业推送到你的队列配置文件中定义的不同队列“连接”,而只会推送到单个连接中的特定队列。要指定队列,请在分派作业时使用 onQueue 方法:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// Create podcast...
ProcessPodcast::dispatch($podcast)->onQueue('processing');
return redirect('/podcasts');
}
}
或者,你可以在作业的构造函数内部调用 onQueue 方法来指定作业的队列:
<?php
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* Create a new job instance.
*/
public function __construct()
{
$this->onQueue('processing');
}
}
分派到特定连接
如果你的应用程序与多个队列连接交互,你可以使用 onConnection 方法指定将作业推送到哪个连接:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// Create podcast...
ProcessPodcast::dispatch($podcast)->onConnection('sqs');
return redirect('/podcasts');
}
}
你可以将 onConnection 和 onQueue 方法链接在一起,为作业指定连接和队列:
ProcessPodcast::dispatch($podcast)
->onConnection('sqs')
->onQueue('processing');
或者,你可以在作业的构造函数内部调用 onConnection 方法来指定作业的连接:
<?php
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* Create a new job instance.
*/
public function __construct()
{
$this->onConnection('sqs');
}
}
指定最大作业尝试次数 / 超时值
最大尝试次数
作业尝试是 Laravel 队列系统的核心概念,并支持许多高级功能。虽然它们乍一看可能令人困惑,但在修改默认配置之前了解它们的工作原理非常重要。
当作业被分派时,它被推送到队列上。然后,一个工作程序会拾取它并尝试执行它。这是一次作业尝试。
然而,一次尝试不一定意味着作业的 handle 方法被执行。尝试也可以通过以下几种方式“消耗”:
- 作业在执行期间遇到未处理的异常。
- 使用
$this->release()手动将作业释放回队列。 - 诸如
WithoutOverlapping或RateLimited之类的中间件未能获取锁并释放作业。 - 作业超时。
- 作业的
handle方法运行并完成而没有抛出异常。
你可能不希望无限期地尝试一个作业。因此,Laravel 提供了多种方法来指定作业可以被尝试的次数或持续时间。
默认情况下,Laravel 只会尝试一个作业一次。如果你的作业使用诸如 WithoutOverlapping 或 RateLimited 之类的中间件,或者如果你正在手动释放作业,你可能需要通过 tries 选项来增加允许的尝试次数。
指定作业可被尝试的最大次数的一种方法是通过 Artisan 命令行上的 --tries 开关。这将应用于工作程序处理的所有作业,除非正在处理的作业指定了它可被尝试的次数:
php artisan queue:work --tries=3
如果一个作业超过其最大尝试次数,它将被视为“失败”作业。有关处理失败作业的更多信息,请查阅失败作业文档。如果向 queue:work 命令提供了 --tries=0,则作业将无限期地重试。
你可以通过在作业类本身上定义作业可被尝试的最大次数来采用更细粒度的方法。如果在作业上指定了最大尝试次数,它将优先于命令行上提供的 --tries 值:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* The number of times the job may be attempted.
*
* @var int
*/
public $tries = 5;
}
如果你需要对特定作业的最大尝试次数进行动态控制,你可以在作业上定义一个 tries 方法:
/**
* Determine number of times the job may be attempted.
*/
public function tries(): int
{
return 5;
}
基于时间的尝试
作为定义作业在失败之前可以被尝试多少次的替代方法,你可以定义一个作业不应再被尝试的时间。这允许在给定的时间范围内尝试作业任意次数。要定义作业不应再被尝试的时间,请在你的作业类中添加一个 retryUntil 方法。此方法应返回一个 DateTime 实例:
use DateTime;
/**
* Determine the time at which the job should timeout.
*/
public function retryUntil(): DateTime
{
return now()->addMinutes(10);
}
如果同时定义了 retryUntil 和 tries,Laravel 会优先考虑 retryUntil 方法。
你还可以在你的排队事件监听器和排队通知上定义 tries 属性或 retryUntil 方法。
最大异常数
有时你可能希望指定一个作业可以被尝试多次,但如果重试是由给定数量的未处理异常触发的,则应失败(而不是直接由 release 方法释放)。为了实现这一点,你可以在你的作业类上定义一个 maxExceptions 属性:
<?php
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Illuminate\Support\Facades\Redis;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* The number of times the job may be attempted.
*
* @var int
*/
public $tries = 25;
/**
* The maximum number of unhandled exceptions to allow before failing.
*
* @var int
*/
public $maxExceptions = 3;
/**
* Execute the job.
*/
public function handle(): void
{
Redis::throttle('key')->allow(10)->every(60)->then(function () {
// Lock obtained, process the podcast...
}, function () {
// Unable to obtain lock...
return $this->release(10);
});
}
}
在这个例子中,如果应用程序无法获取 Redis 锁,作业将被释放十秒钟,并将继续重试多达 25 次。但是,如果作业抛出三个未处理的异常,则作业将失败。
超时
通常,你大致知道你的排队作业需要多长时间。因此,Laravel 允许你指定一个“超时”值。默认情况下,超时值为 60 秒。如果一个作业处理时间超过超时值指定的秒数,处理该作业的工作程序将以错误退出。通常,工作程序将由你在服务器上配置的进程管理器自动重新启动。
可以使用 Artisan 命令行上的 --timeout 开关来指定作业可以运行的最大秒数:
php artisan queue:work --timeout=30
如果作业因持续超时而超过其最大尝试次数,它将被标记为失败。
你也可以在作业类本身上定义作业应允许运行的最大秒数。如果在作业上指定了超时,它将优先于命令行上指定的任何超时:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* The number of seconds the job can run before timing out.
*
* @var int
*/
public $timeout = 120;
}
有时,IO 阻塞进程(如套接字或传出 HTTP 连接)可能不遵守你指定的超时。因此,在使用这些功能时,你应该始终尝试使用它们的 API 来指定超时。例如,在使用 Guzzle 时,你应该始终指定连接和请求超时值。
必须安装 PCNTL PHP 扩展才能指定作业超时。此外,作业的“超时”值应始终小于其**“重试后”**值。否则,作业可能会在实际执行完成或超时之前被重新尝试。
超时失败
如果你想指示作业在超时时应被标记为失败,你可以在作业类上定义 $failOnTimeout 属性:
/**
* Indicate if the job should be marked as failed on timeout.
*
* @var bool
*/
public $failOnTimeout = true;
默认情况下,当作业超时时,它会消耗一次尝试并被释放回队列(如果允许重试)。但是,如果你将作业配置为在超时时失败,则无论为 tries 设置的值如何,它都不会被重试。
错误处理
如果作业在处理时抛出异常,作业将自动释放回队列,以便可以再次尝试。作业将继续被释放,直到它被尝试了你的应用程序允许的最大次数。最大尝试次数由 queue:work Artisan 命令上使用的 --tries 开关定义。或者,最大尝试次数可以在作业类本身上定义。有关运行队列工作程序的更多信息,可以在下面找到。
手动释放作业
有时你可能希望手动将作业释放回队列,以便稍后再次尝试。你可以通过调用 release 方法来完成此操作:
/**
* Execute the job.
*/
public function handle(): void
{
// ...
$this->release();
}
默认情况下,release 方法会将作业释放回队列以立即处理。但是,你可以通过向 release 方法传递一个整数或日期实例来指示队列在给定秒数过去之前不使作业可用于处理:
$this->release(10);
$this->release(now()->addSeconds(10));
手动使作业失败
有时你可能需要手动将作业标记为“失败”。为此,你可以调用 fail 方法:
/**
* Execute the job.
*/
public function handle(): void
{
// ...
$this->fail();
}
如果你想因为你捕获的异常而将你的作业标记为失败,你可以将异常传递给 fail 方法。或者,为了方便,你可以传递一个字符串错误消息,该消息将为你转换为异常:
$this->fail($exception);
$this->fail('Something went wrong.');
有关失败作业的更多信息,请查看处理作业失败的文档。
在特定异常上使作业失败
FailOnException 作业中间件允许你在抛出特定异常时短路重试。这允许在瞬时异常(如外部 API 错误)上重试,但在持久异常(如用户的权限被撤销)上使作业永久失败:
<?php
namespace App\Jobs;
use App\Models\User;
use Illuminate\Auth\Access\AuthorizationException;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Illuminate\Queue\Middleware\FailOnException;
use Illuminate\Support\Facades\Http;
class SyncChatHistory implements ShouldQueue
{
use Queueable;
public $tries = 3;
/**
* Create a new job instance.
*/
public function __construct(
public User $user,
) {}
/**
* Execute the job.
*/
public function handle(): void
{
$user->authorize('sync-chat-history');
$response = Http::throw()->get(
"https://chat.laravel.test/?user={$user->uuid}"
);
// ...
}
/**
* Get the middleware the job should pass through.
*/
public function middleware(): array
{
return [
new FailOnException([AuthorizationException::class])
];
}
}
作业批处理
Laravel 的作业批处理功能允许你轻松执行一批作业,然后在该批作业完成执行后执行某些操作。在开始之前,你应该创建一个数据库迁移来构建一个表,该表将包含你的作业批次(例如它们的完成百分比)的元信息。此迁移可以使用 make:queue-batches-table Artisan 命令生成:
php artisan make:queue-batches-table
php artisan migrate
定义可批处理作业
要定义一个可批处理作业,你应该像往常一样创建一个可排队作业;但是,你应该将 Illuminate\Bus\Batchable 特性添加到作业类中。此特性提供了对 batch 方法的访问,该方法可用于检索作业正在执行的当前批次:
<?php
namespace App\Jobs;
use Illuminate\Bus\Batchable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ImportCsv implements ShouldQueue
{
use Batchable, Queueable;
/**
* Execute the job.
*/
public function handle(): void
{
if ($this->batch()->cancelled()) {
// Determine if the batch has been cancelled...
return;
}
// Import a portion of the CSV file...
}
}
分派批次
要分派一批作业,你应该使用 Bus facade 的 batch 方法。当然,当与完成回调结合使用时,批处理主要是有用的。因此,你可以使用 then、catch 和 finally 方法来为批次定义完成回调。当这些回调被调用时,每个回调都将接收一个 Illuminate\Bus\Batch 实例。在这个例子中,我们将想象我们正在排队一批作业,每个作业都处理 CSV 文件中的给定行数:
use App\Jobs\ImportCsv;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;
$batch = Bus::batch([
new ImportCsv(1, 100),
new ImportCsv(101, 200),
new ImportCsv(201, 300),
new ImportCsv(301, 400),
new ImportCsv(401, 500),
])->before(function (Batch $batch) {
// The batch has been created but no jobs have been added...
})->progress(function (Batch $batch) {
// A single job has completed successfully...
})->then(function (Batch $batch) {
// All jobs completed successfully...
})->catch(function (Batch $batch, Throwable $e) {
// First batch job failure detected...
})->finally(function (Batch $batch) {
// The batch has finished executing...
})->dispatch();
return $batch->id;
批次的 ID(可通过 $batch->id 属性访问)可用于查询 Laravel 命令总线以获取有关批次分派后的信息。
由于批处理回调由 Laravel 队列序列化并在稍后执行,因此你不应在回调中使用 $this 变量。此外,由于批处理作业被包装在数据库事务中,因此不应在作业中执行触发隐式提交的数据库语句。
命名批次
诸如 Laravel Horizon 和 Laravel Telescope 之类的某些工具如果对批次进行命名,可能会提供更友好的批次调试信息。要为批次分配一个任意名称,你可以在定义批次时调用 name 方法:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->name('Import CSV')->dispatch();
批次连接和队列
如果你想指定批处理作业应使用的连接和队列,你可以使用 onConnection 和 onQueue 方法。所有批处理作业必须在相同的连接和队列中执行:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->onConnection('redis')->onQueue('imports')->dispatch();
链和批次
你可以在批次中通过将链式作业放在一个数组中来定义一组链式作业。例如,我们可以并行执行两个作业链,并在两个作业链都完成处理后执行一个回调:
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
Bus::batch([
[
new ReleasePodcast(1),
new SendPodcastReleaseNotification(1),
],
[
new ReleasePodcast(2),
new SendPodcastReleaseNotification(2),
],
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->dispatch();
相反,你可以在链中通过在链中定义批次来运行作业批次。例如,你可以首先运行一批作业来发布多个播客,然后运行一批作业来发送发布通知:
use App\Jobs\FlushPodcastCache;
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Support\Facades\Bus;
Bus::chain([
new FlushPodcastCache,
Bus::batch([
new ReleasePodcast(1),
new ReleasePodcast(2),
]),
Bus::batch([
new SendPodcastReleaseNotification(1),
new SendPodcastReleaseNotification(2),
]),
])->dispatch();
向批次中添加作业
有时,从批处理作业内部向批次添加更多作业会很有用。当你需要批处理数千个作业时,这种模式会很有用,因为在 web 请求期间分派它们可能需要太长时间。因此,你可以选择分派一批初始的“加载程序”作业,这些作业会用更多的作业来填充批次:
$batch = Bus::batch([
new LoadImportBatch,
new LoadImportBatch,
new LoadImportBatch,
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->name('Import Contacts')->dispatch();
在这个例子中,我们将使用 LoadImportBatch 作业来用额外的作业填充批次。为了实现这一点,我们可以使用批处理实例上的 add 方法,该方法可以通过作业的 batch 方法访问:
use App\Jobs\ImportContacts;
use Illuminate\Support\Collection;
/**
* Execute the job.
*/
public function handle(): void
{
if ($this->batch()->cancelled()) {
return;
}
$this->batch()->add(Collection::times(1000, function () {
return new ImportContacts;
}));
}
你只能从属于同一批次的作业内部向批次添加作业。
检查批次
提供给批次完成回调的 Illuminate\Bus\Batch 实例具有各种属性和方法,可帮助你与给定批次的作业进行交互和检查:
// The UUID of the batch...
$batch->id;
// The name of the batch (if applicable)...
$batch->name;
// The number of jobs assigned to the batch...
$batch->totalJobs;
// The number of jobs that have not been processed by the queue...
$batch->pendingJobs;
// The number of jobs that have failed...
$batch->failedJobs;
// The number of jobs that have been processed thus far...
$batch->processedJobs();
// The completion percentage of the batch (0-100)...
$batch->progress();
// Indicates if the batch has finished executing...
$batch->finished();
// Cancel the execution of the batch...
$batch->cancel();
// Indicates if the batch has been cancelled...
$batch->cancelled();
从路由返回批次
所有 Illuminate\Bus\Batch 实例都是 JSON 可序列化的,这意味着你可以直接从应用程序的一个路由中返回它们,以检索包含批次信息(包括其完成进度)的 JSON 有效载荷。这使得在应用程序的 UI 中方便地显示有关批次完成进度的信息。
要按其 ID 检索批次,你可以使用 Bus facade 的 findBatch 方法:
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Route;
Route::get('/batch/{batchId}', function (string $batchId) {
return Bus::findBatch($batchId);
});
取消批次
有时你可能需要取消给定批次的执行。这可以通过调用 Illuminate\Bus\Batch 实例上的 cancel 方法来完成:
/**
* Execute the job.
*/
public function handle(): void
{
if ($this->user->exceedsImportLimit()) {
$this->batch()->cancel();
return;
}
if ($this->batch()->cancelled()) {
return;
}
}
正如你在前面的示例中可能注意到的,批处理作业通常应该在继续执行之前确定它们对应的批次是否已被取消。但是,为了方便,你可以将 SkipIfBatchCancelled 中间件分配给作业。正如其名称所示,如果作业对应的批次已被取消,此中间件将指示 Laravel 不处理该作业:
use Illuminate\Queue\Middleware\SkipIfBatchCancelled;
/**
* Get the middleware the job should pass through.
*/
public function middleware(): array
{
return [new SkipIfBatchCancelled];
}
批次失败
当批处理作业失败时,catch 回调(如果已分配)将被调用。此回调仅对批次中第一个失败的作业调用。
允许失败
当批次中的作业失败时,Laravel 将自动将批次标记为“已取消”。如果你愿意,你可以禁用此行为,以便作业失败不会自动将批次标记为已取消。这可以通过在分派批次时调用 allowFailures 方法来完成:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->allowFailures()->dispatch();
重试失败的批次作业
为了方便,Laravel 提供了一个 queue:retry-batch Artisan 命令,它允许你轻松重试给定批次的所有失败作业。此命令接受应重试其失败作业的批次的 UUID:
php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5
修剪批次
如果不进行修剪,job_batches 表会非常快地积累记录。为了缓解这种情况,你应该计划让 queue:prune-batches Artisan 命令每天运行:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches')->daily();
默认情况下,所有超过 24 小时的已完成批次都将被修剪。你可以在调用命令时使用 hours 选项来确定保留批次数据的时间。例如,以下命令将删除所有在 48 小时前完成的批次:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches --hours=48')->daily();
有时,你的 jobs_batches 表可能会积累从未成功完成的批次的批次记录,例如作业失败且该作业从未成功重试的批次。你可以指示 queue:prune-batches 命令使用 unfinished 选项修剪这些未完成的批次记录:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches --hours=48 --unfinished=72')->daily();
同样,你的 jobs_batches 表也可能积累已取消批次的批次记录。你可以指示 queue:prune-batches 命令使用 cancelled 选项修剪这些已取消的批次记录:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches --hours=48 --cancelled=72')->daily();
在 DynamoDB 中存储批次
Laravel 还支持将批次元信息存储在 DynamoDB 中而不是关系型数据库中。但是,你需要手动创建一个 DynamoDB 表来存储所有批次记录。
通常,此表应命名为 job_batches,但你应该根据你的应用程序的 queue 配置文件中 queue.batching.table 配置值来命名该表。
DynamoDB 批次表配置
job_batches 表应该有一个名为 application 的字符串主分区键和一个名为 id 的字符串主排序键。键的 application 部分将包含你的应用程序的名称,该名称由你的应用程序的 app 配置文件中的 name 配置值定义。由于应用程序名称是 DynamoDB 表键的一部分,因此你可以使用同一个表来存储多个 Laravel 应用程序的作业批次。
此外,如果你想利用自动批次修剪,你可以为你的表定义 ttl 属性。
DynamoDB 配置
接下来,安装 AWS SDK,以便你的 Laravel 应用程序可以与 Amazon DynamoDB 通信:
composer require aws/aws-sdk-php
接下来,将 queue.batching.driver 配置选项的值设置为 dynamodb。此外,你还应该在 batching 配置数组中定义 key、secret 和 region 配置选项。这些选项将用于向 AWS 进行身份验证。当使用 dynamodb 驱动程序时,queue.batching.database 配置选项是不必要的:
'batching' => [
'driver' => env('QUEUE_BATCHING_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'job_batches',
],
在 DynamoDB 中修剪批次
当使用 DynamoDB 存储作业批次信息时,用于修剪存储在关系型数据库中的批次的典型修剪命令将不起作用。相反,你可以利用 DynamoDB 的原生 TTL 功能 来自动删除旧批次的记录。
如果你使用 ttl 属性定义了你的 DynamoDB 表,你可以定义配置参数来指示 Laravel 如何修剪批次记录。queue.batching.ttl_attribute 配置值定义了保存 TTL 的属性的名称,而 queue.batching.ttl 配置值定义了批次记录可以从 DynamoDB 表中删除的秒数,相对于记录上次更新的时间:
'batching' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'job_batches',
'ttl_attribute' => 'ttl',
'ttl' => 60 * 60 * 24 * 7, // 7 days...
],
排队闭包
除了将作业类分派到队列之外,你还可以分派一个闭包。这对于需要在当前请求周期之外执行的快速、简单任务非常有用。当将闭包分派到队列时,闭包的代码内容会进行加密签名,以便在传输过程中无法修改:
use App\Models\Podcast;
$podcast = Podcast::find(1);
dispatch(function () use ($podcast) {
$podcast->publish();
});
要为排队的闭包分配一个名称,该名称可用于队列报告仪表板,并由 queue:work 命令显示,你可以使用 name 方法:
dispatch(function () {
// ...
})->name('Publish Podcast');
使用 catch 方法,你可以提供一个闭包,如果排队的闭包在耗尽所有队列的配置重试尝试后未能成功完成,该闭包应被执行:
use Throwable;
dispatch(function () use ($podcast) {
$podcast->publish();
})->catch(function (Throwable $e) {
// This job has failed...
});
catch 回调由 Laravel 队列序列化并在稍后执行,因此你不应在 catch 回调中使用 $this 变量。运行队列工作程序
queue:work 命令
Laravel 包含一个 Artisan 命令,它将启动一个队列工作程序并处理推送到队列上的新作业。你可以使用 queue:work Artisan 命令运行工作程序。请注意,一旦 queue:work 命令启动,它将继续运行,直到你手动停止或关闭终端:
php artisan queue:work
queue:work 进程永久在后台运行,你应该使用像 Supervisor 这样的进程监视器来确保队列工作程序不会停止运行。如果你希望在命令的输出中包含已处理的作业 ID、连接名称和队列名称,可以在调用 queue:work 命令时包含 -v 标志:
php artisan queue:work -v
请记住,队列工作程序是长期运行的进程,并将已启动的应用程序状态存储在内存中。因此,它们在启动后不会注意到你的代码库中的更改。因此,在你的部署过程中,请务必重新启动你的队列工作程序。此外,请记住,你的应用程序创建或修改的任何静态状态都不会在作业之间自动重置。
或者,你可以运行 queue:listen 命令。使用 queue:listen 命令时,当你想重新加载更新的代码或重置应用程序状态时,你不必手动重新启动工作程序;但是,此命令比 queue:work 命令效率低得多:
php artisan queue:listen
运行多个队列工作程序
要为队列分配多个工作程序并同时处理作业,你只需启动多个 queue:work 进程。这可以在本地通过终端中的多个选项卡或在生产环境中使用你的进程管理器的配置设置来完成。当使用 Supervisor 时,你可以使用 numprocs 配置值。
指定连接和队列
你还可以指定工作程序应使用哪个队列连接。传递给 work 命令的连接名称应与你的 config/queue.php 配置文件中定义的连接之一相对应:
php artisan queue:work redis
默认情况下,queue:work 命令只处理给定连接的默认队列的作业。但是,你可以通过仅处理给定连接的特定队列来进一步自定义你的队列工作程序。例如,如果你的所有电子邮件都在你的 redis 队列连接上的 emails 队列中处理,你可以发出以下命令来启动一个只处理该队列的工作程序:
php artisan queue:work redis --queue=emails
处理指定数量的作业
--once 选项可用于指示工作程序只处理队列中的单个作业:
php artisan queue:work --once
--max-jobs 选项可用于指示工作程序处理给定数量的作业然后退出。此选项与 Supervisor 结合使用时很有用,这样你的工作程序在处理给定数量的作业后会自动重新启动,释放它们可能积累的任何内存:
php artisan queue:work --max-jobs=1000
处理所有排队的作业然后退出
--stop-when-empty 选项可用于指示工作程序处理所有作业然后正常退出。如果你希望在队列为空后关闭容器,此选项在 Docker 容器中处理 Laravel 队列时会很有用:
php artisan queue:work --stop-when-empty
处理给定秒数的作业
--max-time 选项可用于指示工作程序处理作业给定秒数然后退出。此选项与 Supervisor 结合使用时很有用,这样你的工作程序在处理给定时间量的作业后会自动重新启动,释放它们可能积累的任何内存:
# Process jobs for one hour and then exit...
php artisan queue:work --max-time=3600
工作程序休眠持续时间
当队列上有作业可用时,工作程序将继续处理作业,作业之间没有延迟。但是,sleep 选项决定了如果没有可用作业,工作程序将“休眠”多少秒。当然,在休眠时,工作程序将不会处理任何新作业:
php artisan queue:work --sleep=3
维护模式和队列
当你的应用程序处于维护模式时,不会处理任何排队的作业。一旦应用程序退出维护模式,作业将继续正常处理。
要强制你的队列工作程序即使在维护模式下也处理作业,你可以使用 --force 选项:
php artisan queue:work --force
资源注意事项
守护进程队列工作程序在处理每个作业之前不会“重启”框架。因此,你应该在每个作业完成后释放任何大量资源。例如,如果你正在使用 GD 库进行图像处理,你应该在处理完图像后使用 imagedestroy 释放内存。
队列优先级
有时你可能希望优先处理你的队列。例如,在你的 config/queue.php 配置文件中,你可以将你的 redis 连接的默认 queue 设置为 low。但是,偶尔你可能希望将作业推送到 high 优先级队列,如下所示:
dispatch((new Job)->onQueue('high'));
要启动一个工作程序,该工作程序会验证所有 high 队列作业都已处理完毕,然后再继续处理 low 队列上的任何作业,请将逗号分隔的队列名称列表传递给 work 命令:
php artisan queue:work --queue=high,low
队列工作程序和部署
由于队列工作程序是长期运行的进程,如果不重新启动,它们将不会注意到你代码的更改。因此,部署使用队列工作程序的应用程序的最简单方法是在你的部署过程中重新启动工作程序。你可以通过发出 queue:restart 命令来正常重新启动所有工作程序:
php artisan queue:restart
此命令将指示所有队列工作程序在完成处理其当前作业后正常退出,以便不会丢失任何现有作业。由于在执行 queue:restart 命令时队列工作程序将退出,因此你应该运行一个像 Supervisor 这样的进程管理器来自动重新启动队列工作程序。
作业过期和超时
作业过期
在你的 config/queue.php 配置文件中,每个队列连接都定义了一个 retry_after 选项。此选项指定队列连接在重试正在处理的作业之前应等待多少秒。例如,如果 retry_after 的值设置为 90,则如果作业已处理 90 秒而未被释放或删除,它将被释放回队列。通常,你应该将 retry_after 值设置为你的作业合理完成处理所需的最大秒数。
retry_after 值的队列连接是 Amazon SQS。SQS 将根据在 AWS 控制台中管理的 Default Visibility Timeout 来重试作业。工作程序超时
queue:work Artisan 命令公开了一个 --timeout 选项。默认情况下,--timeout 值为 60 秒。如果一个作业处理时间超过超时值指定的秒数,处理该作业的工作程序将以错误退出。通常,工作程序将由你在服务器上配置的进程管理器自动重新启动:
php artisan queue:work --timeout=60
retry_after 配置选项和 --timeout CLI 选项是不同的,但它们协同工作以确保作业不会丢失,并且作业只成功处理一次。
--timeout 值应始终比你的 retry_after 配置值短至少几秒钟。这将确保在重试作业之前,处理冻结作业的工作程序总是被终止。如果你的 --timeout 选项长于你的 retry_after 配置值,你的作业可能会被处理两次。Supervisor 配置
在生产环境中,你需要一种方法来保持你的 queue:work 进程运行。queue:work 进程可能因各种原因停止运行,例如超出工作程序超时或执行 queue:restart 命令。
因此,你需要配置一个进程监视器,该监视器可以检测你的 queue:work 进程何时退出并自动重新启动它们。此外,进程监视器可以让你指定你希望同时运行多少个 queue:work 进程。Supervisor 是一种在 Linux 环境中常用的进程监视器,我们将在以下文档中讨论如何配置它。
安装 Supervisor
Supervisor 是 Linux 操作系统的进程监视器,如果你的 queue:work 进程失败,它将自动重新启动它们。要在 Ubuntu 上安装 Supervisor,你可以使用以下命令:
sudo apt-get install supervisor
配置 Supervisor
Supervisor 配置文件通常存储在 /etc/supervisor/conf.d 目录中。在此目录中,你可以创建任意数量的配置文件,这些文件会指导 supervisor 如何监视你的进程。例如,让我们创建一个 laravel-worker.conf 文件,该文件启动和监视 queue:work 进程:
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600
在此示例中,numprocs 指令将指示 Supervisor 运行八个 queue:work 进程并监视所有这些进程,如果它们失败,则自动重新启动它们。你应该更改配置的 command 指令以反映你所需的队列连接和工作程序选项。
stopwaitsecs 的值大于你的最长运行作业所消耗的秒数。否则,Supervisor 可能会在作业完成处理之前将其杀死。启动 Supervisor
创建配置文件后,你可以使用以下命令更新 Supervisor 配置并启动进程:
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start "laravel-worker:*"
有关 Supervisor 的更多信息,请查阅 Supervisor 文档。
处理失败的作业
有时你的排队作业会失败。别担心,事情并不总是按计划进行!Laravel 包含一种方便的方法来指定作业应被尝试的最大次数。在异步作业超过此尝试次数后,它将被插入到 failed_jobs 数据库表中。同步分派的作业失败后不会存储在此表中,其异常会立即由应用程序处理。
在新的 Laravel 应用程序中通常已经存在一个用于创建 failed_jobs 表的迁移。但是,如果你的应用程序不包含此表的迁移,你可以使用 make:queue-failed-table 命令来创建迁移:
php artisan make:queue-failed-table
php artisan migrate
当运行 queue worker 进程时,你可以使用 queue:work 命令上的 --tries 开关来指定作业应被尝试的最大次数。如果你没有为 --tries 选项指定值,则作业只会尝试一次或作业类的 $tries 属性指定的次数:
php artisan queue:work redis --tries=3
使用 --backoff 选项,你可以指定 Laravel 在重试遇到异常的作业之前应等待多少秒。默认情况下,作业会立即释放回队列,以便可以再次尝试:
php artisan queue:work redis --tries=3 --backoff=3
如果你想为每个作业配置 Laravel 在重试遇到异常的作业之前应等待多少秒,你可以通过在你的作业类上定义一个 backoff 属性来做到这一点:
/**
* The number of seconds to wait before retrying the job.
*
* @var int
*/
public $backoff = 3;
如果你需要更复杂的逻辑来确定作业的回退时间,你可以在你的作业类上定义一个 backoff 方法:
/**
* Calculate the number of seconds to wait before retrying the job.
*/
public function backoff(): int
{
return 3;
}
你可以通过从 backoff 方法返回一个回退值数组来轻松配置“指数”回退。在这个例子中,第一次重试的重试延迟将是 1 秒,第二次重试是 5 秒,第三次重试是 10 秒,如果还有更多尝试,则每次后续重试都是 10 秒:
/**
* Calculate the number of seconds to wait before retrying the job.
*
* @return array<int, int>
*/
public function backoff(): array
{
return [1, 5, 10];
}
清理失败的作业
当某个作业失败时,你可能希望向你的用户发送警报或恢复作业部分完成的任何操作。为此,你可以在你的作业类上定义一个 failed 方法。导致作业失败的 Throwable 实例将被传递给 failed 方法:
<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Throwable;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* Create a new job instance.
*/
public function __construct(
public Podcast $podcast,
) {}
/**
* Execute the job.
*/
public function handle(AudioProcessor $processor): void
{
// Process uploaded podcast...
}
/**
* Handle a job failure.
*/
public function failed(?Throwable $exception): void
{
// Send user notification of failure, etc...
}
}
failed 方法之前,会实例化一个新的作业实例;因此,在 handle 方法中可能发生的任何类属性修改都将丢失。失败的作业不一定是遇到未处理异常的作业。当一个作业耗尽了所有允许的尝试次数时,它也可能被认为是失败的。这些尝试可以通过以下几种方式消耗:
- 作业超时。
- 作业在执行期间遇到未处理的异常。
- 作业被手动或通过中间件释放回队列。
如果最后一次尝试由于在作业执行期间抛出的异常而失败,则该异常将传递给作业的 failed 方法。但是,如果作业因已达到允许的最大尝试次数而失败,则 $exception 将是 Illuminate\Queue\MaxAttemptsExceededException 的实例。同样,如果作业因超过配置的超时而失败,则 $exception 将是 Illuminate\Queue\TimeoutExceededException 的实例。
重试失败的作业
要查看已插入到你的 failed_jobs 数据库表中的所有失败作业,你可以使用 queue:failed Artisan 命令:
php artisan queue:failed
queue:failed 命令将列出作业 ID、连接、队列、失败时间和有关作业的其他信息。作业 ID 可用于重试失败的作业。例如,要重试 ID 为 ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 的失败作业,请发出以下命令:
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece
如有必要,你可以将多个 ID 传递给命令:
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d
你还可以重试特定队列的所有失败作业:
php artisan queue:retry --queue=name
要重试所有失败的作业,请执行 queue:retry 命令并将 all 作为 ID 传递:
php artisan queue:retry all
如果你想删除失败的作业,你可以使用 queue:forget 命令:
php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d
horizon:forget 命令来删除失败的作业,而不是 queue:forget 命令。要从 failed_jobs 表中删除所有失败的作业,你可以使用 queue:flush 命令:
php artisan queue:flush
queue:flush 命令从你的队列中删除所有失败的作业记录,无论失败的作业有多旧。你可以使用 --hours 选项来仅删除在特定小时前或更早失败的作业:
php artisan queue:flush --hours=48
忽略丢失的模型
当将 Eloquent 模型注入作业时,模型会在放置到队列之前自动序列化,并在作业处理时从数据库中重新检索。但是,如果模型在作业等待工作程序处理时被删除,你的作业可能会因 ModelNotFoundException 而失败。
为了方便,你可以选择通过将作业的 deleteWhenMissingModels 属性设置为 true 来自动删除缺少模型的作业。当此属性设置为 true 时,Laravel 将悄悄地丢弃作业,而不会引发异常:
/**
* Delete the job if its models no longer exist.
*
* @var bool
*/
public $deleteWhenMissingModels = true;
修剪失败的作业
你可以通过调用 queue:prune-failed Artisan 命令来修剪应用程序的 failed_jobs 表中的记录:
php artisan queue:prune-failed
默认情况下,所有超过 24 小时的失败作业记录都将被修剪。如果你为命令提供了 --hours 选项,则只会保留在过去 N 小时内插入的失败作业记录。例如,以下命令将删除所有在 48 小时前或更早插入的失败作业记录:
php artisan queue:prune-failed --hours=48
在 DynamoDB 中存储失败的作业
Laravel 还支持将你的失败作业记录存储在 DynamoDB 中而不是关系型数据库表中。但是,你必须手动创建一个 DynamoDB 表来存储所有失败的作业记录。通常,此表应命名为 failed_jobs,但你应该根据你的应用程序的 queue 配置文件中 queue.failed.table 配置值来命名该表。
failed_jobs 表应该有一个名为 application 的字符串主分区键和一个名为 uuid 的字符串主排序键。键的 application 部分将包含你的应用程序的名称,该名称由你的应用程序的 app 配置文件中的 name 配置值定义。由于应用程序名称是 DynamoDB 表键的一部分,因此你可以使用同一个表来存储多个 Laravel 应用程序的失败作业。
此外,请确保你安装了 AWS SDK,以便你的 Laravel 应用程序可以与 Amazon DynamoDB 通信:
composer require aws/aws-sdk-php
接下来,将 queue.failed.driver 配置选项的值设置为 dynamodb。此外,你还应该在失败的作业配置数组中定义 key、secret 和 region 配置选项。这些选项将用于向 AWS 进行身份验证。当使用 dynamodb 驱动程序时,queue.failed.database 配置选项是不必要的:
'failed' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'failed_jobs',
],
禁用失败作业存储
你可以通过将 queue.failed.driver 配置选项的值设置为 null 来指示 Laravel 丢弃失败的作业而不存储它们。通常,这可以通过 QUEUE_FAILED_DRIVER 环境变量来完成:
QUEUE_FAILED_DRIVER=null
失败作业事件
如果你想注册一个事件监听器,该监听器将在作业失败时被调用,你可以使用 Queue facade 的 failing 方法。例如,我们可以从 Laravel 随附的 AppServiceProvider 的 boot 方法中将一个闭包附加到此事件:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobFailed;
class AppServiceProvider extends ServiceProvider
{
/**
* Register any application services.
*/
public function register(): void
{
// ...
}
/**
* Bootstrap any application services.
*/
public function boot(): void
{
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}
}
从队列中清除作业
horizon:clear 命令来清除队列中的作业,而不是 queue:clear 命令。如果你想从默认连接的默认队列中删除所有作业,你可以使用 queue:clear Artisan 命令:
php artisan queue:clear
你还可以提供 connection 参数和 queue 选项,以从特定的连接和队列中删除作业:
php artisan queue:clear redis --queue=emails
从队列中清除作业仅适用于 SQS、Redis 和数据库队列驱动程序。此外,SQS 消息删除过程最多需要 60 秒,因此在你清除队列后 60 秒内发送到 SQS 队列的作业也可能会被删除。
监视你的队列
如果你的队列突然涌入大量作业,它可能会变得不堪重负,从而导致作业完成的等待时间变长。如果你愿意,当你的队列作业计数超过指定阈值时,Laravel 可以提醒你。
要开始,你应该计划让 queue:monitor 命令每分钟运行一次。该命令接受你希望监视的队列的名称以及你所需的作业计数阈值:
php artisan queue:monitor redis:default,redis:deployments --max=100
仅安排此命令不足以触发通知来提醒你队列已不堪重负。当命令遇到作业计数超过你的阈值的队列时,将分派一个 Illuminate\Queue\Events\QueueBusy 事件。你可以在应用程序的 AppServiceProvider 中监听此事件,以便向你或你的开发团队发送通知:
use App\Notifications\QueueHasLongWaitTime;
use Illuminate\Queue\Events\QueueBusy;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Notification;
/**
* Bootstrap any application services.
*/
public function boot(): void
{
Event::listen(function (QueueBusy $event) {
Notification::route('mail', 'dev@example.com')
->notify(new QueueHasLongWaitTime(
$event->connection,
$event->queue,
$event->size
));
});
}
测试
在测试分派作业的代码时,你可能希望指示 Laravel 不实际执行作业本身,因为作业的代码可以与分派它的代码分开直接测试。当然,要测试作业本身,你可以在测试中实例化一个作业实例并直接调用 handle 方法。
你可以使用 Queue facade 的 fake 方法来阻止排队的作业实际被推送到队列。调用 Queue facade 的 fake 方法后,你就可以断言应用程序尝试将作业推送到队列:
<?php
use App\Jobs\AnotherJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
test('orders can be shipped', function () {
Queue::fake();
// Perform order shipping...
// Assert that no jobs were pushed...
Queue::assertNothingPushed();
// Assert a job was pushed to a given queue...
Queue::assertPushedOn('queue-name', ShipOrder::class);
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
// Assert a job was not pushed...
Queue::assertNotPushed(AnotherJob::class);
// Assert that a closure was pushed to the queue...
Queue::assertClosurePushed();
// Assert that a closure was not pushed...
Queue::assertClosureNotPushed();
// Assert the total number of jobs that were pushed...
Queue::assertCount(3);
});
<?php
namespace Tests\Feature;
use App\Jobs\AnotherJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
use Tests\TestCase;
class ExampleTest extends TestCase
{
public function test_orders_can_be_shipped(): void
{
Queue::fake();
// Perform order shipping...
// Assert that no jobs were pushed...
Queue::assertNothingPushed();
// Assert a job was pushed to a given queue...
Queue::assertPushedOn('queue-name', ShipOrder::class);
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
// Assert a job was not pushed...
Queue::assertNotPushed(AnotherJob::class);
// Assert that a closure was pushed to the queue...
Queue::assertClosurePushed();
// Assert that a closure was not pushed...
Queue::assertClosureNotPushed();
// Assert the total number of jobs that were pushed...
Queue::assertCount(3);
}
}
你可以将一个闭包传递给 assertPushed、assertNotPushed、assertClosurePushed 或 assertClosureNotPushed 方法,以断言一个通过给定“真相测试”的作业被推送了。如果至少一个通过给定真相测试的作业被推送,则断言将成功:
use Illuminate\Queue\CallQueuedClosure;
Queue::assertPushed(function (ShipOrder $job) use ($order) {
return $job->order->id === $order->id;
});
Queue::assertClosurePushed(function (CallQueuedClosure $job) {
return $job->name === 'validate-order';
});
伪造一部分作业
如果你只需要伪造特定的作业,同时允许你的其他作业正常执行,你可以将应伪造的作业的类名传递给 fake 方法:
test('orders can be shipped', function () {
Queue::fake([
ShipOrder::class,
]);
// Perform order shipping...
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
});
public function test_orders_can_be_shipped(): void
{
Queue::fake([
ShipOrder::class,
]);
// Perform order shipping...
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
}
你可以使用 except 方法伪造所有作业,除了指定的一组作业:
Queue::fake()->except([
ShipOrder::class,
]);
测试作业链
要测试作业链,你需要利用 Bus facade 的伪造功能。Bus facade 的 assertChained 方法可用于断言一个作业链已被分派。assertChained 方法接受一个包含链式作业的数组作为其第一个参数:
use App\Jobs\RecordShipment;
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Support\Facades\Bus;
Bus::fake();
// ...
Bus::assertChained([
ShipOrder::class,
RecordShipment::class,
UpdateInventory::class
]);
正如你在上面的示例中看到的,链式作业的数组可以是作业类名称的数组。但是,你也可以提供实际作业实例的数组。这样做时,Laravel 将确保作业实例与应用程序分派的链式作业属于同一类并具有相同的属性值:
Bus::assertChained([
new ShipOrder,
new RecordShipment,
new UpdateInventory,
]);
你可以使用 assertDispatchedWithoutChain 方法来断言一个作业在没有作业链的情况下被推送:
Bus::assertDispatchedWithoutChain(ShipOrder::class);
测试链式修改
如果一个链式作业将作业添加到现有链的前面或后面,你可以使用作业的 assertHasChain 方法来断言该作业具有预期的剩余作业链:
$job = new ProcessPodcast;
$job->handle();
$job->assertHasChain([
new TranscribePodcast,
new OptimizePodcast,
new ReleasePodcast,
]);
assertDoesntHaveChain 方法可用于断言作业的剩余链为空:
$job->assertDoesntHaveChain();
测试链式批次
如果你的作业链包含一批作业,你可以通过在你的链断言中插入一个 Bus::chainedBatch 定义来断言链式批次符合你的期望:
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;
Bus::assertChained([
new ShipOrder,
Bus::chainedBatch(function (PendingBatch $batch) {
return $batch->jobs->count() === 3;
}),
new UpdateInventory,
]);
测试作业批次
Bus facade 的 assertBatched 方法可用于断言一批作业已被分派。提供给 assertBatched 方法的闭包接收一个 Illuminate\Bus\PendingBatch 实例,该实例可用于检查批次中的作业:
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;
Bus::fake();
// ...
Bus::assertBatched(function (PendingBatch $batch) {
return $batch->name == 'Import CSV' &&
$batch->jobs->count() === 10;
});
你可以使用 assertBatchCount 方法来断言分派了给定数量的批次:
Bus::assertBatchCount(3);
你可以使用 assertNothingBatched 来断言没有分派任何批次:
Bus::assertNothingBatched();
测试作业/批次交互
此外,你可能偶尔需要测试单个作业与其底层批次的交互。例如,你可能需要测试作业是否取消了其批次的进一步处理。为此,你需要通过 withFakeBatch 方法为作业分配一个伪造的批次。withFakeBatch 方法返回一个包含作业实例和伪造批次的元组:
[$job, $batch] = (new ShipOrder)->withFakeBatch();
$job->handle();
$this->assertTrue($batch->cancelled());
$this->assertEmpty($batch->added);
测试作业/队列交互
有时,你可能需要测试一个排队的作业是否将自身释放回队列。或者,你可能需要测试作业是否删除了自身。你可以通过实例化作业并调用 withFakeQueueInteractions 方法来测试这些队列交互。
一旦作业的队列交互被伪造,你就可以调用作业上的 handle 方法。在调用作业之后,可以使用各种断言方法来验证作业的队列交互:
use App\Exceptions\CorruptedAudioException;
use App\Jobs\ProcessPodcast;
$job = (new ProcessPodcast)->withFakeQueueInteractions();
$job->handle();
$job->assertReleased(delay: 30);
$job->assertDeleted();
$job->assertNotDeleted();
$job->assertFailed();
$job->assertFailedWith(CorruptedAudioException::class);
$job->assertNotFailed();
作业事件
使用 Queue facade 上的 before 和 after 方法,你可以指定在处理排队作业之前或之后执行的回调。这些回调是执行额外日志记录或为仪表板增加统计数据的好机会。通常,你应该从服务提供者的 boot 方法中调用这些方法。例如,我们可以使用 Laravel 随附的 AppServiceProvider:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
class AppServiceProvider extends ServiceProvider
{
/**
* Register any application services.
*/
public function register(): void
{
// ...
}
/**
* Bootstrap any application services.
*/
public function boot(): void
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
使用 Queue facade 上的 looping 方法,你可以指定在工作程序尝试从队列中获取作业之前执行的回调。例如,你可以注册一个闭包来回滚之前因失败作业而留下的任何未关闭的事务:
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Queue;
Queue::looping(function () {
while (DB::transactionLevel() > 0) {
DB::rollBack();
}
});