ソースを参照

Initial alpha release (0.0.1 ALPHA)

michelphp 1 日 前
コミット
a6dfce0be8

+ 3 - 0
.gitignore

@@ -0,0 +1,3 @@
+/vendor/
+/.idea/
+composer.lock

+ 373 - 0
LICENSE

@@ -0,0 +1,373 @@
+Mozilla Public License Version 2.0
+==================================
+
+1. Definitions
+--------------
+
+1.1. "Contributor"
+    means each individual or legal entity that creates, contributes to
+    the creation of, or owns Covered Software.
+
+1.2. "Contributor Version"
+    means the combination of the Contributions of others (if any) used
+    by a Contributor and that particular Contributor's Contribution.
+
+1.3. "Contribution"
+    means Covered Software of a particular Contributor.
+
+1.4. "Covered Software"
+    means Source Code Form to which the initial Contributor has attached
+    the notice in Exhibit A, the Executable Form of such Source Code
+    Form, and Modifications of such Source Code Form, in each case
+    including portions thereof.
+
+1.5. "Incompatible With Secondary Licenses"
+    means
+
+    (a) that the initial Contributor has attached the notice described
+        in Exhibit B to the Covered Software; or
+
+    (b) that the Covered Software was made available under the terms of
+        version 1.1 or earlier of the License, but not also under the
+        terms of a Secondary License.
+
+1.6. "Executable Form"
+    means any form of the work other than Source Code Form.
+
+1.7. "Larger Work"
+    means a work that combines Covered Software with other material, in
+    a separate file or files, that is not Covered Software.
+
+1.8. "License"
+    means this document.
+
+1.9. "Licensable"
+    means having the right to grant, to the maximum extent possible,
+    whether at the time of the initial grant or subsequently, any and
+    all of the rights conveyed by this License.
+
+1.10. "Modifications"
+    means any of the following:
+
+    (a) any file in Source Code Form that results from an addition to,
+        deletion from, or modification of the contents of Covered
+        Software; or
+
+    (b) any new file in Source Code Form that contains any Covered
+        Software.
+
+1.11. "Patent Claims" of a Contributor
+    means any patent claim(s), including without limitation, method,
+    process, and apparatus claims, in any patent Licensable by such
+    Contributor that would be infringed, but for the grant of the
+    License, by the making, using, selling, offering for sale, having
+    made, import, or transfer of either its Contributions or its
+    Contributor Version.
+
+1.12. "Secondary License"
+    means either the GNU General Public License, Version 2.0, the GNU
+    Lesser General Public License, Version 2.1, the GNU Affero General
+    Public License, Version 3.0, or any later versions of those
+    licenses.
+
+1.13. "Source Code Form"
+    means the form of the work preferred for making modifications.
+
+1.14. "You" (or "Your")
+    means an individual or a legal entity exercising rights under this
+    License. For legal entities, "You" includes any entity that
+    controls, is controlled by, or is under common control with You. For
+    purposes of this definition, "control" means (a) the power, direct
+    or indirect, to cause the direction or management of such entity,
+    whether by contract or otherwise, or (b) ownership of more than
+    fifty percent (50%) of the outstanding shares or beneficial
+    ownership of such entity.
+
+2. License Grants and Conditions
+--------------------------------
+
+2.1. Grants
+
+Each Contributor hereby grants You a world-wide, royalty-free,
+non-exclusive license:
+
+(a) under intellectual property rights (other than patent or trademark)
+    Licensable by such Contributor to use, reproduce, make available,
+    modify, display, perform, distribute, and otherwise exploit its
+    Contributions, either on an unmodified basis, with Modifications, or
+    as part of a Larger Work; and
+
+(b) under Patent Claims of such Contributor to make, use, sell, offer
+    for sale, have made, import, and otherwise transfer either its
+    Contributions or its Contributor Version.
+
+2.2. Effective Date
+
+The licenses granted in Section 2.1 with respect to any Contribution
+become effective for each Contribution on the date the Contributor first
+distributes such Contribution.
+
+2.3. Limitations on Grant Scope
+
+The licenses granted in this Section 2 are the only rights granted under
+this License. No additional rights or licenses will be implied from the
+distribution or licensing of Covered Software under this License.
+Notwithstanding Section 2.1(b) above, no patent license is granted by a
+Contributor:
+
+(a) for any code that a Contributor has removed from Covered Software;
+    or
+
+(b) for infringements caused by: (i) Your and any other third party's
+    modifications of Covered Software, or (ii) the combination of its
+    Contributions with other software (except as part of its Contributor
+    Version); or
+
+(c) under Patent Claims infringed by Covered Software in the absence of
+    its Contributions.
+
+This License does not grant any rights in the trademarks, service marks,
+or logos of any Contributor (except as may be necessary to comply with
+the notice requirements in Section 3.4).
+
+2.4. Subsequent Licenses
+
+No Contributor makes additional grants as a result of Your choice to
+distribute the Covered Software under a subsequent version of this
+License (see Section 10.2) or under the terms of a Secondary License (if
+permitted under the terms of Section 3.3).
+
+2.5. Representation
+
+Each Contributor represents that the Contributor believes its
+Contributions are its original creation(s) or it has sufficient rights
+to grant the rights to its Contributions conveyed by this License.
+
+2.6. Fair Use
+
+This License is not intended to limit any rights You have under
+applicable copyright doctrines of fair use, fair dealing, or other
+equivalents.
+
+2.7. Conditions
+
+Sections 3.1, 3.2, 3.3, and 3.4 are conditions of the licenses granted
+in Section 2.1.
+
+3. Responsibilities
+-------------------
+
+3.1. Distribution of Source Form
+
+All distribution of Covered Software in Source Code Form, including any
+Modifications that You create or to which You contribute, must be under
+the terms of this License. You must inform recipients that the Source
+Code Form of the Covered Software is governed by the terms of this
+License, and how they can obtain a copy of this License. You may not
+attempt to alter or restrict the recipients' rights in the Source Code
+Form.
+
+3.2. Distribution of Executable Form
+
+If You distribute Covered Software in Executable Form then:
+
+(a) such Covered Software must also be made available in Source Code
+    Form, as described in Section 3.1, and You must inform recipients of
+    the Executable Form how they can obtain a copy of such Source Code
+    Form by reasonable means in a timely manner, at a charge no more
+    than the cost of distribution to the recipient; and
+
+(b) You may distribute such Executable Form under the terms of this
+    License, or sublicense it under different terms, provided that the
+    license for the Executable Form does not attempt to limit or alter
+    the recipients' rights in the Source Code Form under this License.
+
+3.3. Distribution of a Larger Work
+
+You may create and distribute a Larger Work under terms of Your choice,
+provided that You also comply with the requirements of this License for
+the Covered Software. If the Larger Work is a combination of Covered
+Software with a work governed by one or more Secondary Licenses, and the
+Covered Software is not Incompatible With Secondary Licenses, this
+License permits You to additionally distribute such Covered Software
+under the terms of such Secondary License(s), so that the recipient of
+the Larger Work may, at their option, further distribute the Covered
+Software under the terms of either this License or such Secondary
+License(s).
+
+3.4. Notices
+
+You may not remove or alter the substance of any license notices
+(including copyright notices, patent notices, disclaimers of warranty,
+or limitations of liability) contained within the Source Code Form of
+the Covered Software, except that You may alter any license notices to
+the extent required to remedy known factual inaccuracies.
+
+3.5. Application of Additional Terms
+
+You may choose to offer, and to charge a fee for, warranty, support,
+indemnity or liability obligations to one or more recipients of Covered
+Software. However, You may do so only on Your own behalf, and not on
+behalf of any Contributor. You must make it absolutely clear that any
+such warranty, support, indemnity, or liability obligation is offered by
+You alone, and You hereby agree to indemnify every Contributor for any
+liability incurred by such Contributor as a result of warranty, support,
+indemnity or liability terms You offer. You may include additional
+disclaimers of warranty and limitations of liability specific to any
+jurisdiction.
+
+4. Inability to Comply Due to Statute or Regulation
+---------------------------------------------------
+
+If it is impossible for You to comply with any of the terms of this
+License with respect to some or all of the Covered Software due to
+statute, judicial order, or regulation then You must: (a) comply with
+the terms of this License to the maximum extent possible; and (b)
+describe the limitations and the code they affect. Such description must
+be placed in a text file included with all distributions of the Covered
+Software under this License. Except to the extent prohibited by statute
+or regulation, such description must be sufficiently detailed for a
+recipient of ordinary skill to be able to understand it.
+
+5. Termination
+--------------
+
+5.1. The rights granted under this License will terminate automatically
+if You fail to comply with any of its terms. However, if You become
+compliant, then the rights granted under this License from a particular
+Contributor are reinstated (a) provisionally, unless and until such
+Contributor explicitly and finally terminates Your grants, and (b) on an
+ongoing basis, if such Contributor fails to notify You of the
+non-compliance by some reasonable means prior to 60 days after You have
+come back into compliance. Moreover, Your grants from a particular
+Contributor are reinstated on an ongoing basis if such Contributor
+notifies You of the non-compliance by some reasonable means, this is the
+first time You have received notice of non-compliance with this License
+from such Contributor, and You become compliant prior to 30 days after
+Your receipt of the notice.
+
+5.2. If You initiate litigation against any entity by asserting a patent
+infringement claim (excluding declaratory judgment actions,
+counter-claims, and cross-claims) alleging that a Contributor Version
+directly or indirectly infringes any patent, then the rights granted to
+You by any and all Contributors for the Covered Software under Section
+2.1 of this License shall terminate.
+
+5.3. In the event of termination under Sections 5.1 or 5.2 above, all
+end user license agreements (excluding distributors and resellers) which
+have been validly granted by You or Your distributors under this License
+prior to termination shall survive termination.
+
+************************************************************************
+*                                                                      *
+*  6. Disclaimer of Warranty                                           *
+*  -------------------------                                           *
+*                                                                      *
+*  Covered Software is provided under this License on an "as is"       *
+*  basis, without warranty of any kind, either expressed, implied, or  *
+*  statutory, including, without limitation, warranties that the       *
+*  Covered Software is free of defects, merchantable, fit for a        *
+*  particular purpose or non-infringing. The entire risk as to the     *
+*  quality and performance of the Covered Software is with You.        *
+*  Should any Covered Software prove defective in any respect, You     *
+*  (not any Contributor) assume the cost of any necessary servicing,   *
+*  repair, or correction. This disclaimer of warranty constitutes an   *
+*  essential part of this License. No use of any Covered Software is   *
+*  authorized under this License except under this disclaimer.         *
+*                                                                      *
+************************************************************************
+
+************************************************************************
+*                                                                      *
+*  7. Limitation of Liability                                          *
+*  --------------------------                                          *
+*                                                                      *
+*  Under no circumstances and under no legal theory, whether tort      *
+*  (including negligence), contract, or otherwise, shall any           *
+*  Contributor, or anyone who distributes Covered Software as          *
+*  permitted above, be liable to You for any direct, indirect,         *
+*  special, incidental, or consequential damages of any character      *
+*  including, without limitation, damages for lost profits, loss of    *
+*  goodwill, work stoppage, computer failure or malfunction, or any    *
+*  and all other commercial damages or losses, even if such party      *
+*  shall have been informed of the possibility of such damages. This   *
+*  limitation of liability shall not apply to liability for death or   *
+*  personal injury resulting from such party's negligence to the       *
+*  extent applicable law prohibits such limitation. Some               *
+*  jurisdictions do not allow the exclusion or limitation of           *
+*  incidental or consequential damages, so this exclusion and          *
+*  limitation may not apply to You.                                    *
+*                                                                      *
+************************************************************************
+
+8. Litigation
+-------------
+
+Any litigation relating to this License may be brought only in the
+courts of a jurisdiction where the defendant maintains its principal
+place of business and such litigation shall be governed by laws of that
+jurisdiction, without reference to its conflict-of-law provisions.
+Nothing in this Section shall prevent a party's ability to bring
+cross-claims or counter-claims.
+
+9. Miscellaneous
+----------------
+
+This License represents the complete agreement concerning the subject
+matter hereof. If any provision of this License is held to be
+unenforceable, such provision shall be reformed only to the extent
+necessary to make it enforceable. Any law or regulation which provides
+that the language of a contract shall be construed against the drafter
+shall not be used to construe this License against a Contributor.
+
+10. Versions of the License
+---------------------------
+
+10.1. New Versions
+
+Mozilla Foundation is the license steward. Except as provided in Section
+10.3, no one other than the license steward has the right to modify or
+publish new versions of this License. Each version will be given a
+distinguishing version number.
+
+10.2. Effect of New Versions
+
+You may distribute the Covered Software under the terms of the version
+of the License under which You originally received the Covered Software,
+or under the terms of any subsequent version published by the license
+steward.
+
+10.3. Modified Versions
+
+If you create software not governed by this License, and you want to
+create a new license for such software, you may create and use a
+modified version of this License if you rename the license and remove
+any references to the name of the license steward (except to note that
+such modified license differs from this License).
+
+10.4. Distributing Source Code Form that is Incompatible With Secondary
+Licenses
+
+If You choose to distribute Source Code Form that is Incompatible With
+Secondary Licenses under the terms of this version of the License, the
+notice described in Exhibit B of this License must be attached.
+
+Exhibit A - Source Code Form License Notice
+-------------------------------------------
+
+  This Source Code Form is subject to the terms of the Mozilla Public
+  License, v. 2.0. If a copy of the MPL was not distributed with this
+  file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+If it is not possible or desirable to put the notice in a particular
+file, then You may include the notice in a location (such as a LICENSE
+file in a relevant directory) where a recipient would be likely to look
+for such a notice.
+
+You may add additional accurate notices of copyright ownership.
+
+Exhibit B - "Incompatible With Secondary Licenses" Notice
+---------------------------------------------------------
+
+  This Source Code Form is "Incompatible With Secondary Licenses", as
+  defined by the Mozilla Public License, v. 2.0.

