PQueue is a minimalist PHP library for processing background messages using a single persistent CLI (managed via systemd) or periodic execution (via cron), covering 90% of use cases without external dependencies or complex worker management.
PQueue is a lightweight, framework-agnostic library for handling background jobs and messages with persistent queues.
Supported transports: SQLite and Filesystem.

Install with Composer:

composer require michel/pqueue
This example shows how to use the library in a simple PHP script.
1. Create a Message and a Handler
// src/Messages/MyMessage.php
namespace App\Messages;
class MyMessage {
    public string $text;
    public function __construct(string $text) { $this->text = $text; }
}
// src/Handlers/MyMessageHandler.php
namespace App\Handlers;
use App\Messages\MyMessage;
class MyMessageHandler {
    public function __invoke(MyMessage $message) {
        echo "Processing message: " . $message->text . "\n";
    }
}
2. Dispatch a Message
// send_message.php
require 'vendor/autoload.php';
use Michel\PQueue\Transport\SQLiteTransport;
use Michel\PQueue\PQueueDispatcher;
use App\Messages\MyMessage;
// 1. Create a transport
$transport = SQLiteTransport::create(['db_path' => __DIR__ . '/pqueue.sqlite']);
// 2. Create a dispatcher
$dispatcher = new PQueueDispatcher($transport);
// 3. Dispatch your message
$dispatcher->dispatch(new MyMessage('Hello, World!'));
echo "Message dispatched!\n";
3. Run the Worker
The worker needs a HandlerResolver to get handler instances. For this simple example, we'll create a basic one.
// worker.php
require 'vendor/autoload.php';
use Michel\PQueue\PQueueConsumerFactory;
use Michel\PQueue\PQueueWorker;
use Michel\PQueue\HandlerResolver\HandlerResolverInterface;
use Michel\PQueue\Transport\SQLiteTransport;
use App\Handlers\MyMessageHandler; // Import the handler class
// 1. Create a simple handler resolver for the example
$handlerResolver = new class implements HandlerResolverInterface {
    private array $handlers = [];
    public function getHandler(string $className): object {
        // Instantiate each handler once and reuse it for later messages
        if (!isset($this->handlers[$className])) {
            $this->handlers[$className] = new $className();
        }
        return $this->handlers[$className];
    }
    public function hasHandler(string $className): bool {
        return class_exists($className);
    }
};
// 2. Create the transport
$transport = SQLiteTransport::create(['db_path' => __DIR__ . '/pqueue.sqlite']);
// 3. Use the factory to build the consumer
$factory = new PQueueConsumerFactory(
    $handlerResolver,
    [
        MyMessageHandler::class, // You can add handler classes directly
        __DIR__ . '/src/Handlers' // And also scan directories
    ],
    __DIR__ . '/cache' // Cache directory for handler discovery
);
$consumer = $factory->createConsumer();
// 4. Create and run the worker
$worker = new PQueueWorker($transport, $consumer, [
    'stopWhenEmpty' => true, // Stop after processing all messages
]);
$worker->run();
echo "Worker finished.\n";
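The anonymous resolver above only works for handlers that have no constructor arguments. As a rough sketch of one alternative, a resolver could build handlers from factory closures instead. Everything here is an assumption for illustration: the interface is a local stand-in that mirrors the two methods used in this README (a real project would implement Michel\PQueue\HandlerResolver\HandlerResolverInterface), and FactoryHandlerResolver and GreetingHandler are hypothetical names that are not part of the library.

```php
<?php
declare(strict_types=1);

// Local stand-in with the same two methods as the interface used above,
// so that this sketch runs without the library installed.
interface HandlerResolverInterface
{
    public function getHandler(string $className): object;
    public function hasHandler(string $className): bool;
}

// Hypothetical resolver: creates handlers lazily from registered factory
// closures, which allows handlers with constructor dependencies.
final class FactoryHandlerResolver implements HandlerResolverInterface
{
    /** @var array<string, callable> class name => factory closure */
    private array $factories;

    /** @var array<string, object> handlers that were already created */
    private array $instances = [];

    public function __construct(array $factories)
    {
        $this->factories = $factories;
    }

    public function getHandler(string $className): object
    {
        if (!$this->hasHandler($className)) {
            throw new InvalidArgumentException("No handler registered for {$className}");
        }
        // Create each handler once and reuse it for later messages.
        return $this->instances[$className] ??= ($this->factories[$className])();
    }

    public function hasHandler(string $className): bool
    {
        return isset($this->factories[$className]);
    }
}

// Hypothetical handler that needs a constructor argument.
final class GreetingHandler
{
    private string $prefix;

    public function __construct(string $prefix)
    {
        $this->prefix = $prefix;
    }

    public function __invoke(string $text): string
    {
        return $this->prefix . $text;
    }
}

$resolver = new FactoryHandlerResolver([
    GreetingHandler::class => fn (): object => new GreetingHandler('Processing message: '),
]);
```

Unlike the class_exists() check in the example above, hasHandler() here reports only handlers that were registered explicitly. Whether that stricter behaviour suits the library's handler discovery is not confirmed by this README.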
You can hook into the worker lifecycle using the following methods:
onConsume(callable $callback): Executed after a message is successfully consumed.
onFailure(callable $callback): Executed when a message fails processing.
onStop(callable $callback): Executed when the worker stops (due to memory limit, time limit, or empty queue).
$worker->onConsume(function ($message) {
    echo "Message processed!\n";
});
$worker->onFailure(function ($message, $exception) {
    echo "Message failed: " . $exception->getMessage() . "\n";
});
$worker->onStop(function () {
    echo "Worker stopped.\n";
});