Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docker-compose.override.yaml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ x-mutagen:
permissions:
defaultDirectoryMode: 0755
defaultFileMode: 0644
ignore:
vcs: true
# ignore:
# vcs: true
code:
alpha: "."
beta: "volume://code"
Expand Down
22 changes: 0 additions & 22 deletions docker-compose.override.yml.dist

This file was deleted.

75 changes: 38 additions & 37 deletions src/Command/ConsumeTasksCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -107,26 +107,26 @@ protected function configure(): void
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
$symfonyStyle = new SymfonyStyle($input, $output);
$symfonyStyle = new SymfonyStyle(input: $input, output: $output);

$wait = $input->getOption('wait');
$force = $input->getOption('force');
$lazy = $input->getOption('lazy');
$strict = $input->getOption('strict');
$wait = $input->getOption(name: 'wait');
$force = $input->getOption(name: 'force');
$lazy = $input->getOption(name: 'lazy');
$strict = $input->getOption(name: 'strict');

$dueTasks = $this->scheduler->getDueTasks(true === $lazy, true === $strict)->filter(static fn (TaskInterface $task): bool => !$task instanceof ProbeTask);
$dueTasks = $this->scheduler->getDueTasks(lazy: true === $lazy, strict: true === $strict)->filter(static fn (TaskInterface $task): bool => !$task instanceof ProbeTask);
if (0 === $dueTasks->count() && false === $wait) {
$symfonyStyle->warning('No due tasks found');
$symfonyStyle->warning(message: 'No due tasks found');

return self::SUCCESS;
}

