| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- <?php
- namespace Test\Michel\PQueue;
- use Michel\PQueue\PQueueConsumer;
- use Michel\PQueue\PQueueWorker;
- use Michel\PQueue\Serializer\MessageSerializer;
- use Michel\PQueue\Transport\Envelope;
- use Michel\PQueue\Transport\FilesystemTransport;
- use Michel\PQueue\Transport\Message\Message;
- use Michel\UniTester\TestCase;
- use Test\Michel\PQueue\Extra\TestMessage;
/**
 * Checks that the callbacks registered on a PQueueWorker through onFailure()
 * and onStop() are invoked during a run over a filesystem-backed queue.
 *
 * Each run works in its own uniquely named directory under the system temp
 * directory; setUp() creates it and tearDown() removes it again.
 */
class PQueueWorkerEventsTest extends TestCase
{
    /** Absolute path of the per-run transport directory. */
    private string $transportDir;

    /**
     * Creates a fresh, uniquely named transport directory.
     *
     * @throws \RuntimeException when the directory cannot be created
     */
    protected function setUp(): void
    {
        $this->transportDir = sys_get_temp_dir() . '/pqueue_test_events_' . uniqid();

        // Re-check is_dir() after a failed mkdir() so that a directory created
        // concurrently by someone else is not reported as an error.
        if (!is_dir($this->transportDir) && !mkdir($this->transportDir) && !is_dir($this->transportDir)) {
            throw new \RuntimeException(sprintf('Unable to create directory "%s"', $this->transportDir));
        }
    }

    /**
     * Removes the transport directory and everything inside it.
     */
    protected function tearDown(): void
    {
        // isset() guards against setUp() having failed before the typed
        // property was assigned; reading it uninitialized would throw.
        if (isset($this->transportDir) && is_dir($this->transportDir)) {
            $this->recursiveRemove($this->transportDir);
        }
    }

    /**
     * Entry point that runs the tests of this class.
     */
    protected function execute(): void
    {
        $this->testEvents();
    }

    /**
     * Deletes $dir together with all files and sub-directories it contains.
     *
     * @param string $dir directory to delete
     */
    private function recursiveRemove(string $dir): void
    {
        $entries = scandir($dir);
        if ($entries === false) {
            // The directory could not be read (e.g. it vanished or is not
            // accessible); there is nothing to iterate over.
            return;
        }

        foreach (array_diff($entries, ['.', '..']) as $entry) {
            $path = "$dir/$entry";
            if (is_dir($path)) {
                $this->recursiveRemove($path);
            } else {
                unlink($path);
            }
        }

        rmdir($dir);
    }

    /**
     * Sends one message, runs the worker and asserts that both the onFailure
     * and the onStop callbacks were invoked.
     */
    public function testEvents(): void
    {
        $transport = new FilesystemTransport($this->transportDir);
        // The consumer is built from an empty array; presumably this means no
        // handler is registered, so handling the message fails (verify against
        // PQueueConsumer).
        $consumer = new PQueueConsumer([]);

        // Add a message to the queue
        $envelope = new Envelope(
            MessageSerializer::serialize(new TestMessage()),
            true,
            0,
            null
        );
        $transport->send($envelope);

        $options = [
            'stopWhenEmpty' => true,
            'idleSleepMs' => 100,
            'maxMemory' => 128,
            'maxRuntimeSeconds' => 60,
            'maxRetryAttempts' => 3,
            'initialRetryDelayMs' => 1000,
            'retryBackoffMultiplier' => 3,
            'messageDelayMs' => 0,
        ];
        $worker = new PQueueWorker($transport, $consumer, $options);

        // Flags flipped by reference from inside the callbacks.
        $failed = false;
        $stopped = false;

        // The callback argument is accepted but not inspected by this test.
        $worker->onFailure(function ($msg) use (&$failed) {
            $failed = true;
        });
        $worker->onStop(function () use (&$stopped) {
            $stopped = true;
        });

        $worker->run();

        $this->assertTrue($failed, 'onFailure callback should be called');
        $this->assertTrue($stopped, 'onStop callback should be called');
    }
}
|