2
0

PQueueWorkerEventsTest.php 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. <?php
  2. namespace Test\Michel\PQueue;
  3. use Michel\PQueue\PQueueConsumer;
  4. use Michel\PQueue\PQueueWorker;
  5. use Michel\PQueue\Serializer\MessageSerializer;
  6. use Michel\PQueue\Transport\Envelope;
  7. use Michel\PQueue\Transport\FilesystemTransport;
  8. use Michel\PQueue\Transport\Message\Message;
  9. use Michel\UniTester\TestCase;
  10. use Test\Michel\PQueue\Extra\TestMessage;
  /**
   * Verifies that PQueueWorker fires its onFailure and onStop callbacks.
   *
   * The test queues a single TestMessage on a FilesystemTransport backed by a
   * throw-away temp directory, runs a worker configured to stop once the queue
   * is empty, and asserts that both callbacks were invoked.
   */
  class PQueueWorkerEventsTest extends TestCase
  {
      /** Per-test scratch directory backing the filesystem transport. */
      private string $transportDir;

      /**
       * Creates a unique temporary directory for the transport.
       *
       * @throws \RuntimeException when the directory cannot be created
       */
      protected function setUp(): void
      {
          $this->transportDir = sys_get_temp_dir() . '/pqueue_test_events_' . uniqid();

          // Fail loudly here instead of letting the test die later on a missing
          // directory; the trailing is_dir() covers a concurrent creation.
          if (
              !is_dir($this->transportDir)
              && !mkdir($this->transportDir, 0777, true)
              && !is_dir($this->transportDir)
          ) {
              throw new \RuntimeException(
                  sprintf('Unable to create transport directory "%s"', $this->transportDir)
              );
          }
      }

      /**
       * Removes the temporary transport directory and everything inside it.
       */
      protected function tearDown(): void
      {
          // isset() guards against the typed property being uninitialised when
          // setUp() did not get far enough to assign it.
          if (isset($this->transportDir) && is_dir($this->transportDir)) {
              $this->recursiveRemove($this->transportDir);
          }
      }

      /**
       * Runs the test cases of this class.
       *
       * NOTE(review): presumably the hook invoked by the UniTester TestCase
       * base class - confirm against the framework.
       */
      protected function execute(): void
      {
          $this->testEvents();
      }

      /**
       * Deletes a directory together with all files and sub-directories in it.
       *
       * @param string $dir path of the directory to delete
       */
      private function recursiveRemove(string $dir): void
      {
          $entries = scandir($dir);
          if ($entries === false) {
              // Directory vanished or is unreadable: nothing we can clean up.
              return;
          }

          foreach (array_diff($entries, ['.', '..']) as $entry) {
              $path = "$dir/$entry";
              if (is_dir($path)) {
                  $this->recursiveRemove($path);
              } else {
                  unlink($path);
              }
          }

          rmdir($dir);
      }

      /**
       * Runs a worker over one queued message and checks that the onFailure
       * and onStop callbacks are both triggered.
       *
       * NOTE(review): the consumer is built from an empty array, so the failure
       * presumably comes from no handler being available for TestMessage -
       * confirm against PQueueConsumer.
       */
      public function testEvents(): void
      {
          $transport = new FilesystemTransport($this->transportDir);
          $consumer = new PQueueConsumer([]);

          // Add a message to the queue
          $envelope = new Envelope(
              MessageSerializer::serialize(new TestMessage()),
              true,
              0,
              null
          );
          $transport->send($envelope);

          $options = [
              'stopWhenEmpty' => true,
              'idleSleepMs' => 100,
              'maxMemory' => 128,
              'maxRuntimeSeconds' => 60,
              'maxRetryAttempts' => 3,
              'initialRetryDelayMs' => 1000,
              'retryBackoffMultiplier' => 3,
              'messageDelayMs' => 0,
          ];
          $worker = new PQueueWorker($transport, $consumer, $options);

          // Flags flipped by reference from inside the callbacks.
          $failed = false;
          $stopped = false;

          $worker->onFailure(function ($msg) use (&$failed) {
              $failed = true;
          });
          $worker->onStop(function () use (&$stopped) {
              $stopped = true;
          });

          $worker->run();

          $this->assertTrue($failed, 'onFailure callback should be called');
          $this->assertTrue($stopped, 'onStop callback should be called');
      }
  }