Очереди
- Введение
- Соединения и очереди
- Предварительная подготовка драйверов
- Создание заданий
- Генерация класса задания
- Структура класса задания
- Уникальные задания
- Шифрование заданий
- Посредник (middleware) задания
- Ограничение частоты
- Предотвращение дублирования задания
- Ограничение частоты генерации исключений
- Пропуск заданий
- Отправка заданий
- Отложенная отправка
- Синхронная отправка
- Задания и транзакции базы данных
- Цепочка заданий
- Настройка соединения и очереди
- Указание максимального количества попыток задания / значений тайм-аута
- Обработка ошибок
- Пакетная обработка заданий
- Определение пакета заданий
- Отправка пакета заданий
- Добавление заданий в пакет заданий
- Инспектирование пакета
- Отмена пакетов
- Отказы в пакете заданий
- Очистка пакетов
- Хранение пакетов в DynamoDB
- Анонимные очереди
- Запуск обработчика очереди
- Команда queue:work
- Приоритеты очереди
- Обработчики очереди и развертывание
- Истечение срока и тайм-ауты задания
- Конфигурация Supervisor
- Разбор неудачных заданий
- Очистка после неудачных заданий
- Повторная попытка выполнения неудачных заданий
- Игнорирование отсутствующих моделей
- Удаление неудачных заданий
- Хранение неудачных заданий в DynamoDB
- Отключение хранилища неудачных заданий
- События неудачных заданий
- Удаление заданий из очередей
- Мониторинг очередей
- Тестирование
- Подделка определённого списка заданий
- Тестирование цепочку заданий
- Тестирование пакетов заданий
- Тестирование взаимодействия заданий и очередей
- События заданий
Введение
При создании веб-приложения у вас могут быть некоторые задачи, такие как синтаксический анализ и сохранение загруженного файла CSV, выполнение которых во время обычного веб-запроса занимает слишком много времени. К счастью, Laravel позволяет легко создавать задания (jobs) в очереди (queue), которые могут обрабатываться в фоновом режиме. Перемещая трудоемкие задания в очередь и выполняя их в фоне, ваше приложение может быстрее обрабатывать веб-запросы и быстрее отвечать клиенту.
Очереди Laravel предоставляют унифицированный API для различных серверных служб очередей, таких как Amazon SQS, Redis или даже обычная реляционная база данных.
Параметры конфигурации очереди Laravel хранятся в файле конфигурации вашего приложения config/queue.php
. В этом файле вы найдете конфигурации подключения для каждого из драйверов очереди фреймворка: база данных, Amazon SQS, Redis и Beanstalkd, а также синхронный драйвер для немедленного выполнения задания (используется во время локальной разработки). Также имеется драйвер очереди null
, который просто выбрасывает задания из очереди, не исполняя их.
Laravel теперь предлагает Horizon, красивую панель мониторинга и систему настройки для очередей на базе Redis. Дополнительную информацию можно найти в полной документации Horizon.
Соединения и очереди
Прежде чем приступить к работе с очередями Laravel, важно понять различие между «соединениями» и «очередями». В конфигурационном файле config/queue.php
есть массив connections
. Этот параметр определяет подключения к серверным службам очередей, таким как Amazon SQS, Beanstalk или Redis. Однако любое указанное «соединение» очереди может иметь несколько «очередей», которые можно рассматривать как разные стеки или пачки поочередных заданий.
Обратите внимание, что каждый пример конфигурации соединения в файле конфигурации queue
содержит ключ queue
. Это очередь по умолчанию, в которую будут отправляться задания при их отправке в определенное соединение. Другими словами, если вы отправляете задание без явного определения очереди, в которую оно должно быть отправлено, задание будет поставлено в очередь, определённую в ключе queue
конфигурации соединения:
use App\Jobs\ProcessPodcast;
// Это задание отправляется в очередь `default` соединения по умолчанию...
ProcessPodcast::dispatch();
// Это задание отправляется в очередь `emails` соединения по умолчанию...
ProcessPodcast::dispatch()->onQueue('emails');
Некоторым приложениям может не понадобиться помещать задания в несколько очередей, вместо этого предпочитая иметь одну простую очередь. Однако отправка заданий в несколько очередей может быть особенно полезна для приложений, определяющих приоритеты или сегментацию процесса обработки заданий, поскольку обработчик очереди Laravel позволяет вам указать, какие очереди он должен обрабатывать по приоритету. Например, если вы помещаете задания в очередь high
, то вы можете запустить обработчик, который даст им более высокий приоритет обработки:
php artisan queue:work --queue=high,default
Предварительная подготовка драйверов
База данных
Чтобы использовать драйвер очереди database
, вам понадобится таблица базы данных для хранения заданий. Обычно это включено в стандартный файл Laravel 0001_01_01_000002_create_jobs_table.php
databasemigration; однако, если ваше приложение не содержит этой миграции, вы можете использовать Artisan-команду make:queue-table
для ее создания:
php artisan make:queue-table
php artisan migrate
Redis
Чтобы использовать драйвер очереди redis
, вы должны настроить соединение с базой данных Redis в файле конфигурации config/database.php
.
Возможности Redis
serializer
иcompression
не поддерживаются драйвером очередиredis
.
Кластер Redis
Если ваше соединение с очередью Redis использует кластер Redis, то имена ваших очередей должны содержать ключевой хеш-тег. Это необходимо для того, чтобы все ключи Redis для указанной очереди были поставлены в один и тот же хеш-слот:
'redis' => [
'driver' => 'redis',
'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
'queue' => env('REDIS_QUEUE', '{default}'),
'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
'block_for' => null,
'after_commit' => false,
],
Блокировка
При использовании очереди Redis вы можете использовать параметр конфигурации block_for
, чтобы указать, как долго драйвер должен ждать, пока задание станет доступным, прежде чем выполнить итерацию через рабочий цикл и повторно опросить базу данных Redis.
Настройка этого значения зависит от загрузки очереди и может быть более эффективной, чем постоянный опрос базы данных Redis на предмет новых заданий. Например, вы можете установить значение 5
, чтобы указать, что драйвер должен блокироваться на пять секунд, ожидая, пока задание станет доступным:
'redis' => [
'driver' => 'redis',
'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
'queue' => env('REDIS_QUEUE', 'default'),
'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
'block_for' => 5,
'after_commit' => false,
],
Установка для
block_for
значения0
заставит обработчиков очереди блокироваться на неопределенный срок, пока задание не станет доступным. Это также предотвратит обработку таких сигналов, какSIGTERM
, до тех пор, пока не будет обработано следующее задание.
Дополнительные зависимости драйверов
Для перечисленных драйверов очереди необходимы следующие зависимости. Эти зависимости могут быть установлены через менеджер пакетов Composer:
- Amazon SQS:
aws/aws-sdk-php ~3.0
- Beanstalkd:
pda/pheanstalk ~5.0
- Redis:
predis/predis ~2.0
или PHP-расширение phpredis - MongoDB:
mongodb/laravel-mongodb
Создание заданий
Генерация класса задания
Чтобы сгенерировать новое задание, используйте команду make:job
Artisan. Эта команда поместит новый класс задания в каталог app/Jobs
вашего приложения. Если этот каталог не существует в вашем приложении, то Laravel предварительно создаст его:
php artisan make:job ProcessPodcast
Сгенерированный класс будет реализовывать интерфейс Illuminate\Contracts\Queue\ShouldQueue
, указывая Laravel, что задание должно быть поставлено в очередь для асинхронного выполнения.
Заготовки (stub) заданий можно настроить с помощью публикации заготовок.
Структура класса задания
Классы заданий очень простые, обычно они содержат только метод handle
, который вызывается, когда задание обрабатывается очередью. Для начала рассмотрим пример класса задания. В этом примере мы представим, что управляем службой публикации подкастов и нам необходимо обработать загруженные файлы подкастов перед их публикацией:
<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* Создать новый экземпляр задания.
*/
public function __construct(
public Podcast $podcast,
) {}
/**
* Выполнить задание.
*/
public function handle(AudioProcessor $processor): void
{
// Обработка загруженного подкаста ...
}
}
Обратите внимание, что в этом примере мы смогли передать модель Eloquent непосредственно в конструктор задания. Благодаря трейту Queueable
, который использует задание, модели Eloquent и их загруженные отношения будут корректно сериализованы и десериализованы при обработке задания.
Если ваше задание в очереди принимает модель Eloquent в своем конструкторе, в очередь будет сериализован только идентификатор модели. Когда задание действительно обрабатывается, система очередей автоматически повторно извлекает полный экземпляр модели и его загруженные отношения из базы данных. Такой подход к сериализации модели позволяет отправлять в драйвер очереди гораздо меньший объем данных.
Внедрение зависимости метода handle
Метод handle
вызывается, когда задание обрабатывается очередью. Обратите внимание, что мы можем объявить тип зависимости в методе handle
задания. Контейнер служб Laravel автоматически внедряет эти зависимости.
Если вы хотите получить полный контроль над тем, как контейнер внедряет зависимости в метод handle
, вы можете использовать метод bindMethod
контейнера. Метод bindMethod
принимает функцию, которая получает задание и контейнер. В функции вы можете вызывать метод handle
. Обычно вы должны вызывать bindMethod
из метода boot
вашего сервис-провайдера App\Providers\AppServiceProvider
:
use App\Jobs\ProcessPodcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Foundation\Application;
$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {
return $job->handle($app->make(AudioProcessor::class));
});
Бинарные данные, например, необработанное содержимое изображения, должны быть переданы через функцию
base64_encode
перед передачей заданию. В противном случае задание может неправильно сериализоваться в JSON при отправке в очередь.
Очередь отношений
Поскольку все загруженные отношения модели Eloquent также сериализуются при постановке задания в очередь, сериализованная строка задания иногда может стать довольно объемной. Более того, когда задача десериализуется, и отношения модели повторно извлекаются из базы данных, они будут извлечены в полном объеме. Любые предыдущие ограничения отношений, которые были применены до того, как модель была сериализована в процессе постановки задания в очередь, не будут применены, когда задача будет десериализована. Поэтому, если вам необходимо работать с подмножеством определенного отношения, вам следует повторно наложить ограничение на это отношение внутри вашей задачи в очереди.
Или, чтобы предотвратить сериализацию отношений, вы можете вызвать метод модели withoutRelations
при установке значения свойства в задание. Этот метод вернет экземпляр модели без загруженных связей:
/**
* Создать новый экземпляр задания.
*/
public function __construct(
Podcast $podcast,
) {
$this->podcast = $podcast->withoutRelations();
}
Если вы используете свойства конструктора PHP и хотите указать, что модель Eloquent не должна сериализовать свои отношения, вы можете использовать атрибут withoutRelations
:
use Illuminate\Queue\Attributes\WithoutRelations;
/**
* Create a new job instance.
*/
public function __construct(
#[WithoutRelations]
public Podcast $podcast,
) {}
Если задание получает коллекцию или массив моделей Eloquent вместо одной модели, отношения между моделями в этой коллекции не будут восстановлены при десериализации и выполнении задания. Это необходимо для предотвращения чрезмерного использования ресурсов в заданиях, связанных с большим количеством моделей.
Уникальные задания
Для уникальных заданий требуется драйвер кеша, поддерживающий блокировки. В настоящее время драйверы кеширования
memcached
,redis
,dynamodb
,database
,file
, andarray
поддерживают атомарные блокировки. Кроме того, уникальность заданий не учитывается при пакетной обработке.
Иногда требуется убедиться, что только один экземпляр определенного задания находится в очереди в любой момент времени. Вы можете сделать это, реализовав интерфейс ShouldBeUnique
в своем классе задания. Этот интерфейс не требует от вас определения каких-либо дополнительных методов в вашем классе:
<?php
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
...
}
В приведенном выше примере задание UpdateSearchIndex
уникально. Таким образом, задание не будет отправлено, если другой экземпляр задания уже находится в очереди и еще не завершил обработку.
В некоторых случаях вам может потребоваться определить конкретный «ключ», делающий задание уникальным, или вы можете указать тайм-аут, по истечении которого задание больше не считается уникальным. Для этого вы можете определить свойства или методы uniqueId
и uniqueFor
в своем классе задания:
<?php
use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
/**
* Экземпляр продукта.
*
* @var \App\Product
*/
public $product;
/**
* Количество секунд, по истечении которых уникальная блокировка задания будет снята.
*
* @var int
*/
public $uniqueFor = 3600;
/**
* Получить уникальный идентификатор задания.
*/
public function uniqueId(): string
{
return $this->product->id;
}
}
В приведенном выше примере задание UpdateSearchIndex
уникально по идентификатору продукта. Таким образом, любые новые отправленные задания с тем же идентификатором продукта будут игнорироваться, пока существующее задание не завершит обработку. Кроме того, если существующее задание не будет обработано в течение одного часа, уникальная блокировка будет снята, и в очередь может быть отправлено другое задание с таким же уникальным ключом.
Если ваше приложение отправляет задания с нескольких веб-серверов или контейнеров, вам следует убедиться, что все ваши серверы взаимодействуют с одним и тем же сервером центрального кэша, чтобы Laravel мог точно определить, является ли задание уникальным.
Сохранение уникальности задания только до начала обработки
По умолчанию уникальные задания «разблокируются» после того, как задание завершит обработку или потерпит неудачу во всех повторных попытках. Однако, могут возникнуть ситуации, когда вы захотите, чтобы ваше задание было разблокировано непосредственно перед его обработкой. Для этого ваше задание должно реализовать контракт ShouldBeUniqueUntilProcessing
вместо контракта ShouldBeUnique
:
<?php
use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
// ...
}
Блокировки уникальных заданий
За кулисами, когда отправляется задание ShouldBeUnique
, Laravel пытается получить блокировку с ключом uniqueId
. Если блокировка не получена, задание не отправляется. Эта блокировка снимается, когда задание завершает обработку или терпит неудачу во всех повторных попытках. По умолчанию Laravel будет использовать драйвер кеша, назначенный по умолчанию, для получения этой блокировки. Однако, если вы хотите использовать другой драйвер для получения блокировки, вы можете определить метод uniqueVia
, возвращающий драйвер кеша, который следует использовать:
use Illuminate\Contracts\Cache\Repository;
use Illuminate\Support\Facades\Cache;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
...
/**
* Получить драйвер кеша для блокировки уникального задания.
*/
public function uniqueVia(): Repository
{
return Cache::driver('redis');
}
}
Если вам нужно ограничить только параллельную обработку задания, используйте вместо этого посредник
WithoutOverlapping
.
Шифрование заданий
Laravel позволяет вам обеспечить конфиденциальность и целостность данных задания с помощью шифрования. Для начала просто добавьте интерфейс ShouldBeEncrypted
в класс задания. Как только этот интерфейс будет добавлен в класс, Laravel автоматически зашифрует ваше задание, прежде чем поместить его в очередь:
<?php
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueue;
class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted
{
// ...
}
Посредник (middleware) задания
Посредник задания позволяет обернуть пользовательскую логику вокруг выполнения заданий в очереди, уменьшая шаблонность самих заданий. Например, рассмотрим следующий метод handle
, который использует функции ограничения частоты, позволяющие обрабатывать только одно задание каждые пять секунд:
use Illuminate\Support\Facades\Redis;
/**
* Выполнить задание.
*/
public function handle(): void
{
Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
info('Lock obtained...');
// Обработка задания...
}, function () {
// Не удалось получить блокировку...
return $this->release(5);
});
}
Хотя этот код действителен, реализация метода handle
становится «шумной», так как она загромождена логикой ограничения частоты Redis. Кроме того, эта логика ограничения частоты должна быть продублирована для любых других заданий, для которых мы хотим установить ограничение частоты.
Вместо ограничения частоты в методе handle
мы могли бы определить посредника задания, который обрабатывает ограничение частоты. В Laravel нет места по умолчанию для посредников заданий, поэтому вы можете разместить их в любом месте вашего приложения. В этом примере мы поместим его в каталог app/Jobs/Middleware
:
<?php
namespace App\Jobs\Middleware;
use Closure;
use Illuminate\Support\Facades\Redis;
class RateLimited
{
/**
* Обработать задание в очереди.
*
* @param \Closure(object): void $next
*/
public function handle(object $job, Closure $next): void
{
Redis::throttle('key')
->block(0)->allow(1)->every(5)
->then(function () use ($job, $next) {
// Блокировка получена...
$next($job);
}, function () use ($job) {
// Не удалось получить блокировку...
$job->release(5);
});
}
}
Как вы можете видеть, как и посредник маршрута, посредник задания получает обрабатываемое задание и функцию, которая должна быть вызвана для продолжения обработки задания.
После создания посредника задания он может быть назначен заданию, вернув их из метода middleware
задания. Этот метод не существует для заданий, созданных с помощью команды make:job
Artisan, поэтому вам нужно будет вручную добавить его в свой класс задания:
use App\Jobs\Middleware\RateLimited;
/**
* Получить посредника, через которого должно пройти задание.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new RateLimited];
}
Посредник заданий также может быть назначен слушателям событий, почтовым отправлениям или уведомлениям – если они выполняются через очередь.
Ограничение частоты
Хотя мы только что продемонстрировали, как написать собственного посредника, ограничивающего частоту, Laravel на самом деле включает посредника, который вы можете использовать для задания ограничения частоты. Как и ограничители частоты маршрута, ограничители частоты задания определяются с помощью метода for
фасада RateLimiter
.
Например, вы можете разрешить пользователям выполнять резервное копирование своих данных один раз в час, при этом не накладывая таких ограничений на премиум-клиентов. Для этого вы можете определить RateLimiter
в методе boot
вашего AppServiceProvider
:
use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;
/**
* Загрузка любых служб приложения.
*/
public function boot(): void
{
RateLimiter::for('backups', function (object $job) {
return $job->user->vipCustomer()
? Limit::none()
: Limit::perHour(1)->by($job->user->id);
});
}
В приведенном выше примере мы определили часовой лимит частоты; однако вы можете легко определить ограничение на основе минут, используя метод perMinute
. Кроме того, вы можете передать любое значение методу by
ограничения; однако это значение чаще всего используется для сегментации ограничений частоты по клиентам:
return Limit::perMinute(50)->by($job->user->id);
После того как вы определили ограничение частоты, вы можете назначить ограничитель частоты своему заданию резервного копирования с помощью посредника Illuminate\Queue\Middleware\RateLimited
. Каждый раз, когда задание превышает ограничение частоты, этот посредник отправляет задание обратно в очередь с соответствующей задержкой в зависимости от продолжительности ограничения частоты.
use Illuminate\Queue\Middleware\RateLimited;
/**
* Получить посредника, через которого должно пройти задание.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new RateLimited('backups')];
}
Возвращение задания с ограниченной частотой обратно в очередь все равно увеличит общее количество «попыток» (attempts
) задания. Возможно, вы захотите соответствующим образом настроить свойства tries
и maxExceptions
в своем классе задания. Или вы можете использовать метод retryUntil
, чтобы определить время, по истечению которого попыток выполнения задания больше не будет.
Если вы не хотите, чтобы задание возвращалось в очередь, если оно ограничено по частоте, вы можете использовать метод dontRelease
:
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new RateLimited('backups'))->dontRelease()];
}
Если вы используете Redis, то вы можете использовать посредника
Illuminate\Queue\Middleware\RateLimitedWithRedis
, который лучше настроен для Redis и более эффективен, чем базовый посредник с ограничением частоты.
Предотвращение дублирования задания
Laravel включает посредника Illuminate\Queue\Middleware\WithoutOverlapping
, который позволяет предотвращать перекрытия заданий на основе произвольного ключа. Это может быть полезно, когда задание в очереди изменяет ресурс, который должен изменяться только одним заданием за раз.
Например, представим, что у вас есть задание в очереди, которое обновляет кредитный рейтинг пользователя, и вы хотите предотвратить дублирование задания обновления кредитного рейтинга для одного и того же идентификатора пользователя. Для этого вы можете вернуть посредника WithoutOverlapping
из метода middleware
вашего задания:
use Illuminate\Queue\Middleware\WithoutOverlapping;
/**
* Получить посредника, через которого должно пройти задание.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new WithoutOverlapping($this->user->id)];
}
Любые перекрывающиеся задания одного и того же типа будут возвращены в очередь. Можно также указать время в секундах, которое должно пройти до повторной попытки возвращенного задания:
/**
* Получить посредника, через которого должно пройти задание.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
}
Если вы хотите немедленно удалить все перекрывающиеся задания, чтобы они не повторялись, вы можете использовать метод dоntRelease
:
/**
* Получить посредника, через которого должно пройти задание.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->dontRelease()];
}
Посредник WithoutOverlapping
работает благодаря функции атомарной блокировки Laravel. Но иногда ваше задание может неожиданно завершиться неудачей или таймаутом таким образом, что блокировка не будет освобождена. Поэтому вы можете явно определить время истечения блокировки с помощью метода expireAfter
. Например, в приведенном ниже примере Laravel даст указание освободить блокировку WithoutOverlapping
через три минуты после начала обработки задания:
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];
}
Для посредника
WithoutOverlapping
требуется драйвер кеша, который поддерживает блокировки. В настоящее время драйверы кешаmemcached
,redis
,dynamodb
,database
,file
, иarray
поддерживают атомарные блокировки.
Общий доступ к ключам блокировки для разных классов заданий
По умолчанию middleware WithoutOverlapping
предотвращает перекрытие только заданий одного и того же класса. Таким образом, хотя два разных класса могут использовать один и тот же ключ блокировки, их перекрытие не будет предотвращено. Однако вы можете поручить Laravel применять ключ ко всем классам заданий, используя метод shared
:
use Illuminate\Queue\Middleware\WithoutOverlapping;
class ProviderIsDown
{
// ...
public function middleware(): array
{
return [
(new WithoutOverlapping("status:{$this->provider}"))->shared(),
];
}
}
class ProviderIsUp
{
// ...
public function middleware(): array
{
return [
(new WithoutOverlapping("status:{$this->provider}"))->shared(),
];
}
}
Ограничение частоты генерации исключений
Laravel содержит посредника Illuminate\Queue\Middleware\ThrottlesExceptions
, который позволяет вам регулировать вызываемые исключения. Как только задание вызывает переданное количество исключений, все дальнейшие попытки выполнить задание откладываются до истечения заданного интервала времени. Этот посредник особенно полезен для заданий, которые взаимодействуют с нестабильно работающими сторонними службами.
Например, представим себе задание в очереди, взаимодействующее со сторонним API, который начинает выбрасывать исключения. Чтобы ограничить исключения, вы можете вернуть посредника ThrottlesExceptions
из метода middleware
вашего задания. Как правило, этот посредник должен быть связан с заданием, которое реализует попытки, основанные на времени:
use DateTime;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Получить посредника, через которого должно пройти задание.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new ThrottlesExceptions(10, 5 * 60)];
}
/**
* Задать временной предел попыток выполнить задания.
*/
public function retryUntil(): DateTime
{
return now()->addMinutes(30);
}
Первый аргумент конструктора посредника — это количество исключений, которые задание может выбросить перед ограничением. Второй аргумент конструктора — это количество секунд, которое должно пройти, прежде чем будет предпринято повторное выполнение задания после его ограничения. В приведенном выше примере кода, если задание выбросит 10 последовательных исключений, мы подождем 5 минут перед его повторной попыткой выполнения, ограниченную 30-минутным лимитом времени.
Когда задание вызывает исключение, но порог исключения еще не достигнут, то задание обычно немедленно повторяется. Однако вы можете указать количество минут, на которые такое задание должно быть отложено, вызвав метод backoff
при определении метода middleware
:
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Получить посредника, через которого должно пройти задание.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 5 * 60))->backoff(5)];
}
Внутренне этот посредник использует систему кеширования Laravel для реализации ограничений частоты, а имя класса задания используется в качестве «ключа» кеша. Вы можете переопределить этот ключ, вызвав метод by
при определении метода middleware
вашего задания. Это может быть полезно, если у вас есть несколько заданий, взаимодействующих с одной и той же сторонней службой, и вы хотите, чтобы у них была общая «корзина» ограничений:
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Получить посредника, через которого должно пройти задание.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10 * 60))->by('key')];
}
По умолчанию это промежуточное программное обеспечение будет регулировать каждое исключение. Вы можете изменить это поведение, вызвав метод when
при подключении посредника к вашему заданию. Исключение будет регулироваться только в том случае, если закрытие, предоставленное методу when
, вернет true
:
use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10 * 60))->when(
fn (Throwable $throwable) => $throwable instanceof HttpClientException
)];
}
Если вы хотите, чтобы регулируемые исключения сообщались обработчику исключений вашего приложения, вы можете сделать это, вызвав метод report при подключении посредника к вашему заданию. При желании вы можете предоставить замыкание для метода report
, и об исключении будет сообщено только в том случае, если данное замыкание возвращает true
:
use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10 * 60))->report(
fn (Throwable $throwable) => $throwable instanceof HttpClientException
)];
}
Если вы используете Redis в качестве драйвера кеша вашего приложения, то вы можете использовать класс
Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis
. Этот класс более эффективен при управлении ограничениями исключений с помощью Redis.
Пропуск заданий
Посредник Skip
позволяет вам указать, что задание должно быть пропущено/удалено без необходимости изменения логики задания. Метод Skip::when
удаляет задание, если данное условие оценивается как true
, а метод Skip::unless
удаляет задание, если условие оценивается как false
:
use Illuminate\Queue\Middleware\Skip;
/**
* Get the middleware the job should pass through.
*/
public function middleware(): array
{
return [
Skip::when($someCondition),
];
}
Вы также можете передать Closure
методам when
и unless
для более сложной условной оценки:
use Illuminate\Queue\Middleware\Skip;
/**
* Get the middleware the job should pass through.
*/
public function middleware(): array
{
return [
Skip::when(function (): bool {
return $this->shouldSkip();
}),
];
}
Отправка заданий
После того как вы написали свой класс задания, вы можете отправить его, используя метод dispatch
самого задания. Аргументы, переданные методу dispatch
, будут переданы конструктору задания:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Сохранить новый подкаст.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// ...
ProcessPodcast::dispatch($podcast);
return redirect('/podcasts');
}
}
Если требуется отправить задание по условию, то можно использовать методы dispatchIf
и dispatchUnless
:
ProcessPodcast::dispatchIf($accountActive, $podcast);
ProcessPodcast::dispatchUnless($accountSuspended, $podcast);
В новых приложениях Laravel драйвер sync
является драйвером очереди по-умолчанию. Этот драйвер выполняет задания синхронно во время запроса, что часто бывает удобно при локальной разработке. Если вы действительно хотите поставить задания в очередь для фоновой обработки, вы можете указать другой драйвер очереди в файле конфигурации вашего приложения config/queue.php
.
Отложенная отправка
Если вы хотите указать, что задание не должно быть немедленно доступно для обработчика очереди, вы можете использовать метод delay
при отправке задания. Например, давайте укажем, что задание не должно быть доступно для обработки в течение 10 минут после его отправки:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Сохранить новый подкаст.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// ...
ProcessPodcast::dispatch($podcast)
->delay(now()->addMinutes(10));
return redirect('/podcasts');
}
}
В некоторых случаях для заданий может быть настроена задержка по умолчанию. Если вам нужно обойти эту задержку и отправить задание на немедленную обработку, вы можете использовать метод withoutDelay
:
ProcessPodcast::dispatch($podcast)->withoutDelay();
У сервиса очередей Amazon SQS максимальное время задержки составляет 15 минут.
Отправка задания после отправки ответа в браузер
В качестве альтернативы, метод dispatchAfterResponse
задерживает отправку задания до тех пор, пока HTTP-ответ не будет отправлен в браузер пользователя, если ваш веб-сервер использует FastCGI. Это по прежнему позволит пользователю получить ответ от приложения, даже если задание в очереди все еще выполняется. Обычно это следует использовать только для заданий, которые занимают около секунды, например, для отправки электронного письма. Поскольку они обрабатываются в рамках текущего HTTP-запроса, отправляемые таким образом задания не требуют запуска обработчика очереди для их обработки:
use App\Jobs\SendNotification;
SendNotification::dispatchAfterResponse();
Вы также можете отправить замыкание и связать метод afterResponse
с помощником dispatch
, чтобы выполнить функцию после того, как HTTP-ответ был отправлен в браузер:
use App\Mail\WelcomeMessage;
use Illuminate\Support\Facades\Mail;
dispatch(function () {
Mail::to('taylor@example.com')->send(new WelcomeMessage);
})->afterResponse();
Синхронная отправка
Если вы хотите отправить задание немедленно (синхронно), то вы можете использовать метод dispatchSync
. При использовании этого метода задание не будет поставлено в очередь и будет выполнено немедленно в рамках текущего процессе:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Сохранить новый подкаст.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// Создание подкаста...
ProcessPodcast::dispatchSync($podcast);
return redirect('/podcasts');
}
}
Задания и транзакции базы данных
Хотя отправлять задания в рамках транзакций базы данных вполне нормально, вам следует уделить особое внимание тому, чтобы ваше задание действительно могло выполняться успешно. При отправке задания в то время, как открыта транзакция в базе данных, возможно, что задание будет обработано до фиксации родительской транзакции. Когда это происходит, любые обновления, внесенные вами в модели или записи базы данных во время транзакции базы данных, могут еще не быть отражены в базе данных. Кроме того, любые модели или записи базы данных, созданные в рамках транзакции, могут даже не существовать в базе данных.
К счастью, Laravel содержит несколько методов решения этой проблемы. Во-первых, вы можете задать параметр соединения after_commit
в массиве конфигурации соединения к очереди:
'redis' => [
'driver' => 'redis',
// ...
'after_commit' => true,
],
Когда параметр after_commit
имеет значение true
, вы можете отправлять задания в транзакциях базы данных; однако, Laravel будет ждать, пока все открытые родительские транзакции базы данных будут завершены, прежде чем фактически отправить задание. Если в настоящее время нет открытых транзакций, задание будет отправлено немедленно.
При откате транзакции из-за исключения, возникшего во время транзакции, отправленные во время этой транзакции задания будут отброшены.
Установка параметру конфигурации
after_commit
значенияtrue
также вызовет отправку всех поставленных в очередь слушателей событий, почтовых отправлений, уведомлений и широковещательных событий после того, как все открытые транзакции базы данных были зафиксированы.
Непосредственное указание поведения отправки при фиксации транзакций БД
Если вы не установите для параметра конфигурации соединения очереди after_commit
значение true
, то вы все равно можете указать, что конкретное задание должно быть отправлено после того, как все открытые транзакции базы данных будут завершены. Для этого вы можете связать метод afterCommit
с операцией отправки:
use App\Jobs\ProcessPodcast;
ProcessPodcast::dispatch($podcast)->afterCommit();
Аналогично, если для параметра конфигурации after_commit
установлено значение true
, вы можете указать, что конкретное задание должно быть отправлено немедленно, не дожидаясь завершения каких-либо открытых транзакций базы данных:
ProcessPodcast::dispatch($podcast)->beforeCommit();
Цепочка заданий
Цепочка заданий позволяет указать список заданий в очереди, которые должны выполняться последовательно после успешного выполнения основного задания. Если одно задание в последовательности завершается неуспешно, то остальные задания не выполняются. Чтобы выполнить цепочку заданий в очереди, вы можете использовать метод chain
, фасада Bus
. Командная шина Laravel – это компонент нижнего уровня, на котором построена диспетчеризация заданий в очереди:
use App\Jobs\OptimizePodcast;
use App\Jobs\ProcessPodcast;
use App\Jobs\ReleasePodcast;
use Illuminate\Support\Facades\Bus;
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->dispatch();
В дополнение к цепочке экземпляров класса задания вы также можете передавать функции:
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
function () {
Podcast::update(/* ... */);
},
])->dispatch();
Удаление заданий с помощью метода
$this->delete()
внутри задания не остановит обработку связанных заданий. Цепочка прекратит выполнение только в случае сбоя задания в цепочке.
Соединения и очередь цепочки заданий
Если вы хотите указать соединение и очередь, которые должны использоваться для связанных заданий, вы можете использовать методы onConnection
и onQueue
. Эти методы указывают соединение и имя очереди, которые следует использовать, если заданию явно не назначено другое соединение / очередь:
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->onConnection('redis')->onQueue('podcasts')->dispatch();
Добавление заданий в цепочку
Иногда вам может потребоваться добавить задание в существующую цепочку заданий из другого задания в этой цепочке. Вы можете сделать это, используя методы prependToChain
и appendToChain
:
/**
* Execute the job.
*/
public function handle(): void
{
// ...
// Prepend to the current chain, run job immediately after current job...
$this->prependToChain(new TranscribePodcast);
// Append to the current chain, run job at end of chain...
$this->appendToChain(new TranscribePodcast);
}
Отказы в цепочке заданий
При объединении заданий в цепочку вы можете использовать метод catch
, чтобы указать функцию, которая должна вызываться, если задание в цепочке завершается неуспешно. Данная функция получит экземпляр Throwable
, спровоцировавшего провал задания:
use Illuminate\Support\Facades\Bus;
use Throwable;
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->catch(function (Throwable $e) {
// Задание в цепочке не выполнено...
})->dispatch();
Поскольку функции-замыкания сериализуются и выполняются позже в очереди Laravel, вам не следует использовать $this в замыканиях.
Настройка соединения и очереди
Отправка в определенную очередь
Помещая задания в разные очереди, вы можете «классифицировать» свои задания в очереди и даже определять приоритеты, сколько обработчиков вы назначаете в разные очереди. Имейте в виду, что при этом задания не отправляются в разные «соединения» очередей, как определено в файле конфигурации очереди, а только в определенные очереди в рамках одного соединения. Чтобы указать очередь, используйте метод onQueue
при отправке задания:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Сохранить новый подкаст.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// Создание подкаста...
ProcessPodcast::dispatch($podcast)->onQueue('processing');
return redirect('/podcasts');
}
}
Кроме того, вы можете указать очередь задания, вызвав метод onQueue
в конструкторе задания:
<?php
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* Создать новый экземпляр задания.
*/
public function __construct()
{
$this->onQueue('processing');
}
}
Отправка в конкретное соединение
Если ваше приложение взаимодействует с несколькими соединениями очередей, то вы можете указать, на какое соединение отправить задание, используя метод onConnection
:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Сохранить новый подкаст.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// Создание подкаста...
ProcessPodcast::dispatch($podcast)->onConnection('sqs');
return redirect('/podcasts');
}
}
Вы можете связать методы onConnection
и onQueue
вместе, чтобы указать соединение и очередь для задания:
ProcessPodcast::dispatch($podcast)
->onConnection('sqs')
->onQueue('processing');
Кроме того, вы можете указать соединение задания, вызвав метод onConnection
в конструкторе задания:
<?php
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* Создать новый экземпляр задания.
*/
public function __construct()
{
$this->onConnection('sqs');
}
}
Указание максимального количества попыток задания / значений тайм-аута
Максимальное количество попыток
Если в одном из ваших заданий в очереди обнаруживается ошибка, то вы, вероятно, не хотите, чтобы оно продолжало повторять попытки бесконечно. Laravel предлагает различные способы указать, сколько раз и как долго задание может быть повторно выполняться.
Один из подходов к указанию максимального количества попыток выполнения задания – это использование переключателя --tries
в командной строке Artisan. Это будет применяться ко всем заданиям обработчика, если только в обрабатываемом задание не указано количество попыток его выполнения:
php artisan queue:work --tries=3
Если задание превышает максимальное количество попыток, то оно будет считаться «неудачным». Для получения дополнительной информации об обработке невыполненных заданий обратитесь к документации по разбору неудачных заданий. Если указано --tries=0
в команде queue:work
, задание будет повторяться бесконечно.
Вы можете применить более детальный подход, указав максимальное количество попыток выполнения задания для самого класса задания. Если для задания указано максимальное количество попыток, оно будет иметь приоритет над значением --tries
, указанным в командной строке:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* Количество попыток выполнения задания.
*
* @var int
*/
public $tries = 5;
}
Если вам необходимо динамически управлять максимальным числом попыток выполнения конкретного задания, вы можете определить метод tries
внутри задания:
/**
* Определите количество попыток выполнения задания.
*/
public function tries(): int
{
return 5;
}
Попытки, основанные на времени
В качестве альтернативы определению количества попыток выполнения задания до того, как оно завершится ошибкой, вы можете определить время, когда прекратить попытки выполнения задания. Это позволяет выполнять задание любое количество раз в течение заданного периода времени. Чтобы определить время, через которое больше не следует пытаться выполнить задание, добавьте метод retryUntil
в свой класс задания. Этот метод должен возвращать экземпляр DateTime
:
use DateTime;
/**
* Задать временной предел попыток выполнить задания.
*/
public function retryUntil(): DateTime
{
return now()->addMinutes(10);
}
Вы также можете определить свойство
$tries
или методretryUntil
в ваших слушателях событий.
Максимальное количество исключений
Иногда вы можете указать, что задание может быть выполнено много раз, но должно завершиться ошибкой, если повторные попытки инициированы заданным количеством необработанных исключений (в отличие от отправки напрямую методом release
). Для этого вы можете определить свойство maxExceptions
в своем классе задания:
<?php
namespace App\Jobs;
use Illuminate\Support\Facades\Redis;
class ProcessPodcast implements ShouldQueue
{
/**
* Количество попыток выполнения задания.
*
* @var int
*/
public $tries = 25;
/**
* Максимальное количество разрешенных необработанных исключений.
*
* @var int
*/
public $maxExceptions = 3;
/**
* Выполнить задание.
*/
public function handle(): void
{
Redis::throttle('key')->allow(10)->every(60)->then(function () {
// Блокировка получена, обрабатываем подкаст...
}, function () {
// Невозможно получить блокировку...
return $this->release(10);
});
}
}
В этом примере задание высвобождается на десять секунд, если приложение не может получить блокировку Redis, и будет продолжать повторяться до 25 раз. Однако задание завершится ошибкой, если оно вызовет три необработанных исключения.
Таймаут
Часто вы приблизительно знаете, сколько времени займет выполнение заданий в очереди. По этой причине Laravel позволяет вам указать значение «таймаута». По умолчанию значение таймаута составляет 60 секунд. Если задание обрабатывается дольше, чем количество секунд, указанное в значении тайм-аута, рабочий процесс, обрабатывающий задание, завершит работу с ошибкой. Обычно worker перезапускается автоматически менеджером процессов, настроенным на вашем сервере.
Максимальное количество секунд, в течение которых могут выполняться задания, можно указать с помощью переключателя --timeout
в командной строке Artisan:
php artisan queue:work --timeout=30
Если задание превышает максимальное количество попыток из-за постоянного тайм-аута, оно будет помечено как «неудачное».
Вы также можете определить таймаут в самом классе задания. В этом случае это значение будет иметь приоритет над любым таймаутом, указанным в командной строке:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* Количество секунд, в течение которых задание может выполняться до истечения тайм-аута.
*
* @var int
*/
public $timeout = 120;
}
Иногда процессы блокировки ввода-вывода, такие, как сокеты или исходящие HTTP-соединения, могут не учитывать указанный вами таймаут. Следовательно, при использовании этих функций вы всегда должны пытаться указать таймаут, используя их API. Например, при использовании Guzzle вы всегда должны указывать значение таймаута соединения и запроса.
Для указания тайм-аутов заданий необходимо установить PHP-расширение
pcntl
. Кроме того, значение тайм-аута в задании всегда должно быть меньше значения “retry after”. В противном случае задание может быть повторено до того, как оно фактически завершится или истечет время ожидания.
Неудача заданий по таймауту
Если вы хотите указать, что задание должно быть помечено как failed по истечении времени, вы можете определить свойство $failOnTimeout
для класса задания:
/**
* Indicate if the job should be marked as failed on timeout.
*
* @var bool
*/
public $failOnTimeout = true;
Обработка ошибок
Если во время обработки задания возникает исключение, задание автоматически возвращается в очередь (release), чтобы его можно было повторить. Задание будет продолжать возвращаться до тех пор, пока оно не будет выполнено максимальное количество раз, разрешенное вашим приложением. Максимальное количество попыток определяется переключателем --tries
, используемым в команде queue:work
Artisan. В качестве альтернативы максимальное количество попыток может быть определено в самом классе задания. Более подробную информацию о запуске обработчика очереди можно найти ниже.
Ручное освобождение задания
По желанию можно вручную вернуть задание в очередь, чтобы его можно было повторить позже. Вы можете сделать это, вызвав метод release
:
/**
* Выполнить задание.
*/
public function handle(): void
{
// ...
$this->release();
}
По умолчанию метод release
помещает задание обратно в очередь для немедленной обработки. Однако вы можете указать очереди не делать задание доступным для обработки до тех пор, пока не истечет заданное количество секунд, передав целое число или экземпляр даты в методе release
:
$this->release(10);
$this->release(now()->addSeconds(10));
Пометка задания неудачным
Иногда требуется вручную пометить задание как «неудачное». Для этого вы можете вызвать метод fail
:
/**
* Выполнить задание.
*/
public function handle(): void
{
// ...
$this->fail();
}
Если вы хотите пометить свою работу как неудавшуюся из-за обнаруженного исключения, то вы можете передать исключение методу fail
. Или, для удобства, вы можете передать строковое сообщение об ошибке, которое будет преобразовано для вас в исключение:
$this->fail($exception);
$this->fail('Something went wrong.');
Для получения дополнительной информации об обработке невыполненных заданий обратитесь к документации по разбору неудачных заданий.
Пакетная обработка заданий
Функционал пакетной обработки заданий Laravel позволяет вам легко выполнить пакет заданий, по завершению которого дополнительно совершить определенные действия. Перед тем, как начать, вы должны создать миграцию базы данных, чтобы построить таблицу, содержащую метаинформацию о ваших пакетах заданий, такую как процент их завершения. Эта миграция может быть сгенерирована с помощью команды make:queue-batches-table
Artisan:
php artisan make:queue-batches-table
php artisan migrate
Определение пакета заданий
Чтобы определить задание с возможностью пакетной передачи, вы, как обычно, должны создать задание в очереди; тем не менее, вы должны добавить к классу задания трейт Illuminate\Bus\Batchable
. Этот трейт обеспечивает доступ к методу batch
, использующийся для получения текущего пакета, в котором выполняется задание:
<?php
namespace App\Jobs;
use Illuminate\Bus\Batchable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ImportCsv implements ShouldQueue
{
use Batchable, Queueable;
/**
* Выполнить задание.
*/
public function handle(): void
{
if ($this->batch()->cancelled()) {
// Определяем, был ли пакет отменен...
return;
}
// Импортируем часть CSV-файла...
}
}
Отправка пакета заданий
Чтобы отправить пакет заданий, вы должны использовать метод batch
фасада Bus
. Основное преимущество обработки заданий одним пакетом – в том, что можно исполнить некий код по завершению этого пакета. Этот код добавляется в виде функций в аргументах методов then
, catch
и finally
. Каждая из этих функций получит при вызове экземпляр Illuminate\Bus\Batch
. В этом примере мы представим, что отправляем в очередь пакет заданий, каждое из которых обрабатывает указанное количество строк из файла CSV:
use App\Jobs\ImportCsv;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;
$batch = Bus::batch([
new ImportCsv(1, 100),
new ImportCsv(101, 200),
new ImportCsv(201, 300),
new ImportCsv(301, 400),
new ImportCsv(401, 500),
])->before(function (Batch $batch) {
// Пакет заданий создан, но не добавлено ни одно задание...
})->progress(function (Batch $batch) {
// Одна задача успешно завершена...
})->then(function (Batch $batch) {
// Все задания успешно завершены...
})->catch(function (Batch $batch, Throwable $e) {
// Обнаружено первое проваленное задание из пакета...
})->finally(function (Batch $batch) {
// Завершено выполнение пакета...
})->dispatch();
return $batch->id;
Идентификатор пакета, к которому можно получить доступ через свойство $batch->id
, можно использовать для запроса к командной шине Laravel для получения информации о пакете после того, как он был отправлен.
Поскольку пакетные обратные вызовы сериализуются и выполняются позднее в очереди Laravel, вы не должны использовать переменную
$this
в обратных вызовах. Кроме того, поскольку пакетные задания заключены в транзакции базы данных, операторы базы данных, вызывающие неявную фиксацию, не должны выполняться внутри заданий.
Именованные пакеты заданий
Некоторые инструменты, такие как Laravel Horizon и Laravel Telescope, могут предоставлять более удобную для пользователя отладочную информацию о пакет, если пакеты имеют имена. Чтобы присвоить пакету произвольное имя, вы можете вызвать метод name
при определении пакета:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// Все задания успешно завершены...
})->name('Import CSV')->dispatch();
Соединение и очередь пакета
Если вы хотите указать соединение и очередь, которые должны использоваться для пакетных заданий, то вы можете использовать методы onConnection
и onQueue
. Все пакетные задания должны выполняться в одном соединении и в одной очереди:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// Все задания успешно завершены...
})->onConnection('redis')->onQueue('imports')->dispatch();
Цепочки заданий (Chains) и Пакеты (Batches)
Вы можете определить набор связанных заданий в пакете, поместив связанные задания в массив. Например, мы можем выполнить две цепочки заданий параллельно и выполнить замыкание, когда обе цепочки заданий завершат обработку:
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
Bus::batch([
[
new ReleasePodcast(1),
new SendPodcastReleaseNotification(1),
],
[
new ReleasePodcast(2),
new SendPodcastReleaseNotification(2),
],
])->then(function (Batch $batch) {
// ...
})->dispatch();
И наоборот, вы можете запускать пакеты заданий внутри цепочки, определяя пакеты внутри цепочки. Например, вы можете сначала запустить пакет заданий для выпуска нескольких подкастов, а затем пакет заданий для отправки уведомлений о выпуске:
use App\Jobs\FlushPodcastCache;
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Support\Facades\Bus;
Bus::chain([
new FlushPodcastCache,
Bus::batch([
new ReleasePodcast(1),
new ReleasePodcast(2),
]),
Bus::batch([
new SendPodcastReleaseNotification(1),
new SendPodcastReleaseNotification(2),
]),
])->dispatch();
Добавление заданий в пакет заданий
Иногда может быть полезно добавить дополнительные задания в пакет, непосредственно из задания, уже находящегося в пакете. Этот шаблон может быть полезен, когда вам нужно выполнить пакетную обработку тысяч заданий, выполнение которых может занять слишком много времени во время веб-запроса, когда формируется пакет. Таким образом, вместо этого вы можете отправить начальный пакет заданий «загрузчику», которые дополнят пакет еще большим количеством заданий:
$batch = Bus::batch([
new LoadImportBatch,
new LoadImportBatch,
new LoadImportBatch,
])->then(function (Batch $batch) {
// Все задания успешно завершены...
})->name('Import Contacts')->dispatch();
В этом примере мы будем использовать задание LoadImportBatch
, чтобы дополнить пакет дополнительными заданиями. Для этого мы можем использовать метод add
экземпляра пакета, к которому можно получить доступ через метод batch
задания:
use App\Jobs\ImportContacts;
use Illuminate\Support\Collection;
/**
* Выполнить задание.
*/
public function handle(): void
{
if ($this->batch()->cancelled()) {
return;
}
$this->batch()->add(Collection::times(1000, function () {
return new ImportContacts;
}));
}
Вы можете добавлять задания в пакет только из задания, которое принадлежит к тому же пакету.
Инспектирование пакета
Экземпляр Illuminate\Bus\Batch
, который передается замыканиям по завершению пакета, имеет множество свойств и методов, помогающих взаимодействовать с данным пакетом заданий и его анализа:
// UUID пакета...
$batch->id;
// Название пакета (если применимо)...
$batch->name;
// Количество заданий, назначенных пакету...
$batch->totalJobs;
// Количество заданий, которые не были обработаны очередью...
$batch->pendingJobs;
// Количество неудачных заданий...
$batch->failedJobs;
// Количество заданий, обработанных на данный момент...
$batch->processedJobs();
// Процент завершения пакетной обработки (0-100)...
$batch->progress();
// Указывает, завершено ли выполнение пакета...
$batch->finished();
// Отменить выполнение пакета...
$batch->cancel();
// Указывает, был ли пакет отменен...
$batch->cancelled();
Возврат пакетов заданий из маршрутов
Все экземпляры Illuminate\Bus\Batch
являются сериализуемыми в формате JSON, что означает, что вы можете возвращать их непосредственно из одного из маршрутов вашего приложения, чтобы получить JSON, содержащий информацию о пакете, включая ход его завершения. Это позволяет удобно отображать информацию о ходе выполнения пакета в пользовательском интерфейсе вашего приложения.
Чтобы получить пакет по его идентификатору, вы можете использовать метод findBatch
фасада Bus
:
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Route;
Route::get('/batch/{batchId}', function (string $batchId) {
return Bus::findBatch($batchId);
});
Отмена пакетов
Иногда требуется отменить выполнение определенного пакета. Это можно сделать, вызвав метод cancel
экземпляра Illuminate\Bus\Batch
:
/**
* Выполнить задание.
*/
public function handle(): void
{
if ($this->user->exceedsImportLimit()) {
return $this->batch()->cancel();
}
if ($this->batch()->cancelled()) {
return;
}
}
Как вы, возможно, заметили в предыдущих примерах, пакетные задания обычно должны определить, был ли соответствующий пакет отменен, прежде чем продолжить выполнение. Однако для удобства вместо этого вы можете назначить заданию SkipIfBatchCancelled
middleware. Как следует из названия, это middleware будет инструктировать Laravel не обрабатывать задание, если соответствующий пакет был отменен:
use Illuminate\Queue\Middleware\SkipIfBatchCancelled;
/**
* Get the middleware the job should pass through.
*/
public function middleware(): array
{
return [new SkipIfBatchCancelled];
}
Отказы в пакете заданий
Если задание в пакете завершается неуспешно, то будет вызвано замыкание catch
(если назначено). Это замыкание вызывается только для первого проваленного задания в пакете.
Допущение отказов
Когда задание в пакете завершается неуспешно, Laravel автоматически помечает пакет как «отмененный». При желании вы можете отключить это поведение, чтобы при провале задания пакет не отмечался автоматически как отмененный. Это может быть выполнено путем вызова метода allowFailures
при отправке пакета:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// Все задания успешно завершены...
})->allowFailures()->dispatch();
Повторная попытка выполнения неудачных пакетных заданий
Для удобства Artisan содержит команду queue:retry-batch
, которая позволяет вам легко повторить все неудачные задания для указанного пакета. Команда queue:retry-batch
принимает UUID пакета, чьи неудачные задания следует повторить:
php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5
Очистка пакетов
Если не применять очистку, то таблица job_batches
может очень быстро накапливать записи. Чтобы избежать этого, вы должны запланировать ежедневный запуск команды queue:prune-batches
Artisan:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches')->daily();
По умолчанию все готовые пакеты, возраст которых превышает 24 часа, будут удалены. Вы можете использовать параметр hours
при вызове команды, чтобы определить, как долго хранить пакетные данные. Например, следующая команда удалит все пакеты, завершенные более 48 часов назад:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches --hours=48')->daily();
Иногда в таблице jobs_batches
могут накапливаться записи пакетов, которые так и не были успешно завершены, например, пакеты, в которых задание не удалось выполнить, и это задание так и не было успешно перезапущено. Вы можете поручить команде queue:prune-batches
очистить эти незавершенные пакетные записи, используя опцию unfinished
:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches --hours=48 --unfinished=72')->daily();
Аналогично, ваша таблица jobs_batches
может также накапливать записи об отмененных пакетах. Вы можете указать команде queue:prune-batches
удалить эти отмененные пакетные записи, используя флаг cancelled
:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches --hours=48 --cancelled=72')->daily();
Хранение пакетов в DynamoDB
Laravel также поддерживает хранение мета-информации о пакетах в DynamoDB, а не в реляционной базе данных. Однако вам придется вручную создать таблицу DynamoDB для хранения всех записей о пакетах.
Обычно эта таблица должна называться job_batches
, но вы можете назвать таблицу в зависимости от значения конфигурации queue.batching.table
в файле конфигурации очереди вашего приложения.
Конфигурация таблицы пакетов DynamoDB
Таблица job_batches
должна иметь строковый первичный ключ с именем application
и строковый первичный ключ с именем id
. Часть application
ключа будет содержать имя вашего приложения, как определено значением name
в файле конфигурации приложения app
. Поскольку имя приложения является частью ключа таблицы DynamoDB, вы можете использовать ту же таблицу для хранения пакетов задач для нескольких приложений Laravel.
Кроме того, вы можете определить атрибут ttl
для вашей таблицы, если хотите воспользоваться автоматической обрезкой пакетов.
Конфигурация DynamoDB
Затем установите AWS SDK, чтобы ваше Laravel-приложение могло взаимодействовать с Amazon DynamoDB:
composer require aws/aws-sdk-php
Затем установите значение параметра конфигурации queue.batching.driver
на dynamodb
. Кроме того, вам следует определить параметры конфигурации key
, secret
и region
в массиве конфигурации batching
. Эти параметры будут использоваться для аутентификации в AWS. При использовании драйвера dynamodb
параметр конфигурации queue.batching.database
не требуется:
'batching' => [
'driver' => env('QUEUE_BATCHING_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'job_batches',
],
Очистка пакетов в DynamoDB
При использовании DynamoDB для хранения информации о пакетах задач, типичные команды очистки для пакетов не будут работать. Вместо этого вы можете использовать встроенную функцию TTL в DynamoDB для автоматического удаления записей о старых пакетах.
Если вы определили таблицу DynamoDB с атрибутом ttl
, вы можете определить параметры конфигурации, чтобы указать Laravel, как удалять записи о пакетах. Значение параметра конфигурации queue.batching.ttl_attribute
определяет имя атрибута, содержащего TTL, а значение параметра конфигурации queue.batching.ttl
определяет количество секунд, через которое запись о пакете может быть удалена из таблицы DynamoDB, относительно последнего времени обновления записи:
'batching' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'job_batches',
'ttl_attribute' => 'ttl',
'ttl' => 60 * 60 * 24 * 7, // 7 days...
],
Анонимные очереди
Вместо отправки класса задания в очередь вы также можете отправить функцию. Это отлично подходит для быстрых и простых задач, которые необходимо выполнять вне текущего цикла запроса. При отправке функции в очередь содержимое кода функции криптографически подписывается, поэтому его нельзя изменить при передаче:
$podcast = App\Podcast::find(1);
dispatch(function () use ($podcast) {
$podcast->publish();
});
Используя метод catch
, вы можете определить функцию, которая должна быть выполнена, если анонимная очередь не завершится успешно после исчерпания всех сконфигурированных попыток повтора вашей очереди:
use Throwable;
dispatch(function () use ($podcast) {
$podcast->publish();
})->catch(function (Throwable $e) {
// Это задание завершилось неудачно...
});
Поскольку функции-замыкания в
catch
сериализуются и выполняются очередью Laravel позднее, вам не следует использовать$this
в обратных вызовахcatch
.
Запуск обработчика очереди
Команда queue:work
Laravel включает команду Artisan, которая запускает обработчика очереди и обрабатывает новые задания по мере их помещения в очередь. Вы можете запустить обработчик с помощью команды queue:work
Artisan. Обратите внимание, что после запуска команды queue:work
она будет продолжать работать, пока не будет остановлена вручную или пока вы не закроете терминал (консоль):
php artisan queue:work
Чтобы процесс
queue:work
работал постоянно в фоновом режиме, вам следует использовать диспетчер процессов, такой как Supervisor, чтобы гарантировать, что worker очереди не перестанет работать.
Вы можете включить флаг -v
при вызове команды queue:work
, если хотите, чтобы идентификаторы обработанных заданий были включены в output команды:
php artisan queue:work -v
Помните, что обработчики очереди – это долгоживущие процессы, которые хранят состояние загруженного приложения в памяти. В результате они не заметят изменений в вашей кодовой базе после их запуска. Итак, во время процесса развертывания обязательно перезапустите своих обработчиков очереди. Кроме того, помните, что любое статическое состояние, созданное или измененное вашим приложением, не будет автоматически пробрасываться между заданиями.
Как вариант, вы можете запустить команду queue:listen
. При использовании команды queue:listen
вам не нужно вручную перезапускать обработчик, если вы хотите перезагрузить обновленный код или сбросить состояние приложения; однако эта команда значительно менее эффективна, чем команда queue:work
:
php artisan queue:listen
Запуск нескольких обработчиков очереди
Чтобы назначить несколько обработчиков в очередь и обрабатывать задания одновременно, вы должны просто запустить несколько процессов queue:work
. Это можно сделать либо локально с помощью нескольких вкладок в вашем терминале, либо в эксплуатационном режиме, используя параметры конфигурации вашего диспетчера процессов. При использовании Supervisor вы можете использовать значение конфигурации numprocs
.
Указание соединения и очереди
Вы также можете указать, какое соединение очереди должен использовать обработчик. Имя соединения, переданное команде work
, должно соответствовать одному из соединений, определенных в конфигурационном файле config/queue.php
:
php artisan queue:work redis
По умолчанию команда queue:work
обрабатывает задания только для очереди по умолчанию на данном соединении. Однако, вы можете дополнительно указать, какие очереди необходимо обрабатывать для указанного соединения. Например, если все ваши электронные письма обрабатываются в очереди emails
соединения redis
, то вы можете использовать команду, чтобы запустить обработчик только для этой очереди:
php artisan queue:work redis --queue=emails
Обработка указанного количества заданий
Переключатель --once
обработчика используется для указания обработать только одно задание из очереди:
php artisan queue:work --once
Параметр --max-jobs
обработчика проинструктирует его обработать заданное количество заданий, а затем выйти. Этот параметр может быть полезен в сочетании с Supervisor, чтобы ваши рабочие процессы автоматически перезапускались после обработки заданного количества заданий, освобождая любую занятую ими память:
php artisan queue:work --max-jobs=1000
Обработка всех заданий в очереди с последующим выходом
Переключатель --stop-when-empty
обработчика может использоваться, чтобы дать ему указание обработать все задания и затем корректно завершить работу. Этот параметр может быть полезен при обработке очередей Laravel в контейнере Docker, если вы хотите выключить контейнер после того, как очередь пуста:
php artisan queue:work --stop-when-empty
Обработка заданий за заданное количество секунд
Параметр --max-time
обработчика может использоваться, чтобы дать ему указание обрабатывать задания в течение заданного количества секунд, а затем выйти. Этот параметр может быть полезен в сочетании с Supervisor, чтобы ваши рабочие процессы автоматически перезапускались после обработки заданий в течение заданного времени, освобождая любую занятую ими память:
# Process jobs for one hour and then exit...
php artisan queue:work --max-time=3600
Продолжительность задержки выполнения обработчика
Когда задания доступны в очереди, обработчик будет продолжать обрабатывать задания без задержки между ними. Однако опция sleep
определяет, сколько секунд обработчик будет «спать», если нет новых доступных заданий. Конечно, во время задержки выполнения обработчик не будет обрабатывать никаких новых заданий – задания будут обработаны после того, как обработчик снова проснется:
php artisan queue:work --sleep=3
Режим обслуживания и очереди
Пока ваше приложение находится в режиме обслуживания, задания, поставленные в очередь, не будут обрабатываться. После выхода приложения из режима обслуживания задания будут обрабатываться в обычном режиме.
Чтобы обрабатывать задания в очереди, даже если включён режим обслуживания, вы можете использовать опцию --force
:
php artisan queue:work --force
Соображения относительно ресурсов
Демоны обработчиков очередей не «перезагружают» фреймворк перед обработкой каждого задания. Следовательно, вы должны освобождать все тяжелые ресурсы после завершения каждого задания. Например, если вы выполняете манипуляции с изображениями с помощью библиотеки GD, вы должны освободить память с помощью imagedestroy
, когда вы закончите обработку изображения.
Приоритеты очереди
Иногда вы можете установить приоритетность обработки очередей. Например, в конфигурационном файле config/queue.php
для очереди по умолчанию вашего соединения redis
вы можете установить low
. По желанию можно поместить задание в очередь с «высоким» (high
) приоритетом, например:
dispatch((new Job)->onQueue('high'));
Чтобы запустить обработчика, который проверяет, что все задания очереди high
обработаны, прежде чем переходить к любым заданиям в очереди low
, передайте разделенный запятыми список имен очередей команде work
:
php artisan queue:work --queue=high,low
Обработчики очереди и развертывание
Поскольку обработчики очереди – это долгоживущие процессы, они не заметят изменений в вашем коде без перезапуска. Итак, самый простой способ развернуть приложение с использованием обработчиков очереди – это перезапустить обработчиков во время процесса развертывания. Вы можете корректно перезапустить всех обработчиков, используя команду queue:restart
:
php artisan queue:restart
Эта команда проинструктирует всех обработчиков очереди корректно выйти после завершения обработки своего текущего задания, чтобы существующие задания не были потеряны. Поскольку обработчики очереди выйдут при выполнении команды queue:restart
, вы должны запустить диспетчер процессов, такой как Supervisor, для автоматического перезапуска обработчиков очереди.
Очередь использует кеш для хранения сигналов перезапуска, поэтому перед использованием этой функции необходимо убедиться, что драйвер кеша правильно настроен для приложения.
Истечение срока и тайм-ауты задания
Истечение срока задания
В вашем файле конфигурации config/queue.php
каждое соединение с очередью определяет параметр retry_after
. Этот параметр указывает, сколько секунд соединение очереди должно ждать перед повторной попыткой выполнения задания, которое обрабатывается. Например, если значение retry_after
установлено на 90
, задание будет возвращено в очередь, если оно обрабатывалось в течение 90 секунд, но не было высвобождено или удалено. Как правило, вы должны установить значение retry_after
на максимальное количество секунд, которое может потребоваться вашим заданиям для завершения обработки.
Единственное соединение очереди, которое не содержит значения
retry_after
– это Amazon SQS. SQS будет повторять выполнение задания в соответствии с таймаутом видимости по умолчанию, управляемый консолью AWS.
Тайм-ауты обработчиков
Команда queue:work
Artisan также содержит параметр --timeout
. По умолчанию значение --timeout
составляет 60 секунд. Если задание обрабатывается дольше, чем количество секунд, указанное значением тайм-аута, Обработчик, выполняющий задание, завершится с ошибкой. Обычно обработчик перезапускается автоматически диспетчером, настроенным на вашем сервере:
php artisan queue:work --timeout=60
Параметр конфигурации retry_after
и параметр --timeout
Artisan отличаются, но работают вместе, чтобы гарантировать, что задания не будут потеряны и что задания будут успешно обработаны только один раз.
Значение
--timeout
всегда должно быть как минимум на несколько секунд короче, чем ваше значение конфигурацииretry_after
. Это гарантирует, что обрабатывающий замороженное задание обработчик, всегда завершает работу перед повторной попыткой выполнения задания. Если параметр--timeout
выше значения конфигурацииretry_after
, то ваши задания могут быть обработаны дважды.
Конфигурация Supervisor
В эксплуатационном окружении вам нужен способ поддерживать процессы queue:work
в рабочем состоянии. Процесс queue:work
может перестать работать по разным причинам, например, из-за превышения тайм-аута обработчика или выполнения команды queue:restart
.
По этой причине вам необходимо настроить диспетчер процессов, который может определять, когда ваши процессы queue:work
завершаются, и автоматически перезапускать их. Кроме того, диспетчеры процессов могут позволить вам указать, сколько процессов queue:work
вы хотите запускать одновременно. Supervisor – это диспетчер процессов, обычно используемый в средах Linux, и мы обсудим, как его настроить в следующей документации.
Установка Supervisor
Supervisor – это диспетчер процессов для операционной системы Linux, который автоматически перезапускает ваши процессы queue:work
в случае их сбоя. Чтобы установить Supervisor в Ubuntu, вы можете использовать следующую команду:
sudo apt-get install supervisor
Если настройка Supervisor и управление им самостоятельно кажется ошеломляющим, рассмотрите возможность использования Laravel Forge, который автоматически установит и настроит Supervisor для ваших проектов Laravel.
Настройка Supervisor
Файлы конфигурации Supervisor обычно хранятся в каталоге /etc/supervisor/conf.d
. В этом каталоге вы можете создать любое количество файлов конфигурации, которые сообщают Supervisor, как следует контролировать ваши процессы. Например, давайте создадим файл laravel-worker.conf
, который запускает и отслеживает процессы queue:work
:
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600
В этом примере директива numprocs
инструктирует Supervisor запустить восемь процессов queue:work
и отслеживать их все, автоматически перезапуская их в случае сбоя. Вы должны изменить директиву command
конфигурации, чтобы отразить желаемое соединение с очередью и параметры обработчика.
Вы должны убедиться, что значение
stopwaitsecs
больше, чем количество секунд, затраченных на выполнение вашего самого продолжительного задания. В противном случае Supervisor может убить задание до того, как оно завершит обработку.
Запуск Supervisor
После создания файла конфигурации вы можете обновить конфигурацию Supervisor и запустить процессы, используя следующие команды:
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start "laravel-worker:*"
Для получения дополнительной информации о Supervisor обратитесь к документации Supervisor.
Разбор неудачных заданий
Иногда ваши задания в очереди терпят неудачу. Не волнуйтесь, не всегда все идет по плану! Laravel включает удобный способ указать максимальное количество попыток выполнения задания. После того, как асинхронное задание превысит это количество попыток, оно будет вставлено в таблицу базы данных failed_jobs
. Синхронно отправленные задания, которые потерпели неудачу, не сохраняются в этой таблице, и их исключения немедленно обрабатываются приложением.
Миграция для создания таблицы failed_jobs
обычно уже присутствует в новых приложениях Laravel. Однако, если ваше приложение не содержит миграции для этой таблицы, вы можете использовать команду make:queue-failed-table
для создания миграции:
php artisan make:queue-failed-table
php artisan migrate
При запуске обработчика очереди вы можете указать максимальное количество попыток выполнения задания, используя переключатель --tries
команды queue:work
. Если вы не укажете значение для параметра --tries
, задания будут выполняться только один раз или столько раз, сколько указано в свойстве класса задания $tries
:
php artisan queue:work redis --tries=3
Используя параметр --backoff
, вы можете указать, сколько секунд Laravel должен ждать перед повторной попыткой выполнения задания, для которого возникло исключение. По умолчанию задание сразу же возвращается в очередь, чтобы его можно было повторить:
php artisan queue:work redis --tries=3 --backoff=3
Если вы хотите настроить, сколько секунд Laravel должен ждать перед повторной попыткой выполнения каждого из заданий, для которого возникло исключение, вы можете сделать это, определив свойство $backoff
в своем классе задания:
/**
* Количество секунд ожидания перед повторной попыткой выполнения задания.
*
* @var int
*/
public $backoff = 3;
Если вам требуется более сложная логика для определения времени отсрочки выполнения задания, вы можете определить метод backoff
для своего класса задания:
/**
* Рассчитать количество секунд ожидания перед повторной попыткой выполнения задания.
*/
public function backoff(): int
{
return 3;
}
Вы можете легко настроить «экспоненциальную» отсрочку, возвращая массив значений отсрочки из метода backoff
. В этом примере задержка повторной попытки выполнения будет составлять 1 секунду для первой попытки, 5 секунд для второй попытки и 10 секунд для третьей попытки, и 10 секунд для каждой последующей повторной попытки, если осталось еще попыток:
/**
* Рассчитать количество секунд ожидания перед повторной попыткой выполнения задания.
*
* @return array<int, int>
*/
public function backoff(): array
{
return [1, 5, 10];
}
Очистка после неудачных заданий
В случае сбоя определенного задания вы можете отправить предупреждение своим пользователям или отменить любые действия, которые были частично выполнены заданием. Для этого вы можете определить метод failed
в своем классе работы. Экземпляр Throwable
, который привел к сбою задания, будет передан методу failed
:
<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Throwable;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* Создать новый экземпляр задания.
*/
public function __construct(
public Podcast $podcast,
) {}
/**
* Выполнить задание.
*/
public function handle(AudioProcessor $processor): void
{
// Process uploaded podcast...
}
/**
* Обработать провал задания.
*/
public function failed(?Throwable $exception): void
{
// Отправляем пользователю уведомление об ошибке и т.д.
}
}
Перед вызовом метода
failed
создается новый экземпляр задания. Поэтому все изменения свойств класса, которые могли произойти в методеhandle
, будут потеряны.
Повторная попытка выполнения неудачных заданий
Чтобы просмотреть все неудачные задания, которые были вставлены в вашу таблицу базы данных failed_jobs
, вы можете использовать команду queue:failed
Artisan:
php artisan queue:failed
Команда queue:failed
перечислит идентификатор задания, соединение, очередь, время сбоя и другую информацию о задании. Идентификатор задания может быть использован для повторной попытки выполнить неудачное задание. Например, чтобы повторить неудачное задание с идентификатором ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece
, введите следующую команду:
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece
При необходимости вы можете передать команде несколько идентификаторов:
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d
Вы также можете повторить все неудачные задания для определенной очереди:
php artisan queue:retry --queue=name
Чтобы повторить все неудачные задания, выполните команду queue:retry
и передайте all
вместо идентификаторов:
php artisan queue:retry all
Если вы хотите удалить неудачные задание, вы можете использовать команду queue:forget
:
php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d
При использовании Horizon вы должны использовать команду
horizon:forget
для удаления неудачного задания вместо командыqueue:forget
.
Чтобы удалить все неудачные задания из таблицы failed_jobs
, вы можете использовать команду queue:flush
:
php artisan queue:flush
Игнорирование отсутствующих моделей
При внедрении модели Eloquent в задание, модель автоматически сериализуется перед помещением в очередь и повторно извлекается из базы данных при обработке задания. Однако, если модель была удалена в то время, когда задание ожидало обработки, ваше задание может завершиться ошибкой с ModelNotFoundException
.
Для удобства вы можете выбрать автоматическое удаление заданий с отсутствующими моделями, установив для свойства задания $deleteWhenMissingModels
значение true
. Когда для этого свойства установлено значение true
, Laravel отбрасывает задание, не вызывая исключения:
/**
* Удалить задание, если модели больше не существуют.
*
* @var bool
*/
public $deleteWhenMissingModels = true;
Удаление неудачных заданий
Вы можете удалить записи в таблице failed_jobs
вашего приложения, вызвав команду queue:prune-failed
Artisan:
php artisan queue:prune-failed
По умолчанию все записи о неудачных заданиях старше 24 часов будут удалены. Если в команде указать параметр --hours
, будут сохранены только те записи о неудачных заданиях, которые были вставлены в течение последних N часов. Например, следующая команда удалит все записи неудачных заданий, которые были вставлены более 48 часов назад:
php artisan queue:prune-failed --hours=48
Хранение неудачных заданий в DynamoDB
Laravel поддерживает хранение записей о неудачных заданиях в DynamoDB вместо таблицы реляционной базы данных. Перед этим вы должны вручную создать таблицу DynamoDB для хранения всех записей о неудачных заданиях. Обычно эта таблица называется failed_jobs
, но вы должны назвать ее в зависимости от значения параметра конфигурации queue.failed.table
в конфигурационном файле queue
вашего приложения.
Таблица failed_jobs
должна иметь строковый первичный partition key с именем application
и строковый первичный sort key с именем uuid
. Часть ключа application
будет содержать имя вашего приложения, определенное значением конфигурации name
в конфигурационном файле app
вашего приложения. Поскольку имя приложения является частью ключа таблицы DynamoDB, вы можете использовать одну и ту же таблицу для хранения неудачных заданий для нескольких приложений Laravel.
Кроме того, убедитесь, что вы установили AWS SDK, чтобы ваше приложение Laravel могло работать с Amazon DynamoDB:
composer require aws/aws-sdk-php
Затем установите значение параметра конфигурации queue.failed.driver
на dynamodb
. Кроме того, вы должны определить опции конфигурации key
, secret
и region
в массиве конфигурации неудачного задания. Эти параметры будут использоваться для аутентификации в AWS. При использовании драйвера dynamodb
опция конфигурации queue.failed.database
не нужна:
'failed' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'failed_jobs',
],
Отключение хранилища неудачных заданий
Вы можете указать Laravel отбрасывать невыполненные задания без их сохранения, установив параметр конфигурации queue.failed.driver
в значение null
. Как правило, это можно сделать с помощью переменной окружения QUEUE_FAILED_DRIVER
:
QUEUE_FAILED_DRIVER=null
События неудачных заданий
Если вы хотите зарегистрировать слушатель событий, который будет вызываться при сбое задания, вы можете использовать метод failing
фасада Queue
. Вызывать его можно например из метода boot
сервис-провайдера AppServiceProvider
:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobFailed;
class AppServiceProvider extends ServiceProvider
{
/**
* Регистрация любых служб приложения.
*/
public function register(): void
{
// ...
}
/**
* Загрузка любых служб приложения.
*/
public function boot(): void
{
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}
}
Удаление заданий из очередей
При использовании Horizon вы должны использовать команду
horizon:clear
для удаления заданий из очереди вместо командыqueue:clear
.
Если вы хотите удалить все задания, принадлежащие соединению и очереди по умолчанию, вы можете сделать это с помощью команды queue:clear
Artisan:
php artisan queue:clear
Вы также можете указать аргумент connection
и параметр queue
для удаления заданий из конкретного соединения / очереди:
php artisan queue:clear redis --queue=emails
Удаление заданий из очередей доступно только для драйверов очереди SQS, Redis и базы данных. Кроме того, процесс удаления в SQS занимает до 60 секунд, поэтому задания, отправленные в очередь SQS в течение 60 секунд после очистки очереди, также могут быть удалены.
Мониторинг очередей
Если ваша очередь получает внезапный приток заданий, она может стать перегруженной, что приведет к длительному ожиданию завершения заданий. При желании Laravel может предупредить вас, когда количество заданий в очереди превысит заданный порог.
Для этого добавьте в планировщик команду queue:monitor
на запуск раз в минуту. Команда принимает имена очередей, которые вы хотите контролировать, а также желаемый порог количества заданий:
php artisan queue:monitor redis:default,redis:deployments --max=100
Когда команда обнаруживает очередь, количество заданий в которой превышает указанный порог, будет отправлено событие Illuminate\Queue\Events\QueueBusy
. Вы можете прослушать это событие в AppServiceProvider
вашего приложения, чтобы отправить уведомление вам или вашим коллегам:
use App\Notifications\QueueHasLongWaitTime;
use Illuminate\Queue\Events\QueueBusy;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Notification;
/**
* Bootstrap any application services.
*/
public function boot(): void
{
Event::listen(function (QueueBusy $event) {
Notification::route('mail', 'dev@example.com')
->notify(new QueueHasLongWaitTime(
$event->connection,
$event->queue,
$event->size
));
});
}
Тестирование
При тестировании кода, отправляющего задание, вы можете указать Laravel не выполнять само задание, поскольку код задания можно тестировать напрямую и отдельно от остального кода, отправляющего его. Конечно, чтобы протестировать само задание, вы можете создать экземпляр задания и вызвать метод handle
непосредственно в тесте.
Вы можете использовать метод fake
фасада Queue
, чтобы предотвратить фактическую отправку заданий очередь. После вызова метода fake
фасада Queue
вы можете проверять в тестах, что приложение пыталось поместить задания в очередь:
<?php
use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
test('orders can be shipped', function () {
Queue::fake();
// Perform order shipping...
// Assert that no jobs were pushed...
Queue::assertNothingPushed();
// Assert a job was pushed to a given queue...
Queue::assertPushedOn('queue-name', ShipOrder::class);
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
// Assert a job was not pushed...
Queue::assertNotPushed(AnotherJob::class);
// Assert that a Closure was pushed to the queue...
Queue::assertClosurePushed();
// Assert the total number of jobs that were pushed...
Queue::assertCount(3);
});
<?php
namespace Tests\Feature;
use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
use Tests\TestCase;
class ExampleTest extends TestCase
{
public function test_orders_can_be_shipped(): void
{
Queue::fake();
// Perform order shipping...
// Assert that no jobs were pushed...
Queue::assertNothingPushed();
// Assert a job was pushed to a given queue...
Queue::assertPushedOn('queue-name', ShipOrder::class);
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
// Assert a job was not pushed...
Queue::assertNotPushed(AnotherJob::class);
// Assert that a Closure was pushed to the queue...
Queue::assertClosurePushed();
// Assert the total number of jobs that were pushed...
Queue::assertCount(3);
}
}
Вы можете передать функцию-замыкание методам assertPushed
или assertNotPushed
, чтобы подтвердить, что задание было отправлено и прошло заданный «тест на истинность». Если было отправлено хотя бы одно задание, которое проходит заданный тест, то утверждение будет успешным:
Queue::assertPushed(function (ShipOrder $job) use ($order) {
return $job->order->id === $order->id;
});
Подделка определённого списка заданий
Если вам нужно имитировать только определенные задания, позволяя другим заданиям выполняться нормально, вы можете передать имена классов заданий, которые следует имитировать, методу fake
:
test('orders can be shipped', function () {
Queue::fake([
ShipOrder::class,
]);
// Perform order shipping...
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
});
public function test_orders_can_be_shipped(): void
{
Queue::fake([
ShipOrder::class,
]);
// Perform order shipping...
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
}
Вы можете подделать все задания, кроме набора указанных, используя метод except
:
Queue::fake()->except([
ShipOrder::class,
]);
Тестирование цепочку заданий
Чтобы протестировать цепочки заданий, вам нужно будет использовать возможности фасада Bus
. Метод assertChained
фасада Bus
может использоваться для подтверждения того, что цепочка заданий была отправлена. Метод assertChained
принимает массив связанных заданий в качестве первого аргумента:
use App\Jobs\RecordShipment;
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Support\Facades\Bus;
Bus::fake();
// ...
Bus::assertChained([
ShipOrder::class,
RecordShipment::class,
UpdateInventory::class
]);
Как вы можете видеть в приведенном выше примере, массив цепочки заданий может быть массивом имен классов заданий. Однако вы также можете предоставить массив реальных экземпляров заданий. При этом Laravel гарантирует, что экземпляры заданий относятся к одному и тому же классу и имеют одинаковые значения свойств, что и связанные задания, отправленные вашим приложением:
Bus::assertChained([
new ShipOrder,
new RecordShipment,
new UpdateInventory,
]);
Вы можете использовать метод assertDispatchedWithoutChain
, чтобы подтвердить, что задание было отправлено без цепочки заданий:
Bus::assertDispatchedWithoutChain(ShipOrder::class);
Модификации цепочки тестирования
Если связанное задание добавляет или добавляет задания в существующую цепочку, вы можете использовать метод задания assertHasChain
, чтобы подтвердить, что задание имеет ожидаемую цепочку оставшихся заданий:
$job = new ProcessPodcast;
$job->handle();
$job->assertHasChain([
new TranscribePodcast,
new OptimizePodcast,
new ReleasePodcast,
]);
Метод assertDoesntHaveChain
может использоваться для подтверждения того, что оставшаяся цепочка задания пуста:
$job->assertDoesntHaveChain();
Тестирование цепочки пакетов
Если ваша цепочка заданий содержит пакет заданий, вы можете утверждать, что связанный пакет соответствует вашим ожиданиям, вставив определение Bus::chainedBatch
в assert цепочки:
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;
Bus::assertChained([
new ShipOrder,
Bus::chainedBatch(function (PendingBatch $batch) {
return $batch->jobs->count() === 3;
}),
new UpdateInventory,
]);
Тестирование пакетов заданий
Метод assertBatched
фасада Bus
может использоваться для подтверждения того, что пакет заданий был отправлен. Замыкание, данное методу assertBatched
, получает экземпляр Illuminate\Bus\PendingBatch
, который можно использовать для проверки заданий в пакете:
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;
Bus::fake();
// ...
Bus::assertBatched(function (PendingBatch $batch) {
return $batch->name == 'import-csv' &&
$batch->jobs->count() === 10;
});
Вы можете использовать метод assertBatchCount
, чтобы подтвердить, что было отправлено заданное количество пакетов:
Bus::assertBatchCount(3);
Вы можете использовать assertNothingBatched
, чтобы подтвердить, что никакие пакеты не были отправлены:
Bus::assertNothingBatched();
Тестирования заданий / Взаимодействие пакетов
Кроме того, иногда вам может потребоваться протестировать взаимодействие отдельного задания с его базовым пакетом. Например, вам может потребоваться проверить, не отменило ли задание дальнейшую обработку своего пакета. Для этого вам необходимо назначить заданию поддельный пакет с помощью метода withFakeBatch
. Метод withFakeBatch
возвращает массив, содержащий экземпляр задания и поддельный пакет:
[$job, $batch] = (new ShipOrder)->withFakeBatch();
$job->handle();
$this->assertTrue($batch->cancelled());
$this->assertEmpty($batch->added);
Тестирование взаимодействия заданий и очередей
Иногда вам может потребоваться проверить, что задание в очереди освобождается обратно в очередь. Или вам может потребоваться проверить, что задание удалилось само собой. Вы можете протестировать это взаимодействие с очередью, создав экземпляр задания и вызвав метод withFakeQueueInteractions
.
Как только взаимодействие задания с очередью будет сфальсифицировано, вы можете вызвать метод handle
для задания. После вызова задания методы assertReleased
, assertDeleted
, assertNotDeleted
, assertFailed
, assertFailedWith
и assertNotFailed
могут использоваться для создания утверждений относительно взаимодействия задания с очередью:
use App\Exceptions\CorruptedAudioException;
use App\Jobs\ProcessPodcast;
$job = (new ProcessPodcast)->withFakeQueueInteractions();
$job->handle();
$job->assertReleased(delay: 30);
$job->assertDeleted();
$job->assertNotDeleted();
$job->assertFailed();
$job->assertFailedWith(CorruptedAudioException::class);
$job->assertNotFailed();
События заданий
Используя методы before
и after
фасада Queue
, вы можете указать функции, которые будут выполняться до или после обработки задания в очереди. Эти функции – прекрасная возможность для дополнительной регистрации или увеличения счётчиков для панели мониторинга. Как правило, вызов этих методов осуществляется в методе boot
сервис-провайдера. Например, мы можем использовать AppServiceProvider
, который включен в Laravel:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
class AppServiceProvider extends ServiceProvider
{
/**
* Регистрация любых служб приложения.
*/
public function register(): void
{
// ...
}
/**
* Загрузка любых служб приложения.
*/
public function boot(): void
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}
}
Используя метод looping
фасада Queue
, вы можете указать замыкания, которые выполняются до того, как обработчик попытается получить задание из очереди. Например, вы можете зарегистрировать замыкание для отката любых транзакций, оставшихся открытыми из-за ранее неудачного задания:
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Queue;
Queue::looping(function () {
while (DB::transactionLevel() > 0) {
DB::rollBack();
}
});