if (false === $force) {
$nonPausedTasks = $dueTasks->filter(static fn (TaskInterface $task): bool => $task->getState() !== TaskInterface::PAUSED);
$nonPausedTasks = $dueTasks->filter(filter: static fn (TaskInterface $task): bool => $task->getState() !== TaskInterface::PAUSED);
if (0 === $nonPausedTasks->count()) {
$symfonyStyle->warning([
$symfonyStyle->warning(message: [
'Each tasks has already been executed for the current minute',
sprintf('Consider calling this command again at "%s"', (new DateTimeImmutable('+ 1 minute'))->format('Y-m-d h:i')),
sprintf('Consider calling this command again at "%s"', (new DateTimeImmutable(datetime: '+ 1 minute'))->format(format: 'Y-m-d h:i')),
]);

return self::SUCCESS;
Expand All @@ -135,55 +135,56 @@ protected function execute(InputInterface $input, OutputInterface $output): int

$stopOptions = [];

if (null !== $limit = $input->getOption('limit')) {
if (null !== $limit = $input->getOption(name: 'limit')) {
$stopOptions[] = sprintf('%s task%s %s been consumed', $limit, (int) $limit > 1 ? 's' : '', (int) $limit > 1 ? 'have' : 'has');
$this->eventDispatcher->addSubscriber(new StopWorkerOnTaskLimitSubscriber((int) $limit, $this->logger));
$this->eventDispatcher->addSubscriber(subscriber: new StopWorkerOnTaskLimitSubscriber((int) $limit, $this->logger));
}

if (null !== $timeLimit = $input->getOption('time-limit')) {
if (null !== $timeLimit = $input->getOption(name: 'time-limit')) {
$stopOptions[] = sprintf('it has been running for %d seconds', $timeLimit);
$this->eventDispatcher->addSubscriber(new StopWorkerOnTimeLimitSubscriber((int) $timeLimit, $this->logger));
$this->eventDispatcher->addSubscriber(subscriber: new StopWorkerOnTimeLimitSubscriber((int) $timeLimit, $this->logger));
}

if (null !== $failureLimit = $input->getOption('failure-limit')) {
if (null !== $failureLimit = $input->getOption(name: 'failure-limit')) {
$stopOptions[] = sprintf('%d task%s %s failed', $failureLimit, (int) $failureLimit > 1 ? 's' : '', (int) $failureLimit > 1 ? 'have' : 'has');
$this->eventDispatcher->addSubscriber(new StopWorkerOnFailureLimitSubscriber((int) $failureLimit, $this->logger));
$this->eventDispatcher->addSubscriber(subscriber: new StopWorkerOnFailureLimitSubscriber((int) $failureLimit, $this->logger));
}

if ([] !== $stopOptions) {
$last = array_pop($stopOptions);
$stopsWhen = ([] !== $stopOptions ? implode(', ', $stopOptions).' or ' : '').$last;
$symfonyStyle->comment([
$symfonyStyle->comment(message: [
'The worker will automatically exit once:',
sprintf('- %s', $stopsWhen),
]);
}

if (true === $wait) {
$symfonyStyle->note('The worker will wait for tasks every minutes');
$symfonyStyle->note(message: 'The worker will wait for tasks every minutes');
}

$symfonyStyle->comment('Quit the worker with CONTROL-C.');
$symfonyStyle->comment(message: 'Quit the worker with CONTROL-C.');

if (OutputInterface::VERBOSITY_VERY_VERBOSE !== $output->getVerbosity()) {
$symfonyStyle->note(sprintf('The task%s output can be displayed if the -vv option is used', $dueTasks->count() > 1 ? 's' : ''));
$symfonyStyle->note(message: sprintf('The task%s output can be displayed if the -vv option is used', $dueTasks->count() > 1 ? 's' : ''));
}

if ($output->isVeryVerbose()) {
$this->registerOutputSubscriber($symfonyStyle);
$this->registerOutputSubscriber(symfonyStyle: $symfonyStyle);
}

$this->registerWorkerSleepingListener($symfonyStyle);
$this->registerTaskExecutedSubscriber($symfonyStyle);
$this->registerWorkerSleepingListener(symfonyStyle: $symfonyStyle);
$this->registerTaskExecutedSubscriber(symfonyStyle: $symfonyStyle);

$workerConfiguration = WorkerConfiguration::create();
$workerConfiguration->mustStrictlyCheckDate(true === $strict);
$workerConfiguration->mustSleepUntilNextMinute(true === $wait);
$workerConfiguration->mustStrictlyCheckDate(mustStrictlyCheckDate: true === $strict);
$workerConfiguration->mustSleepUntilNextMinute(sleepUntilNextMinute: true === $wait);
$workerConfiguration->lockTask(lock: true);

try {
$this->worker->execute($workerConfiguration);
$this->worker->execute(configuration: $workerConfiguration);
} catch (Throwable $throwable) {
$symfonyStyle->error([
$symfonyStyle->error(message: [
'An error occurred when executing the tasks',
$throwable->getMessage(),
]);
Expand All @@ -196,28 +197,28 @@ protected function execute(InputInterface $input, OutputInterface $output): int

private function registerOutputSubscriber(SymfonyStyle $symfonyStyle): void
{
$this->eventDispatcher->addListener(TaskExecutedEvent::class, static function (TaskExecutedEvent $event) use ($symfonyStyle): void {
$this->eventDispatcher->addListener(eventName: TaskExecutedEvent::class, listener: static function (TaskExecutedEvent $event) use ($symfonyStyle): void {
$output = $event->getOutput();
if (null === $output->getOutput()) {
return;
}

$symfonyStyle->note(sprintf('Output for task "%s":', $event->getTask()->getName()));
$symfonyStyle->text($output->getOutput());
$symfonyStyle->note(message: sprintf('Output for task "%s":', $event->getTask()->getName()));
$symfonyStyle->text(message: $output->getOutput());
});
}

private function registerTaskExecutedSubscriber(SymfonyStyle $symfonyStyle): void
{
$this->eventDispatcher->addListener(TaskExecutedEvent::class, static function (TaskExecutedEvent $event) use ($symfonyStyle): void {
$this->eventDispatcher->addListener(eventName: TaskExecutedEvent::class, listener: static function (TaskExecutedEvent $event) use ($symfonyStyle): void {
$task = $event->getTask();
$output = $event->getOutput();

$taskExecutionDuration = Helper::formatTime((int) $task->getExecutionComputationTime() / 100);
$taskExecutionMemoryUsage = Helper::formatMemory($task->getExecutionMemoryUsage());

if (in_array($task->getExecutionState(), [TaskInterface::TO_RETRY, TaskInterface::INCOMPLETE], true)) {
$symfonyStyle->warning([
if (in_array(needle: $task->getExecutionState(), haystack: [TaskInterface::TO_RETRY, TaskInterface::INCOMPLETE], strict: true)) {
$symfonyStyle->warning(message: [
sprintf('The task "%s" cannot be executed fully', $task->getName()),
'The task will be retried next time',
]);
Expand All @@ -226,23 +227,23 @@ private function registerTaskExecutedSubscriber(SymfonyStyle $symfonyStyle): voi
}

if (Output::ERROR === $output->getType()) {
$symfonyStyle->error([
$symfonyStyle->error(message: [
sprintf('Task "%s" failed. (Duration: %s, Memory used: %s)', $task->getName(), $taskExecutionDuration, $taskExecutionMemoryUsage),
]);

return;
}

$symfonyStyle->success([
$symfonyStyle->success(message: [
sprintf('Task "%s" succeed. (Duration: %s, Memory used: %s)', $task->getName(), $taskExecutionDuration, $taskExecutionMemoryUsage),
]);
});
}

private function registerWorkerSleepingListener(SymfonyStyle $symfonyStyle): void
{
$this->eventDispatcher->addListener(WorkerSleepingEvent::class, static function (WorkerSleepingEvent $event) use ($symfonyStyle): void {
$symfonyStyle->info(sprintf('The worker is currently sleeping during %d seconds', $event->getSleepDuration()));
$this->eventDispatcher->addListener(eventName: WorkerSleepingEvent::class, listener: static function (WorkerSleepingEvent $event) use ($symfonyStyle): void {
$symfonyStyle->info(message: sprintf('The worker is currently sleeping during %d seconds', $event->getSleepDuration()));
});
}
}
8 changes: 4 additions & 4 deletions src/Command/DebugMiddlewareCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ public function __construct(
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
$style = new SymfonyStyle($input, $output);
$style = new SymfonyStyle(input: $input, output: $output);

$schedulerMiddlewareList = $this->schedulerMiddlewareStack->getMiddlewareList();
if ([] === $schedulerMiddlewareList) {
$style->warning('No middleware found for the scheduling phase');
$style->warning(message: 'No middleware found for the scheduling phase');
} else {
$style->info(sprintf('Found %d middleware for the scheduling phase', count($schedulerMiddlewareList)));
$style->info(message: sprintf('Found %d middleware for the scheduling phase', count($schedulerMiddlewareList)));

$schedulerTable = new Table($output);
$schedulerTable = new Table(output: $output);
$schedulerTable->setHeaders(['Name', 'PreScheduling', 'PostScheduling', 'Priority', 'Required']);
$schedulerTable->addRows(array_map(static fn (PostExecutionMiddlewareInterface|PreExecutionMiddlewareInterface|PreSchedulingMiddlewareInterface|PostSchedulingMiddlewareInterface|RequiredMiddlewareInterface|OrderedMiddlewareInterface $middleware): array => [
(new ReflectionClass($middleware))->getShortName(),
Expand Down
Loading