diff --git a/src/Worker/ExecutionPolicy/SupervisorPolicy.php b/src/Worker/ExecutionPolicy/SupervisorPolicy.php new file mode 100644 index 00000000..e164528a --- /dev/null +++ b/src/Worker/ExecutionPolicy/SupervisorPolicy.php @@ -0,0 +1,36 @@ + + */ +final class SupervisorPolicy implements ExecutionPolicyInterface +{ + public function __construct(private WorkerSupervisorInterface $supervisor) + { + } + + /** + * {@inheritdoc} + */ + public function execute( + TaskListInterface $toExecuteTasks, + Closure $handleTaskFunc + ): void { + } + + /** + * {@inheritdoc} + */ + public function support(string $policy): bool + { + return 'supervisor' === $policy; + } +} diff --git a/src/Worker/Supervisor/Process.php b/src/Worker/Supervisor/Process.php new file mode 100644 index 00000000..8f89213f --- /dev/null +++ b/src/Worker/Supervisor/Process.php @@ -0,0 +1,30 @@ + + */ +final class Process +{ + public function __construct( + private Closure $process, + private int $identifier + ) { + } + + public function run(): TaskInterface + { + return ($this->process)(); + } + + public function getIdentifier(): int + { + return $this->identifier; + } +} diff --git a/src/Worker/Supervisor/WorkerSupervisor.php b/src/Worker/Supervisor/WorkerSupervisor.php new file mode 100644 index 00000000..fa2a03e6 --- /dev/null +++ b/src/Worker/Supervisor/WorkerSupervisor.php @@ -0,0 +1,36 @@ + + */ +final class WorkerSupervisor implements WorkerSupervisorInterface +{ + public function __construct(private WorkerRegistryInterface $registry) + { + if (!function_exists(function: 'pcntl_fork')) { + throw new LogicException('The supervisor cannot be used without pcntl'); + } + } + + public function start( + WorkerSupervisorConfiguration $configuration, + callable ...$processes + ): void { + if ($configuration->shouldStop()) { + return; + } + + $processOutputList = []; + + while ($configuration->isRunning()) { + } + } +} diff --git a/src/Worker/Supervisor/WorkerSupervisorConfiguration.php b/src/Worker/Supervisor/WorkerSupervisorConfiguration.php new file mode 100644 index 00000000..959f7e7c --- /dev/null +++ b/src/Worker/Supervisor/WorkerSupervisorConfiguration.php @@ -0,0 +1,59 @@ + + */ +final class WorkerSupervisorConfiguration +{ + private int $processesAmount; + private int $runningProcesses; + private bool $shouldStop; + + private function __construct() + { + } + + public static function create(): self + { + $self = new self(); + $self->processesAmount = 0; + $self->runningProcesses = 0; + $self->shouldStop = false; + + return $self; + } + + public function setProcessesAmount(int $processesAmount): void + { + $this->processesAmount = $processesAmount; + } + + public function getProcessesAmount(): int + { + return $this->processesAmount; + } + + public function setRunningProcesses(int $runningProcesses): void + { + $this->runningProcesses = $runningProcesses; + } + + public function getRunningProcesses(): int + { + return $this->runningProcesses; + } + + public function isRunning(): bool + { + return $this->runningProcesses > 0; + } + + public function shouldStop(): bool + { + return $this->shouldStop; + } +} diff --git a/src/Worker/Supervisor/WorkerSupervisorInterface.php b/src/Worker/Supervisor/WorkerSupervisorInterface.php new file mode 100644 index 00000000..8a8c7f59 --- /dev/null +++ b/src/Worker/Supervisor/WorkerSupervisorInterface.php @@ -0,0 +1,16 @@ + + */ +interface WorkerSupervisorInterface +{ + public function start( + WorkerSupervisorConfiguration $configuration, + callable ...$processes + ): void; +} diff --git a/src/Worker/WorkerRegistry.php b/src/Worker/WorkerRegistry.php index 4f1c3c17..2bda5285 100644 --- a/src/Worker/WorkerRegistry.php +++ b/src/Worker/WorkerRegistry.php @@ -4,6 +4,7 @@ namespace SchedulerBundle\Worker; +use Closure; use function count; use function is_array; use function iterator_to_array; @@ -26,6 +27,22 @@ public function __construct(iterable $workers) $this->workers = is_array(value: $workers) ? $workers : iterator_to_array(iterator: $workers); } + /** + * {@inheritdoc} + */ + public function add(WorkerInterface $worker): void + { + $this->workers[] = $worker; + } + + /** + * {@inheritdoc} + */ + public function walk(Closure $func): void + { + array_walk(array: $this->workers, callback: $func); + } + /** * {@inheritdoc} */ diff --git a/src/Worker/WorkerRegistryInterface.php b/src/Worker/WorkerRegistryInterface.php index bae26e57..4d8f874b 100644 --- a/src/Worker/WorkerRegistryInterface.php +++ b/src/Worker/WorkerRegistryInterface.php @@ -4,6 +4,7 @@ namespace SchedulerBundle\Worker; +use Closure; use Countable; /** @@ -11,6 +12,13 @@ */ interface WorkerRegistryInterface extends Countable { + /** + * Add a new @param WorkerInterface $worker into the registry. + */ + public function add(WorkerInterface $worker): void; + + public function walk(Closure $func): void; + /** * Return the workers. * diff --git a/tests/Worker/ExecutionPolicy/SupervisorPolicyTest.php b/tests/Worker/ExecutionPolicy/SupervisorPolicyTest.php new file mode 100644 index 00000000..b6a8cf2c --- /dev/null +++ b/tests/Worker/ExecutionPolicy/SupervisorPolicyTest.php @@ -0,0 +1,25 @@ + + */ +final class SupervisorPolicyTest extends TestCase +{ + public function testPolicySupport(): void + { + $supervisor = $this->createMock(originalClassName: WorkerSupervisorInterface::class); + + $policy = new SupervisorPolicy(supervisor: $supervisor); + + self::assertTrue(condition: $policy->support(policy: 'supervisor')); + self::assertFalse(condition: $policy->support(policy: 'foo')); + } +} diff --git a/tests/Worker/Supervisor/ProcessTest.php b/tests/Worker/Supervisor/ProcessTest.php new file mode 100644 index 00000000..5d9a27e4 --- /dev/null +++ b/tests/Worker/Supervisor/ProcessTest.php @@ -0,0 +1,14 @@ + + */ +final class ProcessTest extends TestCase +{ +} diff --git a/tests/Worker/Supervisor/WorkerSupervisorConfigurationTest.php b/tests/Worker/Supervisor/WorkerSupervisorConfigurationTest.php new file mode 100644 index 00000000..79d984d9 --- /dev/null +++ b/tests/Worker/Supervisor/WorkerSupervisorConfigurationTest.php @@ -0,0 +1,24 @@ + + */ +final class WorkerSupervisorConfigurationTest extends TestCase +{ + public function testConfigurationCanBeCreated(): void + { + $configuration = WorkerSupervisorConfiguration::create(); + + self::assertSame(0, $configuration->getProcessesAmount()); + self::assertSame(0, $configuration->getRunningProcesses()); + self::assertFalse($configuration->isRunning()); + self::assertFalse($configuration->shouldStop()); + } +} diff --git a/tests/Worker/Supervisor/WorkerSupervisorTest.php b/tests/Worker/Supervisor/WorkerSupervisorTest.php new file mode 100644 index 00000000..ec63d6e7 --- /dev/null +++ b/tests/Worker/Supervisor/WorkerSupervisorTest.php @@ -0,0 +1,14 @@ + + */ +final class WorkerSupervisorTest extends TestCase +{ +} diff --git a/tests/Worker/WorkerRegistryTest.php b/tests/Worker/WorkerRegistryTest.php index e745f022..2d0f66ce 100644 --- a/tests/Worker/WorkerRegistryTest.php +++ b/tests/Worker/WorkerRegistryTest.php @@ -15,23 +15,35 @@ final class WorkerRegistryTest extends TestCase { public function testRegistryCanCount(): void { - $worker = $this->createMock(WorkerInterface::class); + $worker = $this->createMock(originalClassName: WorkerInterface::class); - $pool = new WorkerRegistry([ + $pool = new WorkerRegistry(workers: [ $worker, ]); - self::assertCount(1, $pool); + self::assertCount(expectedCount: 1, haystack: $pool); + } + + public function testRegistryCanAddWorker(): void + { + $worker = $this->createMock(originalClassName: WorkerInterface::class); + + $pool = new WorkerRegistry(workers: [ + $worker, + ]); + $pool->add(worker: $worker); + + self::assertCount(expectedCount: 2, haystack: $pool->getWorkers()); } public function testRegistryCanReturnWorkers(): void { - $worker = $this->createMock(WorkerInterface::class); + $worker = $this->createMock(originalClassName: WorkerInterface::class); - $pool = new WorkerRegistry([ + $pool = new WorkerRegistry(workers: [ $worker, ]); - self::assertCount(1, $pool->getWorkers()); + self::assertCount(expectedCount: 1, haystack: $pool->getWorkers()); } }