| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294 |
- <?php
- namespace Test\Michel\PQueue;
- use Michel\UniTester\TestCase;
- use Michel\PQueue\PQueueConsumer;
- use Michel\PQueue\PQueueWorker;
- use Michel\PQueue\Serializer\MessageSerializer;
- use Michel\PQueue\Transport\Envelope;
- use Michel\PQueue\Transport\Message\Message;
- use Michel\PQueue\Transport\TransportInterface;
- class PQueueWorkerTest extends TestCase
- {
/**
 * Runs every test case of this suite, one after another, in a fixed order.
 */
protected function execute(): void
{
    // Order matters only for readability of the report; each case builds its own fixtures.
    $testMethods = [
        'testWorkerProcess',
        'testStopWhenEmpty',
        'testRetryLogic',
        'testMaxRetryAttempts',
        'testRetryBackoff',
        'testMaxRuntime',
        'testMaxMemory',
        'testMessageDelay',
        'testHighVolume',
    ];

    foreach ($testMethods as $testMethod) {
        $this->{$testMethod}();
    }
}
/**
 * A single queued stdClass message must be passed to its handler and
 * reported to the transport as successfully processed exactly once.
 */
public function testWorkerProcess()
{
    $handled = false;

    // The handler keeps a reference to $handled so the test can observe that it was invoked.
    $handler = new class($handled) {
        private $flag;

        public function __construct(&$flag)
        {
            $this->flag = &$flag;
        }

        public function __invoke(\stdClass $msg)
        {
            $this->flag = true;
        }
    };

    $envelope = new Envelope(MessageSerializer::serialize(new \stdClass()), true, 0);
    $transport = $this->createMockTransport([$envelope]);
    $consumer = new PQueueConsumer([\stdClass::class => $handler]);

    (new PQueueWorker($transport, $consumer, ['stopWhenEmpty' => true]))->run();

    $this->assertTrue($handled);
    $this->assertCount(1, $transport->processed);
}
/**
 * With 'stopWhenEmpty' enabled and nothing queued, run() must return in under a second.
 */
public function testStopWhenEmpty()
{
    $emptyTransport = $this->createMockTransport([]);
    $noHandlers = new PQueueConsumer([]);

    // The clock starts before the worker is built, so construction time is measured too.
    $startedAt = microtime(true);
    (new PQueueWorker($emptyTransport, $noHandlers, ['stopWhenEmpty' => true]))->run();
    $elapsed = microtime(true) - $startedAt;

    $this->assertTrue($elapsed < 1.0, "Worker should stop immediately");
}
/**
 * A handler that always throws, with one retry allowed, must produce
 * exactly one retry and exactly one final failure on the transport.
 */
public function testRetryLogic()
{
    $alwaysFails = new class {
        public function __invoke(\stdClass $msg)
        {
            throw new \Exception("fail");
        }
    };

    $envelope = new Envelope(MessageSerializer::serialize(new \stdClass()), true, 0);
    $transport = $this->createMockTransport([$envelope]);
    $consumer = new PQueueConsumer([\stdClass::class => $alwaysFails]);

    // One retry allowed: attempt 0 fails -> retried as attempt 1 -> fails again -> marked failed.
    // A zero retry delay makes the retried message available again right away.
    $options = [
        'stopWhenEmpty' => true,
        'maxRetryAttempts' => 1,
        'initialRetryDelayMs' => 0,
    ];

    $worker = new PQueueWorker($transport, $consumer, $options);
    $worker->run();

    $this->assertCount(1, $transport->retried);
    $this->assertCount(1, $transport->failed);
}
/**
 * A handler that always throws, with three retries allowed, must be
 * retried three times and then reported as failed once.
 */
public function testMaxRetryAttempts()
{
    $alwaysFails = new class {
        public function __invoke(\stdClass $msg)
        {
            throw new \Exception("fail");
        }
    };

    $envelope = new Envelope(MessageSerializer::serialize(new \stdClass()), true, 0);
    $transport = $this->createMockTransport([$envelope]);
    $consumer = new PQueueConsumer([\stdClass::class => $alwaysFails]);

    // Three retries allowed: attempts 0, 1 and 2 are retried, attempt 3 is the final failure.
    $options = [
        'stopWhenEmpty' => true,
        'maxRetryAttempts' => 3,
        'initialRetryDelayMs' => 0,
    ];

    $worker = new PQueueWorker($transport, $consumer, $options);
    $worker->run();

    $this->assertCount(3, $transport->retried, "Should retry 3 times");
    $this->assertCount(1, $transport->failed, "Should fail eventually");
}
/**
 * The first retry of a failing message must be scheduled about one second
 * in the future when 'initialRetryDelayMs' is 1000.
 *
 * The check brackets the run with two wall-clock readings instead of
 * comparing against a single time() taken afterwards. getTimestamp()
 * truncates to whole seconds, so a retry scheduled late in a second could
 * previously yield a difference of 0 and fail the test spuriously.
 */
public function testRetryBackoff()
{
    $alwaysFails = new class {
        public function __invoke(\stdClass $msg)
        {
            throw new \Exception("fail");
        }
    };

    $transport = $this->createMockTransport([new Envelope(MessageSerializer::serialize(new \stdClass()), true, 0)]);
    $consumer = new PQueueConsumer([\stdClass::class => $alwaysFails]);

    // initialDelay 1000ms, multiplier 2.
    // Attempt 0 fails -> retry (attempt 1). Delay = 1000 * 2^0 = 1000ms.
    $worker = new PQueueWorker($transport, $consumer, [
        'stopWhenEmpty' => true,
        'initialRetryDelayMs' => 1000,
        'retryBackoffMultiplier' => 2,
        'maxRetryAttempts' => 1,
    ]);

    $before = time();
    $worker->run();
    $after = time();

    $this->assertCount(1, $transport->retried);

    $availableTs = $transport->retried[0]['at']->getTimestamp();

    // The retry was scheduled at some instant t with $before <= t < $after + 1, so the
    // truncated (t + 1s) is at least $before + 1. The upper bound keeps the original
    // tolerance of two seconds past the post-run clock reading.
    $this->assertTrue(
        $availableTs >= $before + 1 && $availableTs <= $after + 2,
        "Backoff should be around 1s"
    );
}
/**
 * Placeholder for a runtime-limit test; it currently verifies nothing.
 *
 * TODO: replace the unconditional assertion with a real check once a
 * runtime-limit scenario is written.
 */
public function testMaxRuntime()
{
    // Always passes.
    $this->assertTrue(true);
}
/**
 * A worker configured with 'maxMemory' => 1 must return from run() in
 * under five seconds even though the transport never runs out of messages.
 *
 * NOTE(review): the unit of 'maxMemory' is not visible here (presumably
 * megabytes) — confirm against PQueueWorker.
 */
public function testMaxMemory()
{
    // Custom transport needed for memory test: it yields messages forever.
    $transport = new class implements TransportInterface {
        public function getNextAvailableMessages(): iterable
        {
            while (true) {
                // Allocates 1 MiB on every iteration; presumably meant to keep
                // memory usage above the configured limit — TODO confirm.
                $data = str_repeat('a', 1024 * 1024);
                yield new Message('1', new Envelope(MessageSerializer::serialize(new \stdClass()), true));
            }
        }

        public function send(Envelope $message): void
        {
        }

        public function success(Message $message): void
        {
        }

        public function retry(Message $message, string $errorMessage, \DateTimeInterface $availableAt): void
        {
        }

        public function failed(Message $message, string $errorMessage): void
        {
        }

        public function supportMultiWorker(): bool
        {
            return false;
        }

        public static function create(array $options): TransportInterface
        {
            // An empty body would violate the declared return type and surface as an
            // opaque TypeError; fail with an explicit message instead.
            throw new \LogicException('This test transport cannot be created from options.');
        }
    };

    $consumer = new PQueueConsumer([\stdClass::class => new class {
        public function __invoke(\stdClass $msg)
        {
        }
    }]);

    $worker = new PQueueWorker($transport, $consumer, ['maxMemory' => 1]);

    $startTime = microtime(true);
    $worker->run();
    $duration = microtime(true) - $startTime;

    $this->assertTrue($duration < 5.0, "Worker should stop when memory limit exceeded");
}
/**
 * Processing two messages with 'messageDelayMs' set to 500 must take at
 * least one second in total.
 */
public function testMessageDelay()
{
    $noopHandler = new class {
        public function __invoke(\stdClass $msg) {}
    };

    $first = new Envelope(MessageSerializer::serialize(new \stdClass()), true, 0);
    $second = new Envelope(MessageSerializer::serialize(new \stdClass()), true, 0);

    $transport = $this->createMockTransport([$first, $second]);
    $consumer = new PQueueConsumer([\stdClass::class => $noopHandler]);

    $options = ['stopWhenEmpty' => true, 'messageDelayMs' => 500];
    $worker = new PQueueWorker($transport, $consumer, $options);

    // Only run() itself is timed.
    $startedAt = microtime(true);
    $worker->run();
    $elapsed = microtime(true) - $startedAt;

    $this->assertTrue($elapsed >= 1.0, "Worker should respect message delay");
}
/**
 * One thousand queued messages must all be processed successfully, and
 * run() must finish in under two seconds.
 */
public function testHighVolume()
{
    $count = 1000;

    // Build one distinct envelope per slot; the callback ignores the index it receives.
    $envelopes = array_map(
        static function (int $index): Envelope {
            return new Envelope(MessageSerializer::serialize(new \stdClass()), true, 0);
        },
        range(0, $count - 1)
    );

    $noopHandler = new class {
        public function __invoke(\stdClass $msg) {}
    };

    $transport = $this->createMockTransport($envelopes);
    $consumer = new PQueueConsumer([\stdClass::class => $noopHandler]);
    $worker = new PQueueWorker($transport, $consumer, ['stopWhenEmpty' => true]);

    $startedAt = microtime(true);
    $worker->run();
    $elapsed = microtime(true) - $startedAt;

    $this->assertCount($count, $transport->processed);
    $this->assertTrue($elapsed < 2.0, "High volume processing should be fast");
}
/**
 * Builds an in-memory fake transport pre-loaded with the given envelopes.
 *
 * The returned object records every success(), retry() and failed() call
 * in its public $processed, $retried and $failed arrays so tests can
 * assert on them. Retried messages are put back on the queue with their
 * attempt counter incremented and the requested availability time.
 *
 * @param array $envelopes Envelopes to enqueue; each array key becomes the message id.
 *
 * @return TransportInterface The fake transport (also exposes the public recording arrays).
 */
private function createMockTransport(array $envelopes): TransportInterface
{
    return new class($envelopes) implements TransportInterface {
        /** Messages still waiting to be handed out. */
        public array $queue = [];

        /** Messages passed to success(). */
        public array $processed = [];

        /** One entry per retry() call, with keys 'msg', 'error' and 'at'. */
        public array $retried = [];

        /** One entry per failed() call, with keys 'msg' and 'error'. */
        public array $failed = [];

        public function __construct(array $envelopes)
        {
            foreach ($envelopes as $k => $e) {
                $this->queue[] = new Message((string)$k, $e);
            }
        }

        public function send(Envelope $message): void
        {
            $this->queue[] = new Message(uniqid(), $message);
        }

        /**
         * Yields, and removes from the queue, every message whose availability
         * time is unset or not later than the moment iteration started.
         */
        public function getNextAvailableMessages(): iterable
        {
            $now = new \DateTimeImmutable();
            foreach ($this->queue as $k => $msg) {
                $av = $msg->getEnvelope()->getAvailableAt();
                if ($av === null || $av <= $now) {
                    unset($this->queue[$k]);
                    yield $msg;
                }
            }
        }

        public function success(Message $message): void
        {
            $this->processed[] = $message;
        }

        public function retry(Message $message, string $errorMessage, \DateTimeInterface $availableAt): void
        {
            $this->retried[] = ['msg' => $message, 'error' => $errorMessage, 'at' => $availableAt];

            // Re-enqueue under the same id with one more attempt recorded.
            $env = $message->getEnvelope();
            $newEnv = new Envelope($env->getBody(), true, $env->getAttempts() + 1, $availableAt);
            $this->queue[] = new Message($message->getId(), $newEnv);
        }

        public function failed(Message $message, string $errorMessage): void
        {
            $this->failed[] = ['msg' => $message, 'error' => $errorMessage];
        }

        public function supportMultiWorker(): bool
        {
            return false;
        }

        public static function create(array $options): TransportInterface
        {
            // An empty body would violate the declared return type and surface as an
            // opaque TypeError; fail with an explicit message instead.
            throw new \LogicException('The mock transport cannot be created from options.');
        }
    };
}
/**
 * No shared fixtures: every test case builds what it needs itself.
 */
protected function setUp(): void
{
}
/**
 * Nothing to clean up.
 */
protected function tearDown(): void
{
}
- }
|