+ 135 - 0
README.md

@@ -0,0 +1,135 @@
+# PQueue - Simple PHP Queue Library
+
+PQueue is a lightweight, framework-agnostic library for handling background jobs and messages with persistent queues.
+
+## Features
+
+-   **Multiple Transports**: Comes with `SQLite` and `Filesystem` transports.
+-   **DI-Friendly**: Designed to integrate cleanly with any PSR-11 dependency injection container.
+-   **Configurable Worker**: The queue worker can be configured with memory limits, time limits, retry strategies, and more.
+-   **Automatic Handler Discovery**: Scans specified directories to find your message handlers automatically.
+
+## Installation
+
+```bash
+composer require michel/pqueue
+```
+
+## Basic Usage (Without a Framework)
+
+This example shows how to use the library in a simple PHP script.
+
+**1. Create a Message and a Handler**
+
+```php
+// src/Messages/MyMessage.php
+namespace App\Messages;
+class MyMessage {
+    public string $text;
+    public function __construct(string $text) { $this->text = $text; }
+}
+
+// src/Handlers/MyMessageHandler.php
+namespace App\Handlers;
+use App\Messages\MyMessage;
+class MyMessageHandler {
+    public function __invoke(MyMessage $message) {
+        echo "Processing message: " . $message->text . "\n";
+    }
+}
+```
+
+**2. Dispatch a Message**
+
+```php
+// send_message.php
+require 'vendor/autoload.php';
+
+use Michel\PQueue\Transport\SQLiteTransport;
+use Michel\PQueue\PQueueDispatcher;
+use App\Messages\MyMessage;
+
+// 1. Create a transport
+$transport = SQLiteTransport::create(['db_path' => __DIR__ . '/pqueue.sqlite']);
+
+// 2. Create a dispatcher
+$dispatcher = new PQueueDispatcher($transport);
+
+// 3. Dispatch your message
+$dispatcher->dispatch(new MyMessage('Hello, World!'));
+
+echo "Message dispatched!\n";
+```
+
+**3. Run the Worker**
+
+The worker needs a `HandlerResolver` to get handler instances. For this simple example, we'll create a basic one.
+
+```php
+// worker.php
+require 'vendor/autoload.php';
+
+use Michel\PQueue\PQueueConsumerFactory;
+use Michel\PQueue\PQueueWorker;
+use Michel\PQueue\HandlerResolver\HandlerResolverInterface;
+use Michel\PQueue\Transport\SQLiteTransport;
+use App\Handlers\MyMessageHandler; // Import the handler class
+
+// 1. Create a simple handler resolver for the example
+$handlerResolver = new class implements HandlerResolverInterface {
+    private array $handlers = [];
+    public function getHandler(string $className): object {
+        if (!isset($this->handlers[$className])) {
+            $this->handlers[$className] = new $className();
+        }
+        return $this->handlers[$className];
+    }
+    public function hasHandler(string $className): bool {
+        return class_exists($className);
+    }
+};
+
+// 2. Create the transport
+$transport = SQLiteTransport::create(['db_path' => __DIR__ . '/pqueue.sqlite']);
+
+// 3. Use the factory to build the consumer
+$factory = new PQueueConsumerFactory(
+    $handlerResolver,
+    [
+        MyMessageHandler::class,  // You can add handler classes directly
+        __DIR__ . '/src/Handlers' // And also scan directories
+    ], 
+    __DIR__ . '/cache'            // Cache directory for handler discovery
+);
+$consumer = $factory->createConsumer();
+
+// 4. Create and run the worker
+$worker = new PQueueWorker($transport, $consumer, [
+    'stopWhenEmpty' => true, // Stop after processing all messages
+]);
+$worker->run();
+
+echo "Worker finished.\n";
+```
+
+## Worker Callbacks
+
+You can hook into the worker lifecycle using the following methods:
+
+- `onConsume(callable $callback)`: Executed after a message is successfully consumed.
+- `onFailure(callable $callback)`: Executed when a message fails processing.
+- `onStop(callable $callback)`: Executed when the worker stops (due to memory limit, time limit, or empty queue).
+
+```php
+$worker->onConsume(function ($message) {
+    echo "Message processed!\n";
+});
+
+$worker->onFailure(function ($message, $exception) {
+    echo "Message failed: " . $exception->getMessage() . "\n";
+});
+
+$worker->onStop(function () {
+    echo "Worker stopped.\n";
+});
+```

+ 30 - 0
composer.json

@@ -0,0 +1,30 @@
+{
+  "name": "michel/pqueue",
+  "description": "PQueue is a minimalist PHP library for processing background messages using a single persistent CLI (managed via systemd) or periodic execution (via cron), covering 90% of use cases without external dependencies or complex worker management.",
+  "type": "library",
+  "license": "MPL-2.0",
+  "authors": [
+    {
+      "name": "F. Michel"
+    }
+  ],
+  "autoload": {
+    "psr-4": {
+      "Michel\\PQueue\\": "src",
+      "Test\\Michel\\PQueue\\": "tests"
+    }
+  },
+  "require": {
+    "php": ">=7.4",
+    "ext-sqlite3": "*",
+    "ext-json": "*",
+    "psr/log": "^1.1|^2.0|^3.0",
+    "michel/michel-package-starter": "^1.0",
+    "michel/console": "^1.0",
+    "michel/options-resolver": "^1.0",
+    "psr/container": "^1.1|^2.0"
+  },
+  "require-dev": {
+    "michel/unitester": "^1.0.0"
+  }
+}

+ 122 - 0
src/Command/PQueueWorkerRunCommand.php

@@ -0,0 +1,122 @@
+<?php
+
+namespace Michel\PQueue\Command;
+
+use Michel\PQueue\PQueueConsumer;
+use Michel\PQueue\PQueueWorker;
+use Michel\PQueue\Transport\Message\Message;
+use Michel\PQueue\Transport\TransportInterface;
+use Michel\Console\Command\CommandInterface;
+use Michel\Console\InputInterface;
+use Michel\Console\Option\CommandOption;
+use Michel\Console\Output\ConsoleOutput;
+use Michel\Console\OutputInterface;
+
+
+class PQueueWorkerRunCommand implements CommandInterface
+{
+    private PQueueConsumer $consumer;
+    private TransportInterface $transport;
+
+    public function __construct(
+        PQueueConsumer $consumer,
+        TransportInterface $transport
+    ) {
+        $this->consumer = $consumer;
+        $this->transport = $transport;
+    }
+
+    public function getName(): string
+    {
+        return 'pqueue:worker:run';
+    }
+
+    public function getDescription(): string
+    {
+        return 'Run a worker to process queue tasks';
+    }
+
+    public function getOptions(): array
+    {
+        return [
+            CommandOption::flag('stop-when-empty', 's', 'Stop the worker if the queue is empty'),
+            CommandOption::withValue('memory-limit', 'm', 'The memory limit in megabytes (e.g., 128)', 128),
+            CommandOption::withValue('time-limit', 't', 'The maximum runtime in seconds (e.g., 3600 for 1 hour)', 3600),
+            CommandOption::withValue('sleep', null, 'Time in seconds to sleep if the queue is empty (e.g., 3)', 10),
+            CommandOption::withValue('max-retries', null, 'Maximum number of retries for a failed message (e.g., 5)', 3),
+            CommandOption::withValue('retry-delay', null, 'Initial delay in seconds before retrying a failed message (e.g., 60 for 1 minute)', 60),
+            CommandOption::withValue('retry-multiplier', null, 'Multiplier for exponential backoff between retries (e.g., 3)', 3),
+            CommandOption::withValue('message-delay', null, 'Delay in milliseconds between processing each message (e.g., 200)', 200),
+        ];
+    }
+
+    public function getArguments(): array
+    {
+        return [];
+    }
+
+    public function execute(InputInterface $input, OutputInterface $output): void
+    {
+        $io = new ConsoleOutput($output);
+        $io->writeln(' [OK] Worker started. Press Ctrl+C to stop.');
+        $io->writeln('');
+        $io->title('Configuration');
+
+        $workerOptions = [
+            'Stop when empty' => (bool)$input->getOptionValue('stop-when-empty') ? 'Yes' : 'No',
+            'Idle sleep' => (int)($input->getOptionValue('sleep')) . ' s',
+            'Memory limit' => (int)($input->getOptionValue('memory-limit')) . ' MB',
+            'Max runtime' => (int)($input->getOptionValue('time-limit')) . ' s',
+            'Max retries' => (int)($input->getOptionValue('max-retries')),
+            'Retry delay' => (int)($input->getOptionValue('retry-delay')) . ' s',
+            'Retry multiplier' => (int)($input->getOptionValue('retry-multiplier')),
+            'Message delay' => (int)($input->getOptionValue('message-delay')) . ' ms',
+        ];
+
+        foreach ($workerOptions as $key => $value) {
+            $io->writeln(sprintf(" %-20s : %s", $key, $value));
+        }
+        $io->writeln('');
+
+        $options = [
+            'stopWhenEmpty' => (bool)$input->getOptionValue('stop-when-empty'),
+            'idleSleepMs' => (int)($input->getOptionValue('sleep')  * 1000),
+            'maxMemory' => (int)($input->getOptionValue('memory-limit')),
+            'maxRuntimeSeconds' => (int)($input->getOptionValue('time-limit')),
+            'maxRetryAttempts' => (int)($input->getOptionValue('max-retries')),
+            'initialRetryDelayMs' => (int)($input->getOptionValue('retry-delay')) * 1000,
+            'retryBackoffMultiplier' => (int)($input->getOptionValue('retry-multiplier')),
+            'messageDelayMs' => (int)($input->getOptionValue('message-delay')),
+        ];
+
+        if ($io->isVerbose()) {
+            $io->writeln(' Verbose mode enabled. Showing message details.');
+            $io->writeln('');
+        }
+
+        // Use the factory to create and run the worker
+        $worker = new PQueueWorker($this->transport, $this->consumer, $options);
+        $worker->onConsume(function (Message  $msg) use ($io) {
+            $io->writeln(sprintf(" [OK] Message consumed  %s : after %s attempts", $msg->getId(), $msg->getAttempts()));
+            if ($io->isVerbose()) {
+                $io->writeln(sprintf(" [DEBUG] %s", $msg->getEnvelope()->getBody()));
+            }
+            $io->writeln('');
+        });
+
+        $worker->onFailure(function (Message  $msg, \Throwable $e) use ($io) {
+            $io->writeln(sprintf(" [ERROR] Message %s failed %s attempts : %s", $msg->getId(), $msg->getAttempts(), $e->getMessage()));
+            if ($io->isVerbose()) {
+                $io->writeln(json_encode(array_slice($e->getTrace(), 0, 5), JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES));
+            }
+            $io->writeln('');
+        });
+
+        $worker->onStop(function () use ($io) {
+            $io->writeln(' [INFO] Worker stopped.');
+        });
+
+        $io->writeln(' [INFO] Waiting for messages...');
+        $worker->run();
+    }
+}

+ 24 - 0
src/HandlerResolver/ContainerHandlerResolver.php

@@ -0,0 +1,24 @@
+<?php
+
+namespace Michel\PQueue\HandlerResolver;
+
+use Psr\Container\ContainerInterface;
+
+class ContainerHandlerResolver implements HandlerResolverInterface
+{
+    private ContainerInterface $container;
+    public function __construct(ContainerInterface $container)
+    {
+        $this->container = $container;
+    }
+
+    public function getHandler(string $handlerClassName): object
+    {
+        return $this->container->get($handlerClassName);
+    }
+
+    public function hasHandler(string $handlerClassName): bool
+    {
+        return $this->container->has($handlerClassName);
+    }
+}

+ 28 - 0
src/HandlerResolver/HandlerResolverInterface.php

@@ -0,0 +1,28 @@
+<?php
+
+namespace Michel\PQueue\HandlerResolver;
+
+/**
+ * Defines the contract for resolving a handler class string to an actual service instance.
+ * This allows integration with DI containers and service locators.
+ */
+interface HandlerResolverInterface
+{
+    /**
+     * Gets a handler instance from its class name.
+     *
+     * @param string $handlerClassName The fully qualified class name of the handler.
+     * @return object The handler service instance.
+     * @throws \Psr\Container\NotFoundExceptionInterface No handler was found for the given class name.
+     * @throws \Psr\Container\ContainerExceptionInterface Error while retrieving the handler.
+     */
+    public function getHandler(string $handlerClassName): object;
+
+    /**
+     * Checks if a handler for the given class name is available.
+     *
+     * @param string $handlerClassName The fully qualified class name of the handler.
+     * @return bool True if the handler is available, false otherwise.
+     */
+    public function hasHandler(string $handlerClassName): bool;
+}

