Skip to content

feat: Add StatusMessageWatcher #407

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 29 commits into from
Jun 13, 2025
Merged

feat: Add StatusMessageWatcher #407

merged 29 commits into from
Jun 13, 2025

Conversation

Pijukatel
Copy link
Contributor

@Pijukatel Pijukatel commented May 20, 2025

Description

Add the option to log status and status messages of another Actor run. If such Actor run is also redirecting logs from another Actor run, then those logs can propagate all the way to the top through the StreamedLog - deep redirection. If the Actor run from which we are redirecting status messages is not redirecting logs from its children, then the log and status redirection remains shallow - only from that Actor run.

Example usage:

Through ActorClient(Async).call

By default it is turned on, so just calling actor like this already redirects the logs to pre-configured logger:

actor_client = ApifyClientAsync(token='token').actor(actor_id='actor_id')
await actor_client.call()

To turn off, pass logger=None

...
await actor_client.call(logger=None)

Or pass custom logger, if desired:

...
await actor_client.call(logger=some_other_logger)

Through RunClient(Async).get_status_message_watcher

With context:

run_client=ApifyClientAsync(token='token').run(run_id='run_id')
status_message_watcher = await run_client.get_status_message_watcher()

async with status_message_watcher:
    # Do stuff while the status from the other actor is being redirected to the logs. Leaving the context stops the redirection.
    ...

Manually:

...
status_message_watcher.start()
# Do stuff while the status from the other actor is being redirected to the logs.
await status_message_watcher.stop()

Issues

@github-actions github-actions bot added this to the 115th sprint - Tooling team milestone May 20, 2025
@github-actions github-actions bot added t-tooling Issues with this label are in the ownership of the tooling team. tested Temporary label used only programatically for some analytics. labels May 20, 2025
@Pijukatel
Copy link
Contributor Author

Pijukatel commented May 20, 2025

Example log of recursive Actor run with highlighted redirected status messages
image

Example Actor:
https://console.apify.com/actors/3QIkuY6bClKBxC9wK

@Pijukatel Pijukatel force-pushed the redirect-status-messages branch from e903a68 to 18f4f51 Compare May 21, 2025 07:51
@Pijukatel Pijukatel requested a review from Copilot May 21, 2025 07:54
Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Adds the new StatusMessageRedirector utility to stream status and status messages from a target Actor run (both shallow and deep redirection), integrates it into the client APIs, and updates tests to cover the new behavior.

  • Introduce StatusMessageRedirector (sync & async) in log.py
  • Add get_status_message_redirector to RunClient (sync & async) in run.py
  • Hook status redirection into ActorClient.call (sync & async) in actor.py
  • Update and extend test_logging.py to exercise status message streaming

Reviewed Changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated no comments.

File Description
tests/unit/test_logging.py Add fixtures and new tests for StatusMessageRedirector
src/apify_client/clients/resource_clients/run.py New get_status_message_redirector methods for sync and async runs
src/apify_client/clients/resource_clients/log.py Core StatusMessageRedirector, StatusMessageRedirectorSync/Async
src/apify_client/clients/resource_clients/actor.py Wire up status redirector into ActorClient.call sync & async variants
Comments suppressed due to low confidence (5)

tests/unit/test_logging.py:386

  • [nitpick] We should also add a test for the manual start()/stop() usage of StatusMessageRedirector (outside of a context manager) to ensure that flow is covered.
@respx.mock

src/apify_client/clients/resource_clients/run.py:349

  • create_redirect_logger is used here but not imported; add from apify_client._logging import create_redirect_logger at the top.
to_logger = create_redirect_logger(f'apify.{name}')

src/apify_client/clients/resource_clients/log.py:455

  • asyncio.create_task is called but asyncio is not imported; add import asyncio.
self._logging_task = asyncio.create_task(self._log_changed_status_message())

tests/unit/test_logging.py:168

  • This fixture mutates a class‐level variable without resetting it, which can leak state across tests; consider using yield and restoring the original value after the test.
StatusMessageRedirector._force_propagate = True

src/apify_client/clients/resource_clients/run.py:337

  • Docstring says it returns StatusMessageRedirector, but in sync/async methods it actually returns StatusMessageRedirectorSync/StatusMessageRedirectorAsync; clarify the concrete return type.
Returns:
            `StatusMessageRedirector` instance for redirected logs.

@Pijukatel Pijukatel marked this pull request as ready for review May 21, 2025 08:09
@Pijukatel Pijukatel requested review from vdusek and janbuchar May 21, 2025 08:10
@janbuchar
Copy link
Contributor

Just a question - are we planning to also show the status message of the callee as the status message of the caller? Or display something like "Waiting for ... to finish"?

actor_name = actor_data.get('name', '') if run_data else ''

if not to_logger:
name = '-'.join(part for part in (actor_name, run_id) if part)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure it's a good idea to include the run id by default? The user won't know what it is and it makes the logline kinda messy, especially with nested invocation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actor name is for readability and run id is for uniqueness and ability to track the log to it's origin. Keeping only the actor name would make it harder to find the origin of the log.
Depending on the use-case this might seem less or more relevant. Maybe it could be something like f"{actor_name}-runId:{run_id}" to make it more explicit and clear what the id refers to.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And in which case do you actually need to track the log to its origin? Isn't the most frequent case calling a single Actor and showing the status so that there's some activity in the log?

Also, a - may not be an ideal separator, those appear in Actor names quite often.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Any scenario where one actor is calling two different runs of another actor. There it would be crucial, but it is not a common use case.
  2. In normal "one-to-one" scenario it is useful to easily navigate to the relevant run of the called actor. That I do not think is uncommon scenario. Is there any other convenient way how to navigate to the called actor run that would make this information redundant in the logs?

