Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 26 additions & 15 deletions src/vercel/_internal/workflow/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
P = ParamSpec("P")
T = TypeVar("T")
SUSPENDED_MESSAGE = "<WORKFLOW SUSPENDED>"
logger = logging.getLogger(__name__)
logger = logging.getLogger("vercel.workflow")


@dataclasses.dataclass(kw_only=True)
Expand Down Expand Up @@ -518,7 +518,7 @@ async def step_handler(
f"Step '{step.name}' exceeded max retries "
f"({retry_count} {'retry' if retry_count == 1 else 'retries'})"
)
print(f"[Workflows] '{req.workflow_run_id}' - {error_message}")
logger.error("[Workflows] '%s' - %s", req.workflow_run_id, error_message)

# Fail the step via event
await world.events_create(
Expand All @@ -541,10 +541,12 @@ async def step_handler(
try:
# Check step status
if step_run.status not in ["pending", "running"]:
print(
f"[Workflows] '{req.workflow_run_id}' - Step invoked erroneously, "
f"expected status 'pending' or 'running', got '{step_run.status}' instead, "
f"skipping execution"
logger.warning(
"[Workflows] '%s' - Step invoked erroneously, "
"expected status 'pending' or 'running', got '%s' instead, "
"skipping execution",
req.workflow_run_id,
step_run.status,
)

# Re-enqueue workflow if step is in terminal state
Expand Down Expand Up @@ -610,11 +612,16 @@ async def step_handler(
f"Step '{step.name}' failed after {step.max_retries} "
f"{'retry' if step.max_retries == 1 else 'retries'}: {str(e)}"
)
print(
f"[Workflows] '{req.workflow_run_id}' - Encountered Error "
f"while executing step '{step.name}' (attempt {step_run.attempt}, "
f"{retry_count} {'retry' if retry_count == 1 else 'retries'}): "
f"{str(e)}\n\n Max retries reached\n Bubbling error to parent workflow"
logger.error(
"[Workflows] '%s' - Encountered Error "
"while executing step '%s' (attempt %d, "
"%d %s): %s\n\n Max retries reached\n Bubbling error to parent workflow",
req.workflow_run_id,
step.name,
step_run.attempt,
retry_count,
"retry" if retry_count == 1 else "retries",
e,
)

# Fail the step via event
Expand All @@ -627,10 +634,14 @@ async def step_handler(
)
else:
# Not at max retries yet - retry the step
print(
f"[Workflows] '{req.workflow_run_id}' - Encountered Error "
f"while executing step '{step.name}' (attempt {current_attempt}): "
f"{str(e)}\n\n This step has failed but will be retried"
logger.warning(
"[Workflows] '%s' - Encountered Error "
"while executing step '%s' (attempt %d): "
"%s\n\n This step has failed but will be retried",
req.workflow_run_id,
step.name,
current_attempt,
e,
)

# Set step to pending for retry
Expand Down