+ 53 - 0
src/PQueueConsumer.php

@@ -0,0 +1,53 @@
+<?php
+
+namespace Michel\PQueue;
+
+final class PQueueConsumer
+{
+    /** @var callable[] List of handlers for processing messages */
+    private array $handlers = [];
+
+    /**
+     * @param callable[] $handlers List of callable handlers for processing messages
+     */
+    public function __construct(array $handlers)
+    {
+        foreach ($handlers as $payloadClass => $handler) {
+            if (!is_object($handler)) {
+                throw new \InvalidArgumentException(
+                    sprintf(
+                        'PQueueConsumer: Handler must be an object, %s given',
+                        gettype($handler)
+                    )
+                );
+            }
+            if (!is_callable($handler)) {
+                throw new \InvalidArgumentException(
+                    sprintf(
+                        'PQueueConsumer: Handler object "%s" must implement an __invoke() method',
+                        get_class($handler)
+                    )
+                );
+            }
+            if (!class_exists($payloadClass)) {
+                throw new \InvalidArgumentException(
+                    sprintf('PQueueConsumer: Unknown payload class "%s"', $payloadClass)
+                );
+            }
+            $this->handlers[$payloadClass] = $handler;
+        }
+    }
+
+    public function consume(object $payload)
+    {
+        $payloadClass = get_class($payload);
+        if (!isset($this->handlers[$payloadClass])) {
+            throw new \RuntimeException(sprintf(
+                'No handler found for payload of class "%s".',
+                $payloadClass
+            ));
+        }
+        $handler = $this->handlers[$payloadClass];
+        $handler($payload);
+    }
+}

+ 69 - 0
src/PQueueConsumerFactory.php

@@ -0,0 +1,69 @@
+<?php
+
+namespace Michel\PQueue;
+
+use LogicException;
+use Michel\PQueue\HandlerResolver\HandlerResolverInterface;
+
+final class PQueueConsumerFactory
+{
+    private HandlerResolverInterface $handlerResolver;
+    private array $handlerMap;
+
+    /**
+     * @param HandlerResolverInterface $handlerResolver
+     * @param array $handlerMap Map of message classes to handler classes
+     */
+    public function __construct(
+        HandlerResolverInterface $handlerResolver,
+        array $handlerMap
+    ) {
+        $this->handlerResolver = $handlerResolver;
+        $this->handlerMap = $handlerMap;
+    }
+
+    public function createConsumer(): PQueueConsumer
+    {
+        if (empty($this->handlerMap)) {
+            throw new LogicException('PQueueConsumerFactory requires at least one handler to be registered.');
+        }
+
+        $handlers = [];
+        foreach ($this->handlerMap as $messageClass => $handlerClass) {
+            if (!$this->handlerResolver->hasHandler($handlerClass)) {
+                throw new LogicException(sprintf(
+                    'Message handler "%s" was found by the finder, but it is not registered as a service in the container (or the resolver cannot find it).',
+                    $handlerClass
+                ));
+            }
+            $handlerInstance = $this->handlerResolver->getHandler($handlerClass);
+
+            if (!is_object($handlerInstance)) {
+                throw new \InvalidArgumentException(
+                    sprintf(
+                        'PQueueConsumerFactory: Resolved handler for message "%s" must be an object, %s given.',
+                        $messageClass,
+                        gettype($handlerInstance)
+                    )
+                );
+            }
+            if (!is_callable($handlerInstance)) {
+                throw new \InvalidArgumentException(
+                    sprintf(
+                        'PQueueConsumerFactory: Resolved handler object "%s" for message "%s" must implement an __invoke() method.',
+                        get_class($handlerInstance),
+                        $messageClass
+                    )
+                );
+            }
+            if (!class_exists($messageClass)) {
+                throw new \InvalidArgumentException(
+                    sprintf('PQueueConsumerFactory: Unknown payload class "%s" for handler "%s".', $messageClass, get_class($handlerInstance))
+                );
+            }
+            $handlers[$messageClass] = $handlerInstance;
+        }
+
+        return new PQueueConsumer($handlers);
+    }
+}

+ 40 - 0
src/PQueueDispatcher.php

@@ -0,0 +1,40 @@
+<?php
+
+namespace Michel\PQueue;
+
+use DateTimeImmutable;
+use DateTimeInterface;
+use Michel\PQueue\Serializer\MessageSerializer;
+use Michel\PQueue\Transport\Envelope;
+use Michel\PQueue\Transport\TransportInterface;
+
+final class PQueueDispatcher
+{
+    private TransportInterface $transport;
+
+    public function __construct(TransportInterface $transport)
+    {
+        $this->transport = $transport;
+    }
+
+    /**
+     * Dispatch a message object into the transport.
+     *
+     * @param object $message The message object to queue
+     * @param DateTimeInterface|null $availableAt When the message becomes available
+     * @param bool $retry Whether the message is retryable
+     */
+    public function dispatch(object $message, ?DateTimeInterface $availableAt = null, bool $retry = true): void
+    {
+        if ($availableAt === null) {
+            $availableAt = new DateTimeImmutable();
+        }
+        $envelope = new Envelope(
+            MessageSerializer::serialize($message),
+            $retry,
+            0,
+            $availableAt
+        );
+        $this->transport->send($envelope);
+    }
+}

+ 203 - 0
src/PQueueHandlerFinder.php

@@ -0,0 +1,203 @@
+<?php
+
+namespace Michel\PQueue;
+
+use InvalidArgumentException;
+use LogicException;
+use ReflectionClass;
+use RecursiveIteratorIterator;
+use RecursiveDirectoryIterator;
+
+final class PQueueHandlerFinder
+{
+    private const CACHE_FILENAME = 'pqueue_handler_map.php';
+
+    private array $sources = [];
+    private ?string $cacheDir;
+
+    public function __construct(array $sources, ?string $cacheDir = null)
+    {
+        foreach ($sources as $source) {
+            if (!is_dir($source) && !class_exists($source)) {
+                throw new InvalidArgumentException(sprintf('The source "%s" does not exist or is not a directory.', $source));
+            }
+            $this->sources[] = $source;
+        }
+
+        $this->cacheDir = $cacheDir;
+        if ($this->cacheDir && !is_dir($this->cacheDir)) {
+            throw new InvalidArgumentException(sprintf('Cache directory "%s" does not exist.', $this->cacheDir));
+        }
+    }
+
+    public function find(): array
+    {
+        if ($this->cacheIsEnabled()) {
+            $cachedResult = $this->loadFromCache();
+            if ($cachedResult !== null) {
+                return $cachedResult;
+            }
+        }
+
+        $handlerMap = $this->findAndMapHandlers();
+
+        if ($this->cacheIsEnabled()) {
+            $this->saveToCache($handlerMap);
+        }
+
+        return $handlerMap;
+    }
+
+    private function findAndMapHandlers(): array
+    {
+        $classes = $this->findAllHandlerClasses();
+        $handlerMap = [];
+
+        foreach ($classes as $class) {
+            try {
+                $reflection = new ReflectionClass($class);
+
+                if (!$reflection->hasMethod('__invoke')) {
+                    continue;
+                }
+
+                $invokeMethod = $reflection->getMethod('__invoke');
+                $parameters = $invokeMethod->getParameters();
+
+                if (count($parameters) !== 1) {
+                    continue;
+                }
+
+                $messageType = $parameters[0]->getType();
+                if ($messageType === null || $messageType->isBuiltin()) {
+                    continue;
+                }
+
+                $messageClassName = $messageType->getName();
+
+                if (isset($handlerMap[$messageClassName])) {
+                    throw new LogicException(sprintf(
+                        'Multiple handlers found for message "%s": ["%s", "%s"]. Only one handler per message is allowed.',
+                        $messageClassName,
+                        $handlerMap[$messageClassName],
+                        $class
+                    ));
+                }
+
+                $handlerMap[$messageClassName] = $class;
+
+            } catch (\ReflectionException $e) {
+                // Could log this error if needed
+                continue;
+            }
+        }
+        return $handlerMap;
+    }
+
+    private function findAllHandlerClasses(): array
+    {
+        $classes = [];
+        foreach ($this->sources as $source) {
+            if (class_exists($source)) {
+                $classes[] = $source;
+                continue;
+            }
+            if (is_dir($source)) {
+                $classes = array_merge($classes, $this->findHandlerClassesInDir($source));
+            }
+        }
+        return array_unique($classes);
+    }
+
+    private function findHandlerClassesInDir(string $directory): array
+    {
+        $classes = [];
+        $iterator = new RecursiveIteratorIterator(new RecursiveDirectoryIterator($directory));
+
+        foreach ($iterator as $file) {
+            if ($file->isDir() || $file->getExtension() !== 'php') {
+                continue;
+            }
+
+            $className = self::extractNamespaceAndClass($file->getPathname());
+            if ($className && class_exists($className, true)) {
+                $classes[] = $className;
+            }
+        }
+        return $classes;
+    }
+
+    private function cacheIsEnabled(): bool
+    {
+        return $this->cacheDir !== null;
+    }
+
+    private function getCacheFile(): string
+    {
+        return rtrim($this->cacheDir, '/') . '/' . self::CACHE_FILENAME;
+    }
+
+    private function loadFromCache(): ?array
+    {
+        $cacheFile = $this->getCacheFile();
+        if (is_file($cacheFile)) {
+            return require $cacheFile;
+        }
+        return null;
+    }
+
+    private function saveToCache(array $handlerMap): void
+    {
+        $content = "<?php\n\n// This file is auto-generated by PQueueHandlerFinder\nreturn " . var_export($handlerMap, true) . ";\n";
+        file_put_contents($this->getCacheFile(), $content);
+    }
+
+    private static function extractNamespaceAndClass(string $filePath): ?string
+    {
+        if (!is_file($filePath)) {
+            return null;
+        }
+
+        $contents = file_get_contents($filePath);
+        $namespace = '';
+        $class = '';
+        $isExtractingNamespace = false;
+        $isExtractingClass = false;
+
+        $namespaceTokens = [T_STRING, T_NS_SEPARATOR];
+        if (defined('T_NAME_QUALIFIED')) {
+            $namespaceTokens[] = T_NAME_QUALIFIED;
+        }
+
+        foreach (token_get_all($contents) as $token) {
+            if (is_array($token) && $token[0] == T_NAMESPACE) {
+                $isExtractingNamespace = true;
+            }
+
+            if (is_array($token) && $token[0] == T_CLASS) {
+                $isExtractingClass = true;
+            }
+
+            if ($isExtractingNamespace) {
+                if (is_array($token) && in_array($token[0], $namespaceTokens)) {
+                    $namespace .= $token[1];
+                } else if ($token === ';') {
+                    $isExtractingNamespace = false;
+                }
+            }
+
+            if ($isExtractingClass) {
+                if (is_array($token) && $token[0] == T_STRING) {
+                    $class = $token[1];
+                    break;
+                }
+            }
+        }
+
+        if (empty($class)) {
+            return null;
+        }
+
+        return $namespace ? $namespace . '\\' . $class : $class;
+    }
+}

+ 206 - 0
src/PQueueWorker.php