About separator I am fine with anything. In logger it can be even white space I guess. Do you have any preference there?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case 1, it would be nice if the user could opt into logging the run id.

In case 2, I'd prefer to just log the run id when starting the Actor. But don't take it as an authoritative answer, I'm just concerned about printing the run id a zillion times on each line.

As a separator, a space sounds like a good idea. Maybe with some arrow or >> for nested Actor runs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I do not want to add any arguments for customization of the default logger is that it will be part of functions that already have many arguments, for example actor.call . So my idea was, either use default without any customization options, or create your own logger and nothing in between. This gives you all the freedom and does not add too many new arguments to functions that already have too many of them.

What about following idea to reduce the run id spam in the logs. Each redirected logger will first print some full identification message, like: "Redirect logger for actor XYZ and run id ABCDEF... will be shown as {some alias}"

Then we can talk about this alias. Which can be for example "apify.{actor_name} {redirect_logger_counter}" or "apify.redirect {n}" or something else ...?

So it will be shorter and you will still be able to uniquely identify the origin of the redirected log with the usage of the first log message.

BUT!!! Going on with such approach you will not be able to identify deeply nested redirected loggers if you choose an option to not redirect from the start of the actor run (in case of long running standby actors) as you will miss the first redirected logger identification message.

Having that in mind, I still feel that full info in each message is the safest for the default logger, even though it is quite verbose. I will add documentation once it is also integrated into SDK that will show how easy it is to create logger that allows the customization.

check_period: The period with which the status message will be polled.
"""
self._to_logger = to_logger
self._to_logger.propagate = self._force_propagate
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not nice to silently reconfigure a logger passed in by an unsuspecting client 🙂

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. It should have been reconfigured only if the internal _force_propagate is changed to True, which is only the test use-case for being able to use caplog.

@Pijukatel
Copy link
Contributor Author

Pijukatel commented May 28, 2025

Just a question - are we planning to also show the status message of the callee as the status message of the caller? Or display something like "Waiting for ... to finish"?

Currently this change is only about logging callee stuff. The status message of the caller is already available in the actor run. So if someone wants to duplicate it into logs then I think they can do it by manually both setting the status message and logging the status message, but I would not do it by default for now.

@Pijukatel Pijukatel changed the title feat: Add StatusMessageRedirector feat: Add StatusMessageWatcher May 28, 2025
@Pijukatel Pijukatel requested a review from janbuchar June 11, 2025 08:59
Comment on lines 452 to 477
def start(self) -> Task:
"""Start the logging task. The caller has to handle any cleanup by manually calling the `stop` method."""
if self._logging_task:
raise RuntimeError('Logging task already active')
self._logging_task = asyncio.create_task(self._log_changed_status_message())
return self._logging_task

def stop(self) -> None:
"""Stop the logging task."""
if not self._logging_task:
raise RuntimeError('Logging task is not active')

self._logging_task.cancel()
self._logging_task = None

async def __aenter__(self) -> Self:
"""Start the logging task within the context. Exiting the context will cancel the logging task."""
self.start()
return self

async def __aexit__(
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
) -> None:
"""Cancel the logging task."""
await asyncio.sleep(self._final_sleep_time_s)
self.stop()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not to remove start/stop and implement __aenter__ / __aexit__ directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

__aenter__ / __aexit__ are there as well for convenience, but start and stop is exposed for flexibility to make it possible for the users to call these methods outside of sometimes limiting context manager and without the need to call double-underscored methods directly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I'm not a fan of exposing additional start/stop methods just to provide an alternative way to manually control what the context manager takes care of. If a user really wants to call enter / exit methods directly, I think he can just do it directly, rather than introducing extra methods that just duplicate that logic. Not gonna block the merge though, up to you.

Copy link
Contributor

@janbuchar janbuchar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly LGTM, some random nits

actor_runs_responses = iter(
(
httpx.Response(
def create_status_responses_generator() -> Iterator[httpx.Response]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - I think you can omit the create_ prefix here and make the function name a bit less java-esque 😁

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed.

@@ -378,3 +382,160 @@ async def _stream_log(self) -> None:

# If the stream is finished, then the last part will be also processed.
self._log_buffer_content(include_last_part=True)


class StatusMessageWatcher:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these new classes be exposed publicly? They seem like implementation details to me - you usually create these using helper methods on the resource client, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Helper methods on clients are convenient constructors for these classes, but the user will interact with them directly calling either start, close or using them as context managers.

(From ActorClient point of view this is indeed implementation detail hidden in the call method, but from RunClient point of view it is actual public return value of one of the public method.)

if not self._logging_task:
raise RuntimeError('Logging task is not active')

self._logging_task.cancel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid there might be GC-related warnings if you don't await the task (docs)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, added awaits

@Pijukatel Pijukatel force-pushed the redirect-status-messages branch from 842777c to 350fc67 Compare June 13, 2025 09:59
@Pijukatel Pijukatel requested review from janbuchar and vdusek June 13, 2025 11:06
Copy link
Contributor

@vdusek vdusek left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@Pijukatel Pijukatel merged commit a535512 into master Jun 13, 2025
51 of 52 checks passed
@Pijukatel Pijukatel deleted the redirect-status-messages branch June 13, 2025 11:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
t-tooling Issues with this label are in the ownership of the tooling team. tested Temporary label used only programatically for some analytics.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Redirect status messages from other actors
3 participants