Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions src/Worker/ExecutionPolicy/SupervisorPolicy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

declare(strict_types=1);

namespace SchedulerBundle\Worker\ExecutionPolicy;

use Closure;
use SchedulerBundle\Task\TaskListInterface;
use SchedulerBundle\Worker\Supervisor\WorkerSupervisorInterface;

/**
* @author Guillaume Loulier <contact@guillaumeloulier.fr>
*/
final class SupervisorPolicy implements ExecutionPolicyInterface
{
public function __construct(private WorkerSupervisorInterface $supervisor)
{
}

/**
* {@inheritdoc}
*/
public function execute(
TaskListInterface $toExecuteTasks,
Closure $handleTaskFunc
): void {
}

/**
* {@inheritdoc}
*/
public function support(string $policy): bool
{
return 'supervisor' === $policy;
}
}
30 changes: 30 additions & 0 deletions src/Worker/Supervisor/Process.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

declare(strict_types=1);

namespace SchedulerBundle\Worker\Supervisor;

use Closure;
use SchedulerBundle\Task\TaskInterface;

/**
* @author Guillaume Loulier <contact@guillaumeloulier.fr>
*/
final class Process
{
public function __construct(
private Closure $process,
private int $identifier
) {
}

public function run(): TaskInterface
{
return ($this->process)();
}

public function getIdentifier(): int
{
return $this->identifier;
}
}
36 changes: 36 additions & 0 deletions src/Worker/Supervisor/WorkerSupervisor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

declare(strict_types=1);

namespace SchedulerBundle\Worker\Supervisor;

use SchedulerBundle\Exception\LogicException;
use SchedulerBundle\Worker\WorkerRegistryInterface;
use function function_exists;

/**
* @author Guillaume Loulier <contact@guillaumeloulier.fr>
*/
final class WorkerSupervisor implements WorkerSupervisorInterface
{
public function __construct(private WorkerRegistryInterface $registry)
{
if (!function_exists(function: 'pcntl_fork')) {
throw new LogicException('The supervisor cannot be used without pcntl');
}
}

public function start(
WorkerSupervisorConfiguration $configuration,
callable ...$processes
): void {
if ($configuration->shouldStop()) {
return;
}

$processOutputList = [];

while ($configuration->isRunning()) {
}
}
}
59 changes: 59 additions & 0 deletions src/Worker/Supervisor/WorkerSupervisorConfiguration.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<?php

declare(strict_types=1);

namespace SchedulerBundle\Worker\Supervisor;

/**
* @author Guillaume Loulier <contact@guillaumeloulier.fr>
*/
final class WorkerSupervisorConfiguration
{
private int $processesAmount;
private int $runningProcesses;
private bool $shouldStop;

private function __construct()
{
}

public static function create(): self
{
$self = new self();
$self->processesAmount = 0;
$self->runningProcesses = 0;
$self->shouldStop = false;

return $self;
}

public function setProcessesAmount(int $processesAmount): void
{
$this->processesAmount = $processesAmount;
}

public function getProcessesAmount(): int
{
return $this->processesAmount;
}

public function setRunningProcesses(int $runningProcesses): void
{
$this->runningProcesses = $runningProcesses;
}

public function getRunningProcesses(): int
{
return $this->runningProcesses;
}

public function isRunning(): bool
{
return $this->runningProcesses > 0;
}

public function shouldStop(): bool
{
return $this->shouldStop;
}
}
16 changes: 16 additions & 0 deletions src/Worker/Supervisor/WorkerSupervisorInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

declare(strict_types=1);

namespace SchedulerBundle\Worker\Supervisor;

/**
* @author Guillaume Loulier <contact@guillaumeloulier.fr>
*/
interface WorkerSupervisorInterface
{
public function start(
WorkerSupervisorConfiguration $configuration,
callable ...$processes
): void;
}
17 changes: 17 additions & 0 deletions src/Worker/WorkerRegistry.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace SchedulerBundle\Worker;

use Closure;
use function count;
use function is_array;
use function iterator_to_array;
Expand All @@ -26,6 +27,22 @@ public function __construct(iterable $workers)
$this->workers = is_array(value: $workers) ? $workers : iterator_to_array(iterator: $workers);
}

/**
* {@inheritdoc}
*/
public function add(WorkerInterface $worker): void
{
$this->workers[] = $worker;
}

/**
* {@inheritdoc}
*/
public function walk(Closure $func): void
{
array_walk(array: $this->workers, callback: $func);
}