@@ -0,0 +1,206 @@
+<?php
+
+namespace Michel\PQueue;
+
+use DateTimeImmutable;
+use Michel\PQueue\Serializer\MessageSerializer;
+use Michel\PQueue\Transport\Message\Message;
+use Michel\PQueue\Transport\TransportInterface;
+use Michel\Resolver\Option;
+use Michel\Resolver\OptionsResolver;
+use Throwable;
+
+final class PQueueWorker
+{
+    private TransportInterface $transport;
+    private PQueueConsumer $consumer;
+
+    private bool $stopWhenEmpty;
+
+    /** @var int Time in milliseconds to sleep when no message is available */
+    private int $idleSleepMs;
+
+    // Retry-related options
+    /** @var int Initial delay in milliseconds before retrying a failed message */
+    private int $initialRetryDelayMs;
+
+    /** @var int Maximum number of retry attempts for a failed message */
+    private int $maxRetryAttempts;
+
+    /** @var int Multiplier for exponential backoff between retries */
+    private int $retryBackoffMultiplier;
+
+    // Resource / runtime limits
+    /** @var int Maximum memory usage in bytes before gracefully stopping */
+    private int $maxMemoryBytes;
+
+    /** @var int Maximum runtime in seconds before gracefully stopping */
+    private int $maxRuntimeSeconds;
+
+    /** @var int Delay in milliseconds between processing each message */
+    private int $messageDelayMs;
+    private int $startTime;
+
+    /** @var callable[] */
+    private array $onConsumeCallbacks = [];
+
+    /** @var callable[] */
+    private array $onStopCallbacks = [];
+
+    /** @var callable[] */
+    private array $onFailureCallbacks = [];
+
+    /**
+     * @param TransportInterface $transport Transport implementation to pull messages from
+     * @param PQueueConsumer $consumer
+     * @param array{
+     *     idleSleepMs?: int,
+     *     initialRetryDelayMs?: int,
+     *     maxRetryAttempts?: int,
+     *     retryBackoffMultiplier?: int,
+     *     maxMemoryBytes?: int,
+     *     maxRuntimeSeconds?: int,
+     *     messageDelayMs?: int,
+     *     stopWhenEmpty?: bool
+     * } $options Worker options
+     */
+    public function __construct(TransportInterface $transport, PQueueConsumer $consumer, array $options)
+    {
+
+        $this->transport = $transport;
+        $this->consumer = $consumer;
+
+        $resolver = new OptionsResolver([
+            Option::bool('stopWhenEmpty')->setOptional(false),
+            Option::int('idleSleepMs')->setOptional(1000)->min(1),
+            Option::int('maxRetryAttempts')->setOptional(2)->min(0),
+            Option::int('initialRetryDelayMs')->setOptional(60000)->min(0),
+            Option::int('retryBackoffMultiplier')->setOptional(3)->min(1),
+            Option::int('maxMemory')->setOptional(100)->min(1),
+            Option::int('maxRuntimeSeconds')->setOptional(3600)->min(60),
+            Option::int('messageDelayMs')->setOptional(0)->min(0),
+        ]);
+        $optionsResolved = $resolver->resolve($options);
+
+
+        // Worker options
+        $this->stopWhenEmpty = $optionsResolved['stopWhenEmpty'];
+
+        // Sleep / idle
+        $this->idleSleepMs = $optionsResolved['idleSleepMs'];
+
+        // Retry options
+        $this->initialRetryDelayMs = $optionsResolved['initialRetryDelayMs']; // 1 min
+        $this->maxRetryAttempts = $optionsResolved['maxRetryAttempts'];
+        $this->retryBackoffMultiplier = $optionsResolved['retryBackoffMultiplier'];
+
+        // Resource / runtime limits
+        $this->maxMemoryBytes = $optionsResolved['maxMemory'] * 1024 * 1024;
+        $this->maxRuntimeSeconds = $optionsResolved['maxRuntimeSeconds'];
+
+        // Message delay
+        $this->messageDelayMs = $optionsResolved['messageDelayMs'];
+    }
+
+    public function onConsume(callable $callback): self
+    {
+        $this->onConsumeCallbacks[] = $callback;
+        return $this;
+    }
+
+    public function onStop(callable $callback): self
+    {
+        $this->onStopCallbacks[] = $callback;
+        return $this;
+    }
+
+    public function onFailure(callable $callback): self
+    {
+        $this->onFailureCallbacks[] = $callback;
+        return $this;
+    }
+
+    public function run(): void
+    {
+        $this->startTime = time();
+
+        while (true) {
+            $hasMessages = false;
+            foreach ($this->transport->getNextAvailableMessages() as $msg) {
+                $hasMessages = true;
+                try {
+                    if (!$msg instanceof Message) {
+                        throw new \RuntimeException(sprintf(
+                            'Worker expected an instance of Message from transport "%s", got "%s".',
+                            get_class($this->transport),
+                            is_object($msg) ? get_class($msg) : gettype($msg)
+                        ));
+                    }
+                    $payload = MessageSerializer::unSerialize($msg->getEnvelope()->getBody());
+                    $this->consumer->consume($payload);
+                    $this->transport->success($msg);
+                    foreach ($this->onConsumeCallbacks as $callback) {
+                        $callback($msg);
+                    }
+                } catch (Throwable $e) {
+                    $attempts = $msg->getAttempts() + 1;
+                    if ($msg->isRetry() && $attempts <= $this->maxRetryAttempts) {
+                        $delay = $this->initialRetryDelayMs * pow($this->retryBackoffMultiplier, $attempts);
+                        $availableAt = (new DateTimeImmutable())->modify("+{$delay} milliseconds");
+                        $this->transport->retry($msg, $e->getMessage(), $availableAt);
+                    } else {
+                        $this->transport->failed($msg, $e->getMessage());
+                    }
+                    foreach ($this->onFailureCallbacks as $callback) {
+                        $callback($msg, $e);
+                    }
+                }
+
+                if ($this->needToBreak()) {
+                    $this->triggerStop();
+                    break;
+                }
+
+                if ($this->messageDelayMs > 0) {
+                    usleep($this->messageDelayMs * 1000);
+                }
+            }
+
+            if ($this->needToBreak()) {
+                $this->triggerStop();
+                break;
+            }
+
+            if (!$hasMessages) {
+                if ($this->stopWhenEmpty) {
+                    $this->triggerStop();
+                    break;
+                }
+            }
+
+            usleep($this->idleSleepMs * 1000);
+        }
+    }
+
+
+
+    private function needToBreak(): bool
+    {
+        if (memory_get_usage(true) > $this->maxMemoryBytes) {
+            return true;
+        }
+
+        if ((time() - $this->startTime) > $this->maxRuntimeSeconds) {
+            return true;
+        }
+
+        return false;
+    }
+
+    private function triggerStop(): void
+    {
+        foreach ($this->onStopCallbacks as $callback) {
+            $callback();
+        }
+    }
+}

+ 19 - 0
src/Serializer/MessageSerializer.php

@@ -0,0 +1,19 @@
+<?php
+
+namespace Michel\PQueue\Serializer;
+final class MessageSerializer
+{
+    public static function serialize(object $message): string
+    {
+        return serialize($message);
+    }
+
+    public static function unSerialize(string $serializedMessage): object
+    {
+        $obj = @unserialize($serializedMessage);
+        if (!is_object($obj)) {
+            throw new \InvalidArgumentException('Serialized message is not an object');
+        }
+        return $obj;
+    }
+}

+ 86 - 0
src/Transport/Envelope.php

@@ -0,0 +1,86 @@
+<?php
+
+namespace Michel\PQueue\Transport;
+
+use DateTimeImmutable;
+use DateTimeInterface;
+
+class Envelope
+{
+    private string $body;
+    private bool $retry;
+    private int $attempts;
+    private ?DateTimeInterface $availableAt;
+    private ?DateTimeInterface $lastFailureAt;
+    private ?string $errorMessage;
+
+    public function __construct(
+        string $body,
+        bool $retry,
+        int $attempts = 0,
+        ?DateTimeInterface $availableAt = null,
+        ?DateTimeInterface $lastFailureAt = null,
+        ?string $errorMessage = null
+    ) {
+        $this->body = $body;
+        $this->retry = $retry;
+        $this->attempts = $attempts;
+        $this->availableAt = $availableAt;
+        $this->lastFailureAt = $lastFailureAt;
+        $this->errorMessage = $errorMessage;
+    }
+
+    public function getBody(): string
+    {
+        return $this->body;
+    }
+
+    public function isRetry(): bool
+    {
+        return $this->retry;
+    }
+
+    public function getAttempts(): int
+    {
+        return $this->attempts;
+    }
+
+    public function getAvailableAt(): ?DateTimeInterface
+    {
+        return $this->availableAt;
+    }
+
+    public function getLastFailureAt(): ?DateTimeInterface
+    {
+        return $this->lastFailureAt;
+    }
+
+    public function getErrorMessage(): ?string
+    {
+        return $this->errorMessage;
+    }
+
+    public function toArray(): array
+    {
+        return [
+            'body' => $this->getBody(),
+            'retry' => $this->isRetry(),
+            'attempts' => $this->getAttempts(),
+            'availableAt' => $this->getAvailableAt() ? $this->getAvailableAt()->format('Y-m-d H:i:s') : null,
+            'lastFailureAt' => $this->getLastFailureAt() ? $this->getLastFailureAt()->format('Y-m-d H:i:s') : null,
+            'errorMessage' => $this->getErrorMessage(),
+        ];
+    }
+
+    public static function fromArray(array $data): self
+    {
+        return new self(
+            $data['body'],
+            (bool) $data['retry'],
+            (int) ($data['attempts'] ?? 0),
+            isset($data['availableAt']) ? DateTimeImmutable::createFromFormat('Y-m-d H:i:s', $data['availableAt']) : null,
+            isset($data['lastFailureAt']) ? DateTimeImmutable::createFromFormat('Y-m-d H:i:s', $data['lastFailureAt']) : null,
+            $data['errorMessage'] ?? null
+        );
+    }
+}

+ 166 - 0
src/Transport/FilesystemTransport.php

@@ -0,0 +1,166 @@
+<?php
+
+namespace Michel\PQueue\Transport;
+
+use DateTimeInterface;
+use LogicException;
+use Michel\PQueue\Transport\Message\Message;
+use RuntimeException;
+use SQLite3;
+
+class FilesystemTransport implements TransportInterface
+{
+    const MESSAGE_EXTENSION = '.message';
+    const FAILED_EXTENSION = '.failed';
+
+    private string $directory;
+    private array $processing = [];
+
+    public function __construct(string $directory)
+    {
+        if (!is_dir($directory) && !mkdir($directory, 0777, true) && !is_dir($directory)) {
+            throw new RuntimeException(sprintf('Directory "%s" was not created', $directory));
+        }
+
+        $this->directory = rtrim($directory, '/') . '/';
+    }
+
+
+    public function send(Envelope $message): void
+    {
+        $id = $this->generateUniqueId();
+        $this->write($id, $message);
+    }
+
+    public function getNextAvailableMessages(): iterable
+    {
+        $files = glob($this->directory . '*' . self::MESSAGE_EXTENSION);
+        usort($files, function ($a, $b) {
+            return filemtime($a) - filemtime($b);
+        });
+
+        foreach ($files as $file) {
+            if (!file_exists($file)) {
+                continue;
+            }
+            $content = file_get_contents($file);
+            $data = json_decode($content, true);
+            $envelope = Envelope::fromArray($data);
+
+            if ($envelope->getAvailableAt() && $envelope->getAvailableAt()->getTimestamp() > time()) {
+                continue;
+            }
+
+            if (isset($this->processing[$data['id']])) {
+                continue;
+            }
+
+            $this->processing[$data['id']] = true;
+            yield new Message(
+                $data['id'],
+                $envelope
+            );
+        }
+    }
+
+    public function success(Message $message): void
+    {
+        $filename = $this->generateFilenameById($message->getId());
+        if (file_exists($filename)) {
+            unlink($filename);
+        }
+        unset($this->processing[$message->getId()]);
+    }
+
+    public function retry(Message $message, string $errorMessage, DateTimeInterface $availableAt): void
+    {
+        $envelope = $this->read($message->getId());
+
+        $newEnvelope = new Envelope(
+            $envelope->getBody(),
+            $envelope->isRetry(),
+            $envelope->getAttempts() + 1,
+            $availableAt,
+            new \DateTimeImmutable(),
+            $errorMessage
+        );
+
+        $this->write($message->getId(), $newEnvelope);
+        unset($this->processing[$message->getId()]);
+    }
+
+    public function failed(Message $message, string $errorMessage): void
+    {
+        $envelope = $this->read($message->getId());
+
+        $newEnvelope = new Envelope(
+            $envelope->getBody(),
+            $envelope->isRetry(),
+            $envelope->getAttempts() + 1,
+            $envelope->getAvailableAt(),
+            new \DateTimeImmutable(),
+            $errorMessage
+        );
+
+        $this->write($message->getId(), $newEnvelope);
+
+        $filename = $this->generateFilenameById($message->getId());
+        rename($filename, $filename . self::FAILED_EXTENSION);
+        unset($this->processing[$message->getId()]);
+    }
+
+    public function supportMultiWorker(): bool
+    {
+        return false;
+    }
+
+    private function read(string $id): Envelope
+    {
+        $filename = $this->generateFilenameById($id);
+        if (!file_exists($filename)) {
+            throw new RuntimeException(sprintf('Message file "%s" does not exist for id "%s"', $filename, $id));
+        }
+
+        $content = file_get_contents($filename);
+        $data = json_decode($content, true);
+        return Envelope::fromArray($data);
+    }
+
+    private function write(string $id, Envelope $envelope): void
+    {
+        $filename = $this->generateFilenameById($id);
+        $data = array_merge(['id' => $id], $envelope->toArray());
+        $result = @file_put_contents($filename, json_encode($data), LOCK_EX);
+        if ($result === false) {
+            throw new RuntimeException(sprintf('Could not write message to file "%s"', $filename));
+        }
+    }
+
+    private function generateUniqueId(): string
+    {
+        do {
+            $id = uniqid(date("Ymd_His_") . gettimeofday()["usec"]);
+            $fileName = $this->generateFilenameById($id);
+        } while (file_exists($fileName));
+
+        return $id;
+    }
+
+    private function generateFilenameById(string $id): string
+    {
+        return sprintf("%s%s%s", $this->directory, $id, self::MESSAGE_EXTENSION);
+    }
+
+    public static function create(array $options): TransportInterface
+    {
+        if (!isset($options["directory"])) {
+            throw new \LogicException('The "directory" option must be set');
+        }
+
+        if (!is_string($options["directory"])) {
+            throw new \LogicException('The "directory" option must be a string');
+        }
+
+        return new FilesystemTransport($options["directory"]);
+    }
+}

+ 39 - 0
src/Transport/Message/Message.php

@@ -0,0 +1,39 @@
+<?php
+
+namespace Michel\PQueue\Transport\Message;
+
+use Michel\PQueue\Transport\Envelope;
+
+final class Message
+{
+    private string $id;
+    private Envelope $envelope;
+
+    public function __construct(
+        string $id,
+        Envelope $envelope
+    ) {
+        $this->id = $id;
+        $this->envelope = $envelope;
+    }
+
+    public function getId(): string
+    {
+        return $this->id;
+    }
+
+    public function getEnvelope(): Envelope
+    {
+        return $this->envelope;
+    }
+
+    public function getAttempts(): int
+    {
+        return $this->envelope->getAttempts();
+    }
+
+    public function isRetry(): bool
+    {
+        return $this->envelope->isRetry();
+    }
+}

+ 170 - 0
src/Transport/SQLiteTransport.php

@@ -0,0 +1,170 @@
+<?php
+
+namespace Michel\PQueue\Transport;
+
+use DateTimeInterface;
+use LogicException;
+use Michel\PQueue\Transport\Message\Message;
+use RuntimeException;
+use SQLite3;
+
+class SQLiteTransport implements TransportInterface
+{
+    private SQLite3 $db;
+
+    public function __construct(string $dbPath)
+    {
+        $dir = dirname($dbPath);
+        if (!is_dir($dir)) {
+            throw new LogicException(sprintf(
+                "The SQLite directory does not exist: %s",
+                $dir
+            ));
+        }
+        if (!file_exists($dbPath)) {
+            $created = @touch($dbPath);
+            if (!$created) {
+                throw new RuntimeException(sprintf(
+                    "Unable to create the SQLite database file: %s",
+                    $dbPath
+                ));
+            }
+        }
+
+        $this->db = new \SQLite3($dbPath);
+        $this->db->exec('PRAGMA journal_mode = WAL;');
+        $this->db->busyTimeout(5000);
+        $this->initializeDatabase();
+
+    }
+
+    private function initializeDatabase(): void
+    {
+        $sql = <<<SQL
+            CREATE TABLE IF NOT EXISTS pqueue_messages (
+                id INTEGER PRIMARY KEY AUTOINCREMENT,
+                body TEXT NOT NULL,
+                retry BOOLEAN NOT NULL,
+                attempts INTEGER DEFAULT 0,
+                available_at DATETIME DEFAULT NULL,
+                last_failure_at DATETIME DEFAULT NULL,
+                error_message TEXT DEFAULT NULL,
+                status TEXT DEFAULT 'PENDING',
+                created_at DATETIME DEFAULT CURRENT_TIMESTAMP
+            );
+            CREATE INDEX IF NOT EXISTS idx_pqueue_status_available ON pqueue_messages (status, available_at);
+            SQL;
+
+        if (!$this->db->exec($sql)) {
+            throw new RuntimeException('Failed to create SQLite tables: ' . $this->db->lastErrorMsg());
+        }
+    }
+
+    public function send(Envelope $message): void
+    {
+        $availableAt = $message->getAvailableAt() ?: new \DateTime(date('Y-m-d H:i:00'));
+        $availableAt  = (clone $availableAt)->setTimezone(new \DateTimeZone('UTC'));
+        $stmt = $this->db->prepare('INSERT INTO pqueue_messages (body, retry, available_at) VALUES (:body, :retry, :availableAt)');
+        $stmt->bindValue(':body', $message->getBody(), SQLITE3_BLOB);
+        $stmt->bindValue(':retry', $message->isRetry() ? 1 : 0, SQLITE3_INTEGER);
+        $stmt->bindValue(':availableAt', $availableAt->format('Y-m-d H:i:s'), SQLITE3_TEXT);
+        $stmt->execute();
+    }
+
+    /**
+     * @return iterable<Message>
+     */
+    public function getNextAvailableMessages(): iterable
+    {
+        $stmt = $this->db->prepare(
+            <<<SQL
+        SELECT id, body, retry, attempts, available_at as availableAt, last_failure_at as lastFailureAt, error_message as errorMessage, status  FROM pqueue_messages
+        WHERE (status = 'PENDING' OR status = 'RETRY') AND available_at <= datetime('now')
+        ORDER BY id ASC
+        LIMIT 100
+        SQL
+        );
+        $result = $stmt->execute();
+        $items = [];
+        while ($row = $result->fetchArray(SQLITE3_ASSOC)) {
+            $items[] = $row;
+        }
+        $result->finalize();
+
+        foreach ($items as $row) {
+            $stmtUpdate = $this->db->prepare("UPDATE pqueue_messages SET status = 'PROCESSING' WHERE id = :id AND status != 'PROCESSING'");
+            $stmtUpdate->bindValue(':id', $row['id'], SQLITE3_INTEGER);
+            $stmtUpdate->execute();
+            if ($this->db->changes() > 0) {
+                $envelope = Envelope::fromArray($row);
+                yield new Message(
+                    $row['id'],
+                    $envelope
+                );
+            }
+        }
+        unset($items);
+    }
+
+    public function success(Message $message): void
+    {
+        $stmt = $this->db->prepare("DELETE FROM pqueue_messages WHERE id = :id");
+        $stmt->bindValue(':id', $message->getId(), SQLITE3_INTEGER);
+        $stmt->execute();
+    }
+
+    public function retry(Message $message, string $errorMessage, DateTimeInterface $availableAt): void
+    {
+        $availableAt  = (clone $availableAt)->setTimezone(new \DateTimeZone('UTC'));
+        $stmt = $this->db->prepare(
+            <<<SQL
+            UPDATE pqueue_messages
+            SET attempts = attempts + 1,
+                status = 'RETRY',
+                available_at = :availableAt,
+                error_message = :errorMessage,
+                last_failure_at = datetime('now')
+            WHERE id = :id
+        SQL
+        );
+        $stmt->bindValue(':id', $message->getId(), SQLITE3_INTEGER);
+        $stmt->bindValue(':availableAt', $availableAt->format('Y-m-d H:i:s'), SQLITE3_TEXT);
+        $stmt->bindValue(':errorMessage', $errorMessage, SQLITE3_TEXT);
+        $stmt->execute();
+    }
+
+    public function failed(Message $message, string $errorMessage): void
+    {
+        $stmt = $this->db->prepare(
+            <<<SQL
+            UPDATE pqueue_messages
+            SET attempts = attempts + 1,
+                status = 'FAILED',
+                error_message = :errorMessage,
+                last_failure_at = datetime('now')
+            WHERE id = :id
+        SQL
+        );
+        $stmt->bindValue(':id', $message->getId(), SQLITE3_INTEGER);
+        $stmt->bindValue(':errorMessage', $errorMessage, SQLITE3_TEXT);
+        $stmt->execute();
+    }
+
+    public function supportMultiWorker(): bool
+    {
+        return false;
+    }
+
+    public static function create(array $options): TransportInterface
+    {
+        if (!isset($options["db_path"])) {
+            throw new \LogicException('The "db_path" option must be set');
+        }
+
+        if (!is_string($options["db_path"])) {
+            throw new \LogicException('The "db_path" option must be a string');
+        }
+
+        return new SQLiteTransport($options["db_path"]);
+    }
+}

+ 16 - 0
src/Transport/TransportInterface.php

@@ -0,0 +1,16 @@
+<?php
+
+namespace Michel\PQueue\Transport;
+
+use Michel\PQueue\Transport\Message\Message;
+
+interface TransportInterface
+{
+    public function send(Envelope $message): void;
+    public function getNextAvailableMessages(): iterable;
+    public function success(Message $message): void;
+    public function retry(Message $message, string $errorMessage, \DateTimeInterface $availableAt): void;
+    public function failed(Message $message, string $errorMessage): void;
+    public function supportMultiWorker(): bool;
+    public static function create(array $options): self;
+}

+ 93 - 0
tests/Command/PQueueWorkerRunCommandTest.php

@@ -0,0 +1,93 @@
+<?php
+
+namespace Test\Michel\PQueue\Command;
+
+use Michel\PQueue\Command\PQueueWorkerRunCommand;
+use Michel\PQueue\PQueueConsumer;
+use Michel\PQueue\Transport\FilesystemTransport;
+use Michel\UniTester\TestCase;
+use Michel\Console\CommandParser;
+use Michel\Console\CommandRunner;
+use Michel\Console\InputInterface;
+use Michel\Console\Output;
+use Michel\Console\OutputInterface;
+
+class PQueueWorkerRunCommandTest extends TestCase
+{
+    private string $transportDir;
+
+    protected function setUp(): void
+    {
+        $this->transportDir = sys_get_temp_dir() . '/pqueue_test_' . uniqid();
+        if (!is_dir($this->transportDir)) {
+            mkdir($this->transportDir);
+        }
+    }
+
+    protected function tearDown(): void
+    {
+        if (is_dir($this->transportDir)) {
+            $this->recursiveRemove($this->transportDir);
+        }
+    }
+
+    private function recursiveRemove(string $dir): void
+    {
+        $files = array_diff(scandir($dir), ['.', '..']);
+        foreach ($files as $file) {
+            (is_dir("$dir/$file")) ? $this->recursiveRemove("$dir/$file") : unlink("$dir/$file");
+        }
+        rmdir($dir);
+    }
+
+    protected function execute(): void
+    {
+        $this->testExecute();
+        $this->testExecuteWithAllOptions();
+    }
+
+    public function testExecute()
+    {
+        $transport = new FilesystemTransport($this->transportDir);
+        $consumer = new PQueueConsumer([]);
+
+        $runner = new CommandRunner([
+            new PQueueWorkerRunCommand($consumer, $transport),
+        ]);
+        $out = [];
+        $code = $runner->run(new CommandParser(['', 'pqueue:worker:run', '--stop-when-empty']), new Output(function ($message) use (&$out) {
+            $out[] = $message;
+        }));
+        $this->assertEquals(0, $code);
+        $this->assertCount(31, $out);
+
+    }
+
+    public function testExecuteWithAllOptions()
+    {
+        $transport = new FilesystemTransport($this->transportDir);
+        $consumer = new PQueueConsumer([]);
+
+        $runner = new CommandRunner([
+            new PQueueWorkerRunCommand($consumer, $transport),
+        ]);
+        $out = [];
+        $code = $runner->run(new CommandParser([
+            '',
+            'pqueue:worker:run',
+            '--stop-when-empty',
+            '--sleep=1',
+            '--memory-limit=128',
+            '--time-limit=60',
+            '--max-retries=1',
+            '--retry-delay=1',
+            '--retry-multiplier=1',
+            '--message-delay=10'
+        ]), new Output(function ($message) use (&$out) {
+            $out[] = $message;
+        }));
+        $this->assertEquals(0, $code);
+        $this->assertCount(31, $out);
+
+    }
+}

+ 10 - 0
tests/Extra/AnotherDummyHandler.php

@@ -0,0 +1,10 @@
+<?php
+
+namespace Test\Michel\PQueue\Extra;
+
+class AnotherDummyHandler
+{
+    public function __invoke(TestMessage $message)
+    {
+    }
+}

+ 12 - 0
tests/Extra/AnotherTestMessageHandler.php

@@ -0,0 +1,12 @@
+<?php
+
+namespace Test\Michel\PQueue\Extra;
+
+
+class AnotherTestMessageHandler
+{
+    public function __invoke(TestMessage $message)
+    {
+        // Handler logic
+    }
+}

+ 10 - 0
tests/Extra/DummyHandler.php

@@ -0,0 +1,10 @@
+<?php
+
+namespace Test\Michel\PQueue\Extra;
+
+class DummyHandler
+{
+    public function __invoke(TestMessage $message)
+    {
+    }
+}

+ 12 - 0
tests/Extra/InvalidHandler.php

@@ -0,0 +1,12 @@
+<?php
+
+namespace Test\Michel\PQueue\Extra;
+
+
+class InvalidHandler
+{
+    public function __invoke(TestMessage $message, bool $anotherArg)
+    {
+        // Invalid signature
+    }
+}

+ 8 - 0
tests/Extra/NotAHandler.php

@@ -0,0 +1,8 @@
+<?php
+
+namespace Test\Michel\PQueue\Extra;
+
+class NotAHandler
+{
+    // Not a handler, should be ignored by the finder.
+}

+ 7 - 0
tests/Extra/TestMessage.php

@@ -0,0 +1,7 @@
+<?php
+
+namespace Test\Michel\PQueue\Extra;
+
+class TestMessage
+{
+}

+ 12 - 0
tests/Extra/TestMessageHandler.php

@@ -0,0 +1,12 @@
+<?php
+
+namespace Test\Michel\PQueue\Extra;
+
+
+class TestMessageHandler
+{
+    public function __invoke(TestMessage $message)
+    {
+        // Handler logic
+    }
+}

+ 126 - 0
tests/FilesystemTransportTest.php