/**
* {@inheritdoc}
*/
Expand Down
8 changes: 8 additions & 0 deletions src/Worker/WorkerRegistryInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,21 @@

namespace SchedulerBundle\Worker;

use Closure;
use Countable;

/**
* @author Guillaume Loulier <contact@guillaumeloulier.fr>
*/
interface WorkerRegistryInterface extends Countable
{
/**
* Add a new @param WorkerInterface $worker into the registry.
*/
public function add(WorkerInterface $worker): void;

public function walk(Closure $func): void;

/**
* Return the workers.
*
Expand Down
25 changes: 25 additions & 0 deletions tests/Worker/ExecutionPolicy/SupervisorPolicyTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

declare(strict_types=1);

namespace Tests\SchedulerBundle\Worker\ExecutionPolicy;

use PHPUnit\Framework\TestCase;
use SchedulerBundle\Worker\ExecutionPolicy\SupervisorPolicy;
use SchedulerBundle\Worker\Supervisor\WorkerSupervisorInterface;

/**
* @author Guillaume Loulier <contact@guillaumeloulier.fr>
*/
final class SupervisorPolicyTest extends TestCase
{
public function testPolicySupport(): void
{
$supervisor = $this->createMock(originalClassName: WorkerSupervisorInterface::class);

$policy = new SupervisorPolicy(supervisor: $supervisor);

self::assertTrue(condition: $policy->support(policy: 'supervisor'));
self::assertFalse(condition: $policy->support(policy: 'foo'));
}
}
14 changes: 14 additions & 0 deletions tests/Worker/Supervisor/ProcessTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

declare(strict_types=1);

namespace Tests\SchedulerBundle\Worker\Supervisor;

use PHPUnit\Framework\TestCase;

/**
* @author Guillaume Loulier <contact@guillaumeloulier.fr>
*/
final class ProcessTest extends TestCase
{
}
24 changes: 24 additions & 0 deletions tests/Worker/Supervisor/WorkerSupervisorConfigurationTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php

declare(strict_types=1);

namespace Tests\SchedulerBundle\Worker\Supervisor;

use PHPUnit\Framework\TestCase;
use SchedulerBundle\Worker\Supervisor\WorkerSupervisorConfiguration;

/**
* @author Guillaume Loulier <contact@guillaumeloulier.fr>
*/
final class WorkerSupervisorConfigurationTest extends TestCase
{
public function testConfigurationCanBeCreated(): void
{
$configuration = WorkerSupervisorConfiguration::create();

self::assertSame(0, $configuration->getProcessesAmount());
self::assertSame(0, $configuration->getRunningProcesses());
self::assertFalse($configuration->isRunning());
self::assertFalse($configuration->shouldStop());
}
}
14 changes: 14 additions & 0 deletions tests/Worker/Supervisor/WorkerSupervisorTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

declare(strict_types=1);

namespace Tests\SchedulerBundle\Worker\Supervisor;

use PHPUnit\Framework\TestCase;

/**
* @author Guillaume Loulier <contact@guillaumeloulier.fr>
*/
final class WorkerSupervisorTest extends TestCase
{
}
24 changes: 18 additions & 6 deletions tests/Worker/WorkerRegistryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,35 @@ final class WorkerRegistryTest extends TestCase
{
public function testRegistryCanCount(): void
{
$worker = $this->createMock(WorkerInterface::class);
$worker = $this->createMock(originalClassName: WorkerInterface::class);

$pool = new WorkerRegistry([
$pool = new WorkerRegistry(workers: [
$worker,
]);

self::assertCount(1, $pool);
self::assertCount(expectedCount: 1, haystack: $pool);
}

public function testRegistryCanAddWorker(): void
{
$worker = $this->createMock(originalClassName: WorkerInterface::class);

$pool = new WorkerRegistry(workers: [
$worker,
]);
$pool->add(worker: $worker);

self::assertCount(expectedCount: 2, haystack: $pool->getWorkers());
}

public function testRegistryCanReturnWorkers(): void
{
$worker = $this->createMock(WorkerInterface::class);
$worker = $this->createMock(originalClassName: WorkerInterface::class);

$pool = new WorkerRegistry([
$pool = new WorkerRegistry(workers: [
$worker,
]);

self::assertCount(1, $pool->getWorkers());
self::assertCount(expectedCount: 1, haystack: $pool->getWorkers());
}
}