@@ -0,0 +1,126 @@
+<?php
+
+namespace Test\Michel\PQueue;
+
+use DateTimeImmutable;
+use Michel\UniTester\TestCase;
+use Michel\PQueue\Transport\FilesystemTransport;
+use Michel\PQueue\Transport\Envelope;
+use Michel\PQueue\Transport\Message\Message;
+
+class FilesystemTransportTest extends TestCase
+{
+    private string $testDir;
+
+    protected function setUp(): void
+    {
+        $this->testDir = sys_get_temp_dir() . '/pqueue_test_' . uniqid('', true);
+        if (is_dir($this->testDir)) {
+            $this->removeDirectory($this->testDir);
+        }
+        mkdir($this->testDir);
+    }
+
+    protected function tearDown(): void
+    {
+        $this->removeDirectory($this->testDir);
+    }
+
+    private function removeDirectory($dir)
+    {
+        if (!is_dir($dir)) {
+            return;
+        }
+        $files = array_diff(scandir($dir), ['.', '..']);
+        foreach ($files as $file) {
+            (is_dir("$dir/$file")) ? $this->removeDirectory("$dir/$file") : unlink("$dir/$file");
+        }
+        rmdir($dir);
+    }
+
+    protected function execute(): void
+    {
+        $this->testSendAndGetNext();
+        $this->testSuccess();
+        $this->testRetry();
+        $this->testFailed();
+    }
+
+    private function cleanDir()
+    {
+        $files = glob($this->testDir . '/*');
+        foreach ($files as $file) {
+            if (is_file($file)) unlink($file);
+        }
+    }
+
+    public function testSendAndGetNext()
+    {
+        $this->cleanDir();
+        $transport = new FilesystemTransport($this->testDir);
+        $envelope = new Envelope('test_body', true, 0);
+
+        $transport->send($envelope);
+
+        $messages = iterator_to_array($transport->getNextAvailableMessages());
+
+        $this->assertCount(1, $messages);
+        $this->assertInstanceOf(Message::class, $messages[0]);
+        $this->assertEquals('test_body', $messages[0]->getEnvelope()->getBody());
+    }
+
+    public function testSuccess()
+    {
+        $this->cleanDir();
+        $transport = new FilesystemTransport($this->testDir);
+        $envelope = new Envelope('test_body', true, 0);
+        $transport->send($envelope);
+
+        $messages = iterator_to_array($transport->getNextAvailableMessages());
+        $message = $messages[0];
+
+        $transport->success($message);
+
+        $messages = iterator_to_array($transport->getNextAvailableMessages());
+        $this->assertCount(0, $messages);
+    }
+
+    public function testRetry()
+    {
+        $this->cleanDir();
+        $transport = new FilesystemTransport($this->testDir);
+        $envelope = new Envelope('test_body', true, 0);
+        $transport->send($envelope);
+
+        $messages = iterator_to_array($transport->getNextAvailableMessages());
+        $message = $messages[0];
+
+        $availableAt = (new DateTimeImmutable())->modify('+1 minute');
+        $transport->retry($message, 'error', $availableAt);
+
+        // Should not be available immediately
+        $messages = iterator_to_array($transport->getNextAvailableMessages());
+        $this->assertCount(0, $messages);
+    }
+
+    public function testFailed()
+    {
+        $this->cleanDir();
+        $transport = new FilesystemTransport($this->testDir);
+        $envelope = new Envelope('test_body', true, 0);
+        $transport->send($envelope);
+
+        $messages = iterator_to_array($transport->getNextAvailableMessages());
+        $message = $messages[0];
+
+        $transport->failed($message, 'fatal error');
+
+        // Should be moved to failed file, so not available
+        $messages = iterator_to_array($transport->getNextAvailableMessages());
+        $this->assertCount(0, $messages);
+
+        // Verify failed file exists
+        $failedFiles = glob($this->testDir . '/*.failed');
+        $this->assertCount(1, $failedFiles);
+    }
+}

+ 89 - 0
tests/MessageEnvelopeTest.php

@@ -0,0 +1,89 @@
+<?php
+
+namespace Test\Michel\PQueue;
+
+use DateTimeImmutable;
+use Michel\UniTester\TestCase;
+use Michel\PQueue\Transport\Envelope;
+
+class MessageEnvelopeTest extends TestCase
+{
+    protected function execute(): void
+    {
+        $this->testConstructAndGetters();
+        $this->testToArrayAndFromArray();
+    }
+
+    public function testConstructAndGetters()
+    {
+        $body = 'test_body';
+        $retry = true;
+        $attempts = 1;
+        $availableAt = new DateTimeImmutable();
+        $lastFailureAt = new DateTimeImmutable();
+        $errorMessage = 'error';
+
+        $envelope = new Envelope(
+            $body,
+            $retry,
+            $attempts,
+            $availableAt,
+            $lastFailureAt,
+            $errorMessage
+        );
+
+        $this->assertEquals($body, $envelope->getBody());
+        $this->assertEquals($retry, $envelope->isRetry());
+        $this->assertEquals($attempts, $envelope->getAttempts());
+        $this->assertEquals($availableAt, $envelope->getAvailableAt());
+        $this->assertEquals($lastFailureAt, $envelope->getLastFailureAt());
+        $this->assertEquals($errorMessage, $envelope->getErrorMessage());
+    }
+
+    public function testToArrayAndFromArray()
+    {
+        $body = 'test_body';
+        $retry = true;
+        $attempts = 2;
+        $availableAt = new DateTimeImmutable('2023-01-01 10:00:00');
+        $lastFailureAt = new DateTimeImmutable('2023-01-01 11:00:00');
+        $errorMessage = 'failure';
+
+        $envelope = new Envelope(
+            $body,
+            $retry,
+            $attempts,
+            $availableAt,
+            $lastFailureAt,
+            $errorMessage
+        );
+
+        $array = $envelope->toArray();
+
+        $this->assertEquals($body, $array['body']);
+        $this->assertEquals($retry, $array['retry']);
+        $this->assertEquals($attempts, $array['attempts']);
+        $this->assertEquals($availableAt->format('Y-m-d H:i:s'), $array['availableAt']);
+        $this->assertEquals($lastFailureAt->format('Y-m-d H:i:s'), $array['lastFailureAt']);
+        $this->assertEquals($errorMessage, $array['errorMessage']);
+
+        $newEnvelope = Envelope::fromArray($array);
+
+        $this->assertEquals($envelope->getBody(), $newEnvelope->getBody());
+        $this->assertEquals($envelope->isRetry(), $newEnvelope->isRetry());
+        $this->assertEquals($envelope->getAttempts(), $newEnvelope->getAttempts());
+        $this->assertEquals($envelope->getAvailableAt()->getTimestamp(), $newEnvelope->getAvailableAt()->getTimestamp());
+        $this->assertEquals($envelope->getLastFailureAt()->getTimestamp(), $newEnvelope->getLastFailureAt()->getTimestamp());
+        $this->assertEquals($envelope->getErrorMessage(), $newEnvelope->getErrorMessage());
+    }
+
+    protected function setUp(): void
+    {
+        // TODO: Implement setUp() method.
+    }
+
+    protected function tearDown(): void
+    {
+        // TODO: Implement tearDown() method.
+    }
+}

+ 65 - 0
tests/MessageSerializerTest.php

@@ -0,0 +1,65 @@
+<?php
+
+namespace Test\Michel\PQueue;
+
+use Michel\UniTester\TestCase;
+use Michel\PQueue\Serializer\MessageSerializer;
+
+class MessageSerializerTest extends TestCase
+{
+    protected function execute(): void
+    {
+        $this->testSerializeObject();
+        $this->testUnSerializeObject();
+    }
+
+    public function testSerializeObject()
+    {
+        $obj = new \stdClass();
+        $obj->foo = 'bar';
+        $serialized = MessageSerializer::serialize($obj);
+
+        $this->assertStringContains($serialized, 'foo');
+        $this->assertStringContains($serialized, 'bar');
+    }
+
+    public function testSerializeArray()
+    {
+        $arr = ['foo' => 'bar'];
+        $serialized = MessageSerializer::serialize($arr);
+
+        $this->assertStringContains($serialized, 'foo');
+        $this->assertStringContains($serialized, 'bar');
+    }
+
+    public function testUnSerializeObject()
+    {
+        $obj = new \stdClass();
+        $obj->foo = 'bar';
+        $serialized = MessageSerializer::serialize($obj);
+        $unserialized = MessageSerializer::unSerialize($serialized);
+
+        $this->assertInstanceOf(\stdClass::class, $unserialized);
+        $this->assertEquals($obj->foo, $unserialized->foo);
+    }
+
+    public function testUnSerializeArray()
+    {
+        $arr = ['foo' => 'bar'];
+        $serialized = MessageSerializer::serialize($arr);
+        $unserialized = MessageSerializer::unSerialize($serialized);
+
+        $this->assertTrue(is_array($unserialized));
+        $this->assertEquals($arr['foo'], $unserialized['foo']);
+    }
+
+    protected function setUp(): void
+    {
+        // TODO: Implement setUp() method.
+    }
+
+    protected function tearDown(): void
+    {
+        // TODO: Implement tearDown() method.
+    }
+}

+ 162 - 0
tests/PQueueConsumerFactoryTest.php

@@ -0,0 +1,162 @@
+<?php
+
+namespace Test\Michel\PQueue;
+
+use Michel\PQueue\PQueueHandlerFinder;
+use Michel\UniTester\TestCase;
+use Michel\PQueue\HandlerResolver\ContainerHandlerResolver;
+use Michel\PQueue\HandlerResolver\HandlerResolverInterface;
+use Michel\PQueue\PQueueConsumer;
+use Michel\PQueue\PQueueConsumerFactory;
+use Psr\Container\ContainerInterface;
+use Psr\Container\NotFoundExceptionInterface;
+use LogicException;
+use Test\Michel\PQueue\Extra\TestMessage;
+use Test\Michel\PQueue\Extra\TestMessageHandler;
+
+class PQueueConsumerFactoryTest extends TestCase
+{
+    private array $tempDirs = [];
+
+    protected function setUp(): void
+    {
+        // All mocks and temporary directories will be created within each test method for clarity and isolation.
+    }
+
+    protected function tearDown(): void
+    {
+        // Clean up all temporary directories created during tests.
+        foreach ($this->tempDirs as $dir) {
+            if (is_dir($dir)) {
+                $this->deleteDirectory($dir);
+            }
+        }
+        $this->tempDirs = [];
+    }
+
+    // --- Helper Mocks (defined once, used in tests) ---
+
+    /**
+     * Creates a mock PSR-11 ContainerInterface.
+     * @param bool $withHandler If true, the container will have 'App\MyTestHandler' registered.
+     */
+    private function createMockContainer(bool $withHandler = true): ContainerInterface
+    {
+        $container = new class implements ContainerInterface {
+            private array $services = [];
+            public function get(string $id) {
+                if (!array_key_exists($id, $this->services)) { // Use array_key_exists for robustness
+                    throw new \Exception("Service $id not found.");
+                }
+                return $this->services[$id];
+            }
+            public function has(string $id): bool { return array_key_exists($id, $this->services); } // Use array_key_exists for robustness
+            public function set(string $id, object $service): void { $this->services[$id] = $service; }
+        };
+        if ($withHandler) {
+            // Use a real handler class from your Extra directory for more realistic testing
+            $container->set(TestMessageHandler::class, new TestMessageHandler());
+        }
+        return $container;
+    }
+
+    /**
+     * Creates a mock HandlerResolverInterface.
+     * @param ContainerInterface $container The container to back the resolver.
+     */
+    private function createMockHandlerResolver(ContainerInterface $container): HandlerResolverInterface
+    {
+        return new ContainerHandlerResolver($container);
+    }
+
+    // --- Test Execution ---
+
+    public function execute(): void
+    {
+        $this->testFactoryCreatesConsumerSuccessfully();
+        $this->testFactoryFailsWithoutHandlerSource();
+        $this->testFactoryFailsIfHandlerIsNotInContainer();
+    }
+
+    // --- Individual Tests ---
+
+    private function testFactoryCreatesConsumerSuccessfully()
+    {
+        // Arrange
+        $container = $this->createMockContainer(); // Container has the handler
+        $resolver = $this->createMockHandlerResolver($container);
+
+        $sourceDir = $this->createTempDir();
+        // Copy a real handler and message to the temp source dir
+        copy(__DIR__ . '/Extra/TestMessage.php', $sourceDir . '/TestMessage.php');
+        copy(__DIR__ . '/Extra/TestMessageHandler.php', $sourceDir . '/TestMessageHandler.php');
+
+        $finder  = new PQueueHandlerFinder([$sourceDir]);
+        $factory = new PQueueConsumerFactory($resolver, $finder->find());
+
+        // Act
+        $consumer = $factory->createConsumer();
+
+        // Assert
+        $this->assertInstanceOf(PQueueConsumer::class, $consumer);
+    }
+
+    private function testFactoryFailsWithoutHandlerSource()
+    {
+        // Arrange
+        $container = $this->createMockContainer(false); // No handler needed if source is missing
+        $resolver = $this->createMockHandlerResolver($container);
+
+        // Assert
+        $this->expectException(LogicException::class, function () use ($resolver) {
+            // Act
+
+            $factory = new PQueueConsumerFactory($resolver, []); // This MUST throw LogicException
+            $factory->createConsumer();
+        });
+    }
+
+    private function testFactoryFailsIfHandlerIsNotInContainer()
+    {
+        // Arrange
+        $container = $this->createMockContainer(false); // Container WITHOUT the handler
+        $resolver = $this->createMockHandlerResolver($container);
+
+        $sourceDir = $this->createTempDir();
+        // Copy a handler that the container *doesn't* know about
+        copy(__DIR__ . '/Extra/TestMessage.php', $sourceDir . '/TestMessage.php');
+        copy(__DIR__ . '/Extra/TestMessageHandler.php', $sourceDir . '/TestMessageHandler.php');
+
+        // Assert
+        $this->expectException(LogicException::class, function () use ($resolver, $sourceDir) {
+            // Act
+            $finder  = new PQueueHandlerFinder([$sourceDir]);
+            $factory = new PQueueConsumerFactory($resolver, $finder->find()); // This MUST throw LogicException because the handler is not in the container
+            $factory->createConsumer();
+        });
+    }
+
+    // --- Utility Helpers ---
+
+    private function createTempDir(): string
+    {
+        $dir = sys_get_temp_dir() . '/' . uniqid('pqueue_consumer_factory_test_');
+        mkdir($dir, 0777, true);
+        $this->tempDirs[] = $dir;
+        return $dir;
+    }
+
+    private function deleteDirectory(string $dir): void
+    {
+        if (!is_dir($dir)) return;
+        $files = new \RecursiveIteratorIterator(
+            new \RecursiveDirectoryIterator($dir, \RecursiveDirectoryIterator::SKIP_DOTS),
+            \RecursiveIteratorIterator::CHILD_FIRST
+        );
+        foreach ($files as $fileinfo) {
+            $todo = ($fileinfo->isDir() ? 'rmdir' : 'unlink');
+            $todo($fileinfo->getRealPath());
+        }
+        rmdir($dir);
+    }
+}

+ 84 - 0
tests/PQueueConsumerSimpleTest.php

@@ -0,0 +1,84 @@
+<?php
+
+namespace Test\Michel\PQueue;
+
+use Michel\UniTester\TestCase;
+use Michel\PQueue\PQueueConsumer;
+use LogicException;
+use Test\Michel\PQueue\Extra\TestMessage;
+use Test\Michel\PQueue\Extra\TestMessageHandler;
+
+class PQueueConsumerSimpleTest extends TestCase
+{
+    public function execute(): void
+    {
+        $this->testConsumeCallsHandler();
+        $this->testConsumeThrowsExceptionIfNoHandlerFound();
+        $this->testConstructorValidatesHandlers();
+    }
+
+    private function testConsumeCallsHandler()
+    {
+        // Arrange
+        $message = new TestMessage();
+        $handlerCalled = false;
+        $mockHandler = new class extends TestMessageHandler {
+            public $called = false;
+            public function __invoke(TestMessage $message) {
+                $this->called = true;
+            }
+        };
+
+        $consumer = new PQueueConsumer([
+            TestMessage::class => $mockHandler
+        ]);
+
+        // Act
+        $consumer->consume($message);
+
+        // Assert
+        $this->assertTrue($mockHandler->called, 'Handler should have been called.');
+    }
+
+    private function testConsumeThrowsExceptionIfNoHandlerFound()
+    {
+        // Arrange
+        $message = new TestMessage();
+        $consumer = new PQueueConsumer([]); // Consumer with no handlers
+
+        // Assert
+        $this->expectException(\RuntimeException::class, function () use ($consumer, $message) {
+            // Act
+            $consumer->consume($message);
+        });
+    }
+
+    private function testConstructorValidatesHandlers()
+    {
+        // Test invalid handler (not an object)
+        $this->expectException(\InvalidArgumentException::class, function () {
+            new PQueueConsumer([TestMessage::class => 'not_an_object']);
+        });
+
+        // Test invalid handler (no __invoke)
+        $this->expectException(\InvalidArgumentException::class, function () {
+            new PQueueConsumer([TestMessage::class => new \stdClass()]);
+        });
+
+        // Test unknown payload class
+        $this->expectException(\InvalidArgumentException::class, function () {
+            $mockHandler = new class { public function __invoke(TestMessage $message) {} };
+            new PQueueConsumer(['NonExistentClass' => $mockHandler]);
+        });
+    }
+
+    protected function setUp(): void
+    {
+        // TODO: Implement setUp() method.
+    }
+
+    protected function tearDown(): void
+    {
+        // TODO: Implement tearDown() method.
+    }
+}

+ 65 - 0
tests/PQueueDispatcherTest.php

@@ -0,0 +1,65 @@
+<?php
+
+namespace Test\Michel\PQueue;
+
+use DateTimeImmutable;
+use Michel\UniTester\TestCase;
+use Michel\PQueue\PQueueDispatcher;
+use Michel\PQueue\Transport\Envelope;
+use Michel\PQueue\Transport\Message\Message;
+use Michel\PQueue\Transport\TransportInterface;
+
+class PQueueDispatcherTest extends TestCase
+{
+    protected function execute(): void
+    {
+        $this->testDispatch();
+    }
+
+    public function testDispatch()
+    {
+        $transport = new class implements TransportInterface {
+            public ?Envelope $lastEnvelope = null;
+            public function send(Envelope $message): void
+            {
+                $this->lastEnvelope = $message;
+            }
+            public function getNextAvailableMessages(): iterable
+            {
+                return [];
+            }
+            public function success(Message $message): void {}
+            public function retry(Message $message, string $errorMessage, \DateTimeInterface $availableAt): void {}
+            public function failed(Message $message, string $errorMessage): void {}
+            public function supportMultiWorker(): bool
+            {
+                return false;
+            }
+
+            public static function create(array $options): TransportInterface
+            {
+                // TODO: Implement create() method.
+            }
+        };
+
+        $dispatcher = new PQueueDispatcher($transport);
+        $message = new \stdClass();
+        $message->data = 'test';
+
+        $dispatcher->dispatch($message);
+
+        $this->assertNotNull($transport->lastEnvelope);
+        $this->assertStringContains($transport->lastEnvelope->getBody(), 'test');
+        $this->assertTrue($transport->lastEnvelope->isRetry());
+    }
+
+    protected function setUp(): void
+    {
+        // TODO: Implement setUp() method.
+    }
+
+    protected function tearDown(): void
+    {
+        // TODO: Implement tearDown() method.
+    }
+}

+ 134 - 0
tests/PQueueHandlerFinderTest.php

@@ -0,0 +1,134 @@
+<?php
+
+namespace Test\Michel\PQueue;
+
+use Michel\UniTester\TestCase;
+use Michel\PQueue\PQueueHandlerFinder;
+use LogicException;
+use InvalidArgumentException;
+use Test\Michel\PQueue\Extra\TestMessage;
+use Test\Michel\PQueue\Extra\TestMessageHandler;
+
+class PQueueHandlerFinderTest extends TestCase
+{
+    private array $tempDirs = [];
+
+    protected function setUp(): void
+    {
+        // Each test is self-contained.
+    }
+
+    protected function tearDown(): void
+    {
+        // Clean up all temporary directories created during tests.
+        foreach ($this->tempDirs as $dir) {
+            if (is_dir($dir)) {
+                $this->deleteDirectory($dir);
+            }
+        }
+        $this->tempDirs = [];
+    }
+
+    public function execute(): void
+    {
+        $this->testFindsSingleHandlerCorrectly();
+        $this->testFailsWhenMultipleHandlersExist();
+        $this->testCacheWorksCorrectly();
+        $this->testFailsForInvalidDirectory();
+    }
+
+    /**
+     * Test: The finder correctly finds a single, valid handler.
+     */
+    private function testFindsSingleHandlerCorrectly()
+    {
+        // Given: A temporary directory with ONE valid handler copied from the Extra directory.
+        $sourceDir = $this->createTempDir();
+        copy(__DIR__ . '/Extra/TestMessage.php', $sourceDir . '/TestMessage.php');
+        copy(__DIR__ . '/Extra/TestMessageHandler.php', $sourceDir . '/TestMessageHandler.php');
+
+        // When: We run the finder on that directory.
+        $finder = new PQueueHandlerFinder([$sourceDir]);
+        $handlerMap = $finder->find();
+
+        // Then: The map should contain exactly one correct entry.
+        $this->assertArrayHasKey(TestMessage::class, $handlerMap);
+        $this->assertStrictEquals(TestMessageHandler::class, $handlerMap[TestMessage::class]);
+        $this->assertCount(1, $handlerMap);
+    }
+
+    /**
+     * Test: The finder throws an exception when scanning the Extra directory, which has multiple handlers.
+     */
+    private function testFailsWhenMultipleHandlersExist()
+    {
+        $this->expectException(LogicException::class, function () {
+            // When: We run the finder on that directory.
+            $finder = new PQueueHandlerFinder([__DIR__ . '/Extra']);
+            $finder->find();
+        });
+    }
+
+    /**
+     * Test: The finder creates and uses a cache file correctly.
+     */
+    private function testCacheWorksCorrectly()
+    {
+        // Given: A temporary directory with one handler and a cache directory.
+        $sourceDir = $this->createTempDir();
+        $cacheDir = $this->createTempDir();
+        $handlerFile = $sourceDir . '/TestMessageHandler.php';
+        copy(__DIR__ . '/Extra/TestMessage.php', $sourceDir . '/TestMessage.php');
+        copy(__DIR__ . '/Extra/TestMessageHandler.php', $handlerFile);
+
+        // When: We run the finder the first time.
+        $finder = new PQueueHandlerFinder([$sourceDir], $cacheDir);
+        $handlerMap1 = $finder->find();
+
+        // Then: The cache file must exist.
+        $this->assertFileExists($cacheDir . '/pqueue_handler_map.php');
+
+        // And When: We delete the source handler file and run the finder again.
+        unlink($handlerFile);
+        $finder2 = new PQueueHandlerFinder([$sourceDir], $cacheDir);
+        $handlerMap2 = $finder2->find();
+
+        // Then: The result should be identical because it was loaded from cache.
+        $this->assertNotEmpty($handlerMap2, 'Cache should not be empty');
+        $this->assertStrictEquals($handlerMap1, $handlerMap2, 'Result from cache should match the original');
+    }
+
+    /**
+     * Test: The finder fails if the source directory does not exist.
+     */
+    private function testFailsForInvalidDirectory()
+    {
+        $this->expectException(InvalidArgumentException::class, function () {
+            new PQueueHandlerFinder(['/this/directory/does/not/exist']);
+        });
+    }
+
+    // --- UTILITY HELPERS ---
+
+    private function createTempDir(): string
+    {
+        $dir = sys_get_temp_dir() . '/' . uniqid('pqueue_test_');
+        mkdir($dir, 0777, true);
+        $this->tempDirs[] = $dir;
+        return $dir;
+    }
+
+    private function deleteDirectory(string $dir): void
+    {
+        if (!is_dir($dir)) return;
+        $files = new \RecursiveIteratorIterator(
+            new \RecursiveDirectoryIterator($dir, \RecursiveDirectoryIterator::SKIP_DOTS),
+            \RecursiveIteratorIterator::CHILD_FIRST
+        );
+        foreach ($files as $fileinfo) {
+            $todo = ($fileinfo->isDir() ? 'rmdir' : 'unlink');
+            $todo($fileinfo->getRealPath());
+        }
+        rmdir($dir);
+    }
+}

+ 90 - 0
tests/PQueueWorkerEventsTest.php

@@ -0,0 +1,90 @@
+<?php
+
+namespace Test\Michel\PQueue;
+
+use Michel\PQueue\PQueueConsumer;
+use Michel\PQueue\PQueueWorker;
+use Michel\PQueue\Serializer\MessageSerializer;
+use Michel\PQueue\Transport\Envelope;
+use Michel\PQueue\Transport\FilesystemTransport;
+use Michel\PQueue\Transport\Message\Message;
+use Michel\UniTester\TestCase;
+use Test\Michel\PQueue\Extra\TestMessage;
+
+class PQueueWorkerEventsTest extends TestCase
+{
+    private string $transportDir;
+
+    protected function setUp(): void
+    {
+        $this->transportDir = sys_get_temp_dir() . '/pqueue_test_events_' . uniqid();
+        if (!is_dir($this->transportDir)) {
+            mkdir($this->transportDir);
+        }
+    }
+
+    protected function tearDown(): void
+    {
+        if (is_dir($this->transportDir)) {
+            $this->recursiveRemove($this->transportDir);
+        }
+    }
+
+    protected function execute(): void
+    {
+        $this->testEvents();
+    }
+
+    private function recursiveRemove(string $dir): void
+    {
+        $files = array_diff(scandir($dir), ['.', '..']);
+        foreach ($files as $file) {
+            (is_dir("$dir/$file")) ? $this->recursiveRemove("$dir/$file") : unlink("$dir/$file");
+        }
+        rmdir($dir);
+    }
+
+    public function testEvents()
+    {
+        $transport = new FilesystemTransport($this->transportDir);
+        $consumer = new PQueueConsumer([]);
+
+        // Add a message to the queue
+        $envelope = new Envelope(
+            MessageSerializer::serialize(new TestMessage()),
+            true,
+            0,
+            null
+        );
+        $transport->send($envelope);
+
+        $options = [
+            'stopWhenEmpty' => true,
+            'idleSleepMs' => 100,
+            'maxMemory' => 128,
+            'maxRuntimeSeconds' => 60,
+            'maxRetryAttempts' => 3,
+            'initialRetryDelayMs' => 1000,
+            'retryBackoffMultiplier' => 3,
+            'messageDelayMs' => 0,
+        ];
+
+        $worker = new PQueueWorker($transport, $consumer, $options);
+        $failed = false;
+        $stopped = false;
+
+        $worker->onFailure(function ($msg) use (&$failed) {
+            $failed = true;
+        });
+
+        $worker->onStop(function () use (&$stopped) {
+            $stopped = true;
+        });
+
+        $worker->run();
+
+        $this->assertTrue($failed, 'onFailure callback should be failed');
+        $this->assertTrue($stopped, 'onStop callback should be called');
+    }
+
+}

+ 294 - 0
tests/PQueueWorkerTest.php

@@ -0,0 +1,294 @@
+<?php
+
+namespace Test\Michel\PQueue;
+
+use Michel\UniTester\TestCase;
+use Michel\PQueue\PQueueConsumer;
+use Michel\PQueue\PQueueWorker;
+use Michel\PQueue\Serializer\MessageSerializer;
+use Michel\PQueue\Transport\Envelope;
+use Michel\PQueue\Transport\Message\Message;
+use Michel\PQueue\Transport\TransportInterface;
+
+class PQueueWorkerTest extends TestCase
+{
+    protected function execute(): void
+    {
+        $this->testWorkerProcess();
+        $this->testStopWhenEmpty();
+        $this->testRetryLogic();
+        $this->testMaxRetryAttempts();
+        $this->testRetryBackoff();
+        $this->testMaxRuntime();
+        $this->testMaxMemory();
+        $this->testMessageDelay();
+        $this->testHighVolume();
+    }
+
+    public function testWorkerProcess()
+    {
+        $processed = false;
+        $transport = $this->createMockTransport([new Envelope(MessageSerializer::serialize(new \stdClass()), true, 0)]);
+
+        $consumer = new PQueueConsumer([\stdClass::class => new class($processed) {
+            private $processed;
+            public function __construct(&$processed)
+            {
+                $this->processed = &$processed;
+            }
+            public function __invoke(\stdClass $msg)
+            {
+                $this->processed = true;
+            }
+        }]);
+
+        $worker = new PQueueWorker($transport, $consumer, ['stopWhenEmpty' => true]);
+        $worker->run();
+
+        $this->assertTrue($processed);
+        $this->assertCount(1, $transport->processed);
+    }
+
+    public function testStopWhenEmpty()
+    {
+        $transport = $this->createMockTransport([]);
+        $consumer = new PQueueConsumer([]);
+
+        $startTime = microtime(true);
+        $worker = new PQueueWorker($transport, $consumer, ['stopWhenEmpty' => true]);
+        $worker->run();
+        $duration = microtime(true) - $startTime;
+
+        $this->assertTrue($duration < 1.0, "Worker should stop immediately");
+    }
+
+    public function testRetryLogic()
+    {
+        $transport = $this->createMockTransport([new Envelope(MessageSerializer::serialize(new \stdClass()), true, 0)]);
+
+        $consumer = new PQueueConsumer([\stdClass::class => new class {
+            public function __invoke(\stdClass $msg)
+            {
+                throw new \Exception("fail");
+            }
+        }]);
+
+        // Max retries 1. Flow: 0 -> retry -> 1 -> fail
+        // Set delay to 0 so it processes immediately
+        $worker = new PQueueWorker($transport, $consumer, [
+            'stopWhenEmpty' => true,
+            'maxRetryAttempts' => 1,
+            'initialRetryDelayMs' => 0
+        ]);
+        $worker->run();
+
+        $this->assertCount(1, $transport->retried);
+        $this->assertCount(1, $transport->failed);
+    }
+
+    public function testMaxRetryAttempts()
+    {
+        // Max retries 3. Flow: 0 -> 1 -> 2 -> 3 -> Fail
+        $transport = $this->createMockTransport([new Envelope(MessageSerializer::serialize(new \stdClass()), true, 0)]);
+
+        $consumer = new PQueueConsumer([\stdClass::class => new class {
+            public function __invoke(\stdClass $msg)
+            {
+                throw new \Exception("fail");
+            }
+        }]);
+
+        $worker = new PQueueWorker($transport, $consumer, [
+            'stopWhenEmpty' => true,
+            'maxRetryAttempts' => 3,
+            'initialRetryDelayMs' => 0
+        ]);
+        $worker->run();
+
+        $this->assertCount(3, $transport->retried, "Should retry 3 times");
+        $this->assertCount(1, $transport->failed, "Should fail eventually");
+    }
+
+    public function testRetryBackoff()
+    {
+        $transport = $this->createMockTransport([new Envelope(MessageSerializer::serialize(new \stdClass()), true, 0)]);
+
+        $consumer = new PQueueConsumer([\stdClass::class => new class {
+            public function __invoke(\stdClass $msg)
+            {
+                throw new \Exception("fail");
+            }
+        }]);
+
+        // initialDelay 1000ms, multiplier 2. 
+        // Attempt 0 fails -> retry (attempt 1). Delay = 1000 * 2^0 = 1000ms.
+        $worker = new PQueueWorker($transport, $consumer, [
+            'stopWhenEmpty' => true,
+            'initialRetryDelayMs' => 1000,
+            'retryBackoffMultiplier' => 2,
+            'maxRetryAttempts' => 1,
+        ]);
+        $worker->run();
+        $this->assertCount(1, $transport->retried);
+        $retryInfo = $transport->retried[0];
+        $availableAt = $retryInfo['at'];
+
+        $diff = $availableAt->getTimestamp() - time();
+        // It should be around 1 second in future.
+
+        $this->assertTrue($diff >= 1 && $diff <= 2, "Backoff should be around 1s");
+    }
+
+    public function testMaxRuntime()
+    {
+        $this->assertTrue(true);
+    }
+
+    public function testMaxMemory()
+    {
+        // Custom transport needed for memory test
+        $transport = new class implements TransportInterface {
+            public function getNextAvailableMessages(): iterable
+            {
+                while (true) {
+                    $data = str_repeat('a', 1024 * 1024);
+                    yield new Message('1', new Envelope(MessageSerializer::serialize(new \stdClass()), true));
+                }
+            }
+            public function send(Envelope $message): void {}
+            public function success(Message $message): void {}
+            public function retry(Message $message, string $errorMessage, \DateTimeInterface $availableAt): void {}
+            public function failed(Message $message, string $errorMessage): void {}
+            public function supportMultiWorker(): bool
+            {
+                return false;
+            }
+
+            public static function create(array $options): TransportInterface
+            {
+                // TODO: Implement create() method.
+            }
+        };
+
+        $consumer = new PQueueConsumer([\stdClass::class => new class {
+            public function __invoke(\stdClass $msg) {}
+        }]);
+
+        $worker = new PQueueWorker($transport, $consumer, ['maxMemory' => 1]);
+
+        $startTime = microtime(true);
+        $worker->run();
+        $duration = microtime(true) - $startTime;
+
+        $this->assertTrue($duration < 5.0, "Worker should stop when memory limit exceeded");
+    }
+
+    public function testMessageDelay()
+    {
+        $transport = $this->createMockTransport([
+            new Envelope(MessageSerializer::serialize(new \stdClass()), true, 0),
+            new Envelope(MessageSerializer::serialize(new \stdClass()), true, 0)
+        ]);
+
+        $consumer = new PQueueConsumer([\stdClass::class => new class {
+            public function __invoke(\stdClass $msg) {}
+        }]);
+
+        $worker = new PQueueWorker($transport, $consumer, ['stopWhenEmpty' => true, 'messageDelayMs' => 500]);
+
+        $startTime = microtime(true);
+        $worker->run();
+        $duration = microtime(true) - $startTime;
+
+        $this->assertTrue($duration >= 1.0, "Worker should respect message delay");
+    }
+
+    public function testHighVolume()
+    {
+        $count = 1000;
+        $envelopes = [];
+        for ($i = 0; $i < $count; $i++) {
+            $envelopes[] = new Envelope(MessageSerializer::serialize(new \stdClass()), true, 0);
+        }
+
+        $transport = $this->createMockTransport($envelopes);
+
+        $consumer = new PQueueConsumer([\stdClass::class => new class {
+            public function __invoke(\stdClass $msg) {}
+        }]);
+
+        $worker = new PQueueWorker($transport, $consumer, ['stopWhenEmpty' => true]);
+
+        $startTime = microtime(true);
+        $worker->run();
+        $duration = microtime(true) - $startTime;
+
+        $this->assertCount($count, $transport->processed);
+        $this->assertTrue($duration < 2.0, "High volume processing should be fast");
+    }
+
+    private function createMockTransport(array $envelopes): TransportInterface
+    {
+        return new class($envelopes) implements TransportInterface {
+            public array $queue = [];
+            public array $processed = [];
+            public array $retried = [];
+            public array $failed = [];
+
+            public function __construct($envelopes)
+            {
+                foreach ($envelopes as $k => $e) {
+                    $this->queue[] = new Message((string)$k, $e);
+                }
+            }
+
+            public function send(Envelope $message): void
+            {
+                $this->queue[] = new Message(uniqid(), $message);
+            }
+
+            public function getNextAvailableMessages(): iterable
+            {
+                $now = new \DateTimeImmutable();
+                foreach ($this->queue as $k => $msg) {
+                    $av = $msg->getEnvelope()->getAvailableAt();
+                    if ($av === null || $av <= $now) {
+                        unset($this->queue[$k]);
+                        yield $msg;
+                    }
+                }
+            }
+
+            public function success(Message $message): void
+            {
+                $this->processed[] = $message;
+            }
+
+            public function retry(Message $message, string $errorMessage, \DateTimeInterface $availableAt): void
+            {
+                $this->retried[] = ['msg' => $message, 'error' => $errorMessage, 'at' => $availableAt];
+                $env = $message->getEnvelope();
+                $newEnv = new Envelope($env->getBody(), true, $env->getAttempts() + 1, $availableAt);
+                $this->queue[] = new Message($message->getId(), $newEnv);
+            }
+
+            public function failed(Message $message, string $errorMessage): void
+            {
+                $this->failed[] = ['msg' => $message, 'error' => $errorMessage];
+            }
+
+            public function supportMultiWorker(): bool
+            {
+                return false;
+            }
+
+            public static function create(array $options): TransportInterface
+            {
+                // TODO: Implement create() method.
+            }
+        };
+    }
+
+    protected function setUp(): void {}
+    protected function tearDown(): void {}
+}

+ 106 - 0
tests/SQLiteTransportTest.php

@@ -0,0 +1,106 @@
+<?php
+
+namespace Test\Michel\PQueue;
+
+use DateTimeImmutable;
+use Michel\UniTester\TestCase;
+use Michel\PQueue\Transport\SQLiteTransport;
+use Michel\PQueue\Transport\Envelope;
+use Michel\PQueue\Transport\Message\Message;
+
+class SQLiteTransportTest extends TestCase
+{
+    private string $dbPath;
+
+    protected function setUp(): void
+    {
+        $this->dbPath = sys_get_temp_dir() . '/pqueue_test_' . uniqid('', true) . '.sqlite';
+    }
+
+    protected function tearDown(): void
+    {
+        if (file_exists($this->dbPath)) {
+            unlink($this->dbPath);
+        }
+    }
+
+    protected function execute(): void
+    {
+        $this->testSendAndGetNext();
+        $this->testSuccess();
+        $this->testRetry();
+        $this->testFailed();
+    }
+
+    private function cleanDb()
+    {
+        $db = new \SQLite3($this->dbPath);
+        $db->exec('DELETE FROM pqueue_messages');
+        $db->close();
+    }
+
+    public function testSendAndGetNext()
+    {
+        $transport = new SQLiteTransport($this->dbPath);
+        $envelope = new Envelope('test_body', true, 0);
+
+        $transport->send($envelope);
+
+        $messages = iterator_to_array($transport->getNextAvailableMessages());
+
+        $this->assertCount(1, $messages);
+        $this->assertInstanceOf(Message::class, $messages[0]);
+        $this->assertEquals('test_body', $messages[0]->getEnvelope()->getBody());
+    }
+
+    public function testSuccess()
+    {
+        $this->cleanDb();
+        $transport = new SQLiteTransport($this->dbPath);
+        $envelope = new Envelope('test_body', true, 0);
+        $transport->send($envelope);
+
+        $messages = iterator_to_array($transport->getNextAvailableMessages());
+        $message = $messages[0];
+
+        $transport->success($message);
+
+        $messages = iterator_to_array($transport->getNextAvailableMessages());
+        $this->assertCount(0, $messages);
+    }
+
+    public function testRetry()
+    {
+        $this->cleanDb();
+        $transport = new SQLiteTransport($this->dbPath);
+        $envelope = new Envelope('test_body', true, 0);
+        $transport->send($envelope);
+
+        $messages = iterator_to_array($transport->getNextAvailableMessages());
+        $message = $messages[0];
+
+        $availableAt = (new DateTimeImmutable())->modify('+1 minute');
+        $transport->retry($message, 'error', $availableAt);
+
+        // Should not be available immediately
+        $messages = iterator_to_array($transport->getNextAvailableMessages());
+        $this->assertCount(0, $messages);
+    }
+
+    public function testFailed()
+    {
+        $this->cleanDb();
+        $transport = new SQLiteTransport($this->dbPath);
+        $envelope = new Envelope('test_body', true, 0);
+        $transport->send($envelope);
+
+        $messages = iterator_to_array($transport->getNextAvailableMessages());
+        $message = $messages[0];
+
+        $transport->failed($message, 'fatal error');
+
+        // Should not be available in pending/retry
+        $messages = iterator_to_array($transport->getNextAvailableMessages());
+        $this->assertCount(0, $messages);
+    }
+}