@ASCE-D
Contributor

@ASCE-D ASCE-D commented Nov 14, 2025

Summary by CodeRabbit

  • Bug Fixes

    • Improved message handling when cancellation occurs—buffered content is now properly flushed before tasks are cancelled.
    • Enhanced cleanup and status tracking after task completion or cancellation to prevent orphaned processes.
  • Refactor

    • Strengthened background task tracking to better monitor and manage active operations.
    • Refined cancellation logic to ensure consistent state across agent and message regeneration flows.

@coderabbitai
Contributor

coderabbitai bot commented Nov 14, 2025

Walkthrough

This PR adds cancellation and task management improvements across the agent execution pipeline: buffer flushing before task cancellation in agent tasks, enhanced status computation in base task cleanup, dependency injection for session and Redis management in conversation service, and explicit task ID tracking for revocation and session cleanup.

Changes

| Cohort | File(s) | Summary |
| --- | --- | --- |
| Cancellation & Buffer Flushing | app/celery/tasks/agent_tasks.py | Flush buffered AI message chunks before cancellation on both agent and regeneration paths; return completion status boolean to distinguish successful execution from cancellation; add MessageType import for regeneration buffering. |
| Base Task Status Management | app/celery/tasks/base_task.py | Compute task status as "cancelled" if retval is False, otherwise "completed successfully"; wrap DB cleanup in try/finally block to ensure cleanup occurs regardless of logging or status computation errors. |
| Session & Stop Generation | app/modules/conversations/conversation/conversation_service.py | Inject session_service and redis_manager via constructor (with defaults); refactor stop_generation to fetch active session if run_id missing, set cancellation flag, revoke Celery task, and clear session via redis_manager. |
| Task ID Tracking | app/modules/conversations/conversations_router.py | Capture AsyncResult from background task invocation and store task ID via redis_manager.set_task_id() with logging. |
| Session & Task Management Utilities | app/modules/conversations/utils/redis_streaming.py | Add set_task_id() (10-min expiry), get_task_id(), and clear_session() (publishes "cancelled" end event, marks status) to RedisStreamManager. |

Sequence Diagram

sequenceDiagram
    participant Client
    participant Router as conversations_router
    participant ConvService as conversation_service
    participant AgentTask as agent_tasks
    participant Redis as redis_streaming
    participant Celery
    
    Client->>Router: POST cancel/stop_generation
    Router->>ConvService: stop_generation(run_id)
    alt run_id missing
        ConvService->>ConvService: Fetch active session via session_service
        alt No active session
            ConvService-->>Client: Success (no active session)
        end
    end
    ConvService->>Redis: Set cancellation flag
    ConvService->>Redis: get_task_id(conversation_id, run_id)
    Redis-->>ConvService: task_id
    ConvService->>Celery: revoke(task_id, terminate=True)
    Celery-->>AgentTask: Cancellation signal
    AgentTask->>AgentTask: Flush buffered message chunks
    AgentTask->>Redis: Publish flushed chunks
    AgentTask->>Redis: Return completion status (False)
    ConvService->>Redis: clear_session(conversation_id, run_id)
    Redis->>Redis: Publish end event (status=cancelled)
    Redis-->>ConvService: Session cleared
    ConvService-->>Client: Cancellation signal sent & task revoked

Estimated Code Review Effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Areas requiring extra attention:

  • agent_tasks.py — Verify buffer flushing logic does not interfere with task cancellation flow; ensure completion status (boolean) correctly propagates to on_success.
  • base_task.py — Confirm try/finally structure guarantees DB cleanup on all paths; validate "cancelled" vs. "completed successfully" status computation.
  • conversation_service.py — Validate DI initialization of session_service and redis_manager (both with defaults); verify fallback logic when run_id is missing and no active session exists.
  • Cascading task ID tracking — Ensure task ID is consistently set in Router, retrieved in ConversationService, and used correctly in redis_streaming for revocation.

Possibly Related PRs

Suggested Reviewers

  • dhirenmathur

Poem

🐰 Buffers flush before the signal rings,
Cancelled tasks shed their pending wings,
Sessions cleared, IDs tracked with care,
Completion status floats through the air!
Celery tasks bow and depart with grace,

Pre-merge checks and finishing touches

❌ Failed checks (1 warning, 1 inconclusive)
| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 47.37%, which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage. |
| Title check | ❓ Inconclusive | The title partially relates to the changeset but uses vague phrasing. 'stop functionality' is accurate but 'partial db flush' is misleading: the changes primarily implement task cancellation and message flushing, not database flushing. The title doesn't capture the core accomplishment: coordinated Celery task revocation with message buffering. | Revise to more accurately reflect the main changes, e.g., 'Implement task cancellation with message buffering and task tracking' or 'Add graceful Celery task revocation with buffered flush logic.' |
✅ Passed checks (1 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |

@sonarqubecloud

Quality Gate failed

Failed conditions
B Maintainability Rating on New Code (required ≥ A)
B Security Rating on New Code (required ≥ A)

See analysis details on SonarQube Cloud


Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (6)
app/celery/tasks/base_task.py (1)

85-91: DB cleanup improvement looks good; clarify retval contract

Wrapping the status log and DB cleanup in a try/finally is a solid improvement and ensures SessionLocal() is always closed and dereferenced.

This now implicitly assumes that BaseTask tasks return a strict boolean, where False means “cancelled” and anything else means “completed successfully”. If any existing tasks using BaseTask return False in non‑cancellation cases, their logs will now say “cancelled”.

If that contract is intentional and consistent across all base=BaseTask tasks, this change is fine; otherwise, consider either:

  • documenting the convention (“BaseTask tasks must return True/False for success/cancel”), or
  • narrowing the “cancelled” check to only the specific tasks that actually use this pattern.
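
To make the retval convention concrete, here is a minimal sketch of the pattern under discussion. It does not reproduce the PR's code: `describe_status`, `FakeSession`, and `TaskSketch` are hypothetical names, and the class is a plain Python stand-in rather than a real Celery `Task` subclass.

```python
import logging

logger = logging.getLogger("base_task_sketch")


def describe_status(retval: object) -> str:
    """Map a task's return value to a log label.

    The identity check means only the literal False reads as "cancelled";
    None, 0, or an empty list still count as a normal completion.
    """
    return "cancelled" if retval is False else "completed successfully"


class FakeSession:
    """Hypothetical stand-in for a database session."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class TaskSketch:
    """Hypothetical stand-in for a task base class; not the PR's BaseTask."""

    def __init__(self) -> None:
        self._db = FakeSession()

    def on_success(self, retval, task_id, args, kwargs) -> None:
        try:
            logger.info("Task %s %s", task_id, describe_status(retval))
        finally:
            # Runs even if logging or status computation raises.
            if self._db is not None:
                self._db.close()
                self._db = None
```

With this shape, a task that returns `None` is still logged as completed; only an explicit `False` is reported as cancelled, which is the contract the comment above asks to have documented.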
app/modules/conversations/utils/redis_streaming.py (1)

200-234: Task‑ID tracking and clear_session behavior look consistent; consider richer error logging

The new set_task_id/get_task_id APIs and clear_session flow are consistent with the existing Redis naming/TTL patterns and give stop_generation what it needs to revoke Celery tasks and mark sessions as cancelled.

Given clear_session is intentionally best‑effort and non‑throwing, the broad except Exception is reasonable. If you want more debuggability with minimal behavior change, consider:

  • logging with exc_info=True (or logger.exception) so stack traces are preserved, and/or
  • narrowing the catch to Redis‑related exceptions.

Functionally this looks correct as‑is.
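
As a rough illustration of the logging suggestion, the sketch below pairs task-ID helpers with a best-effort `clear_session` that uses `logger.exception` to keep the stack trace. Everything here is an assumption made for illustration: `FakeRedis` is an in-memory stand-in, the key names are invented, and the real `RedisStreamManager` methods may differ.

```python
import logging
from typing import Dict, List, Optional, Tuple

logger = logging.getLogger("redis_streaming_sketch")

TASK_ID_TTL_SECONDS = 600  # the walkthrough mentions a 10-minute expiry


class FakeRedis:
    """In-memory stand-in exposing only the two calls used below."""

    def __init__(self) -> None:
        self.store: Dict[str, str] = {}
        self.expiry: Dict[str, int] = {}

    def set(self, key: str, value: str, ex: Optional[int] = None) -> None:
        self.store[key] = value
        if ex is not None:
            self.expiry[key] = ex

    def get(self, key: str) -> Optional[str]:
        return self.store.get(key)


class StreamManagerSketch:
    """Hypothetical shape of the helpers; key names are invented."""

    def __init__(self, client: FakeRedis) -> None:
        self.client = client
        self.events: List[Tuple[str, str, str]] = []

    def set_task_id(self, conversation_id: str, run_id: str, task_id: str) -> None:
        key = f"task:id:{conversation_id}:{run_id}"
        self.client.set(key, task_id, ex=TASK_ID_TTL_SECONDS)

    def get_task_id(self, conversation_id: str, run_id: str) -> Optional[str]:
        return self.client.get(f"task:id:{conversation_id}:{run_id}")

    def clear_session(self, conversation_id: str, run_id: str) -> None:
        try:
            # Tell stream consumers the run ended, then record the status.
            self.events.append((conversation_id, run_id, "end:cancelled"))
            self.client.set(f"task:status:{conversation_id}:{run_id}", "cancelled")
        except Exception:
            # Best-effort: never raise, but keep the traceback in the log.
            logger.exception("Failed to clear session %s:%s", conversation_id, run_id)
```

The behavior is unchanged from a plain `logger.error` call; the only difference is that the traceback is attached to the log record.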

app/celery/tasks/agent_tasks.py (3)

103-116: Cancellation now flushes partial AI output before ending

The added flush in the agent cancellation path ensures any buffered AI‑generated chunks are persisted before you emit the "end"/"cancelled" event and bail out, which is exactly what you want for partial‑response durability.

Swallowing all exceptions from flush_message_buffer with a warning keeps cancellation robust; if you want more diagnosability without changing behavior, you could log with exc_info=True or catch ChatHistoryServiceError explicitly and let truly unexpected exceptions propagate.
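
One way to read the suggestion above is sketched here, under stated assumptions: `ChatHistoryServiceError` is redefined locally as a stand-in for the project's exception, `HistoryBufferSketch` is a hypothetical buffer, and its `flush_message_buffer` takes no arguments, unlike the real method, whose signature is not shown in this review.

```python
import logging
from typing import Callable, List, Optional

logger = logging.getLogger("agent_tasks_sketch")


class ChatHistoryServiceError(Exception):
    """Local stand-in for the project's exception of the same name."""


class HistoryBufferSketch:
    """Hypothetical buffer that collects streamed chunks until flushed."""

    def __init__(self) -> None:
        self.chunks: List[str] = []
        self.saved: List[str] = []

    def add_chunk(self, text: str) -> None:
        self.chunks.append(text)

    def flush_message_buffer(self) -> Optional[str]:
        if not self.chunks:
            return None
        self.saved.append("".join(self.chunks))
        self.chunks.clear()
        return f"msg-{len(self.saved)}"


def handle_cancellation(history: HistoryBufferSketch,
                        publish_end: Callable[[str], None]) -> bool:
    """Persist partial output, emit the end event, and report cancellation."""
    try:
        message_id = history.flush_message_buffer()
        if message_id:
            logger.debug("Flushed partial response as %s", message_id)
    except ChatHistoryServiceError:
        # Known persistence failure: log with traceback, keep cancelling.
        logger.warning("Failed to flush buffer on cancellation", exc_info=True)
    publish_end("cancelled")
    return False  # False signals "cancelled" to the caller
```

Narrowing the `except` clause means an unexpected error, such as a programming bug inside the flush, propagates instead of being hidden behind a warning.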


284-297: Regenerate cancellation flush mirrors agent behavior correctly

The regeneration path now mirrors the agent path by flushing MessageType.AI_GENERATED from service.history_manager on cancellation, logging the resulting message ID, and warning (but not failing) if the flush raises.

This keeps the regeneration flow’s persistence semantics aligned with the main agent execution and still ensures cancellation proceeds even if DB writes fail.


156-180: Returning a boolean completion flag—update type hints or document the convention

Both background tasks now return completed: bool from the outer Celery task function so that BaseTask.on_success can distinguish cancellation (False) from normal completion (True). Given the inner coroutines only ever return True or False (exceptions go to the except block), this is a clear and reliable signal.

Two minor nits to consider:

  • The function signatures still declare -> None, which no longer matches the implementation. If you care about static typing, consider changing the annotations to -> bool or dropping the explicit return type.
  • It would be worth briefly documenting (in a docstring or comment) that these tasks follow the “True = completed, False = cancelled” convention so future BaseTask subclasses can depend on it consistently.

Also applies to: 347-371
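
A minimal sketch of the annotation and docstring change suggested above. The function names are hypothetical, and running the inner coroutine with `asyncio.run` is an assumption about how the task drives it.

```python
import asyncio
from typing import Callable


async def run_agent(is_cancelled: Callable[[], bool]) -> bool:
    """Hypothetical inner coroutine: True on completion, False on cancel."""
    for _ in range(3):
        if is_cancelled():
            return False
        await asyncio.sleep(0)  # stand-in for streaming one chunk
    return True


def execute_agent_background_sketch(is_cancelled: Callable[[], bool]) -> bool:
    """Run the agent to completion.

    Returns True when the run completed and False when it was cancelled,
    so a success hook can tell the two outcomes apart.
    """
    completed: bool = asyncio.run(run_agent(is_cancelled))
    return completed
```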

app/modules/conversations/conversation/conversation_service.py (1)

1080-1145: stop_generation pipeline is well‑structured; response message could be more precise

The updated stop_generation flow is coherent end‑to‑end:

  • If run_id is omitted, it uses SessionService.get_active_session to find the latest session and returns early with a success message when none exists.
  • It always sets the Redis cancellation flag so background tasks polling check_cancellation can exit gracefully.
  • When a stored task ID is present, it revokes the Celery task with terminate=True, covering both queued and running tasks, and logs either success or failure of the revoke.
  • Regardless of task ID, it calls clear_session so clients see an "end" event with a cancelled status and the session status is updated, which also handles stale session IDs.

One small improvement: the final response message is always "Cancellation signal sent and task revoked", even when:

  • no task ID was found, or
  • revocation raised and you logged a warning.

To avoid misleading callers, consider wording like "Cancellation signal sent; any associated task will be revoked if possible" or tailoring the message based on whether a task_id was actually found and revoked.
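
The flow and the tailored response message could look roughly like the sketch below. It is not the PR's implementation: `FakeRedisManager` and `stop_generation_sketch` are hypothetical, the message strings other than the original one are invented, and the `revoke` callable stands in for a Celery call such as `celery_app.control.revoke(task_id, terminate=True)`.

```python
import logging
from typing import Callable, Dict, List, Optional

logger = logging.getLogger("stop_generation_sketch")


class FakeRedisManager:
    """Records calls; stands in for the PR's RedisStreamManager."""

    def __init__(self, task_id: Optional[str]) -> None:
        self.task_id = task_id
        self.calls: List[str] = []

    def set_cancellation(self, conversation_id: str, run_id: str) -> None:
        self.calls.append("set_cancellation")

    def get_task_id(self, conversation_id: str, run_id: str) -> Optional[str]:
        return self.task_id

    def clear_session(self, conversation_id: str, run_id: str) -> None:
        self.calls.append("clear_session")


def stop_generation_sketch(conversation_id: str, run_id: str,
                           redis_manager: FakeRedisManager,
                           revoke: Callable[[str], None]) -> Dict[str, str]:
    # 1. Always set the flag so a polling task can exit on its own.
    redis_manager.set_cancellation(conversation_id, run_id)

    # 2. Revoke only when a task ID was stored for this run.
    task_id = redis_manager.get_task_id(conversation_id, run_id)
    revoked = False
    if task_id:
        try:
            revoke(task_id)
            revoked = True
        except Exception:
            logger.warning("Failed to revoke task %s", task_id, exc_info=True)

    # 3. Always clear the session so clients receive the end event.
    redis_manager.clear_session(conversation_id, run_id)

    if revoked:
        message = "Cancellation signal sent and task revoked"
    elif task_id:
        message = "Cancellation signal sent; task could not be revoked"
    else:
        message = "Cancellation signal sent; no task ID found to revoke"
    return {"status": "success", "message": message}
```

Tracking a `revoked` flag keeps the three outcomes distinct without changing the order of the Redis and Celery calls.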

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 617e683 and d35a819.

📒 Files selected for processing (5)
  • app/celery/tasks/agent_tasks.py (5 hunks)
  • app/celery/tasks/base_task.py (1 hunks)
  • app/modules/conversations/conversation/conversation_service.py (6 hunks)
  • app/modules/conversations/conversations_router.py (3 hunks)
  • app/modules/conversations/utils/redis_streaming.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
app/celery/tasks/agent_tasks.py (2)
app/modules/intelligence/memory/chat_history_service.py (1)
  • flush_message_buffer (83-133)
app/modules/conversations/message/message_model.py (1)
  • MessageType (17-20)
app/modules/conversations/conversation/conversation_service.py (4)
app/modules/conversations/session/session_service.py (2)
  • SessionService (15-163)
  • get_active_session (23-98)
app/modules/conversations/utils/redis_streaming.py (4)
  • RedisStreamManager (11-247)
  • set_cancellation (182-186)
  • get_task_id (206-210)
  • clear_session (212-233)
app/modules/conversations/conversation/conversation_schema.py (1)
  • ActiveSessionErrorResponse (78-80)
app/modules/conversations/conversations_router.py (1)
  • get_active_session (568-594)
app/modules/conversations/conversations_router.py (2)
app/celery/tasks/agent_tasks.py (2)
  • execute_agent_background (16-201)
  • execute_regenerate_background (209-391)
app/modules/conversations/utils/redis_streaming.py (1)
  • set_task_id (200-204)
app/modules/conversations/utils/redis_streaming.py (1)
tests/conftest.py (1)
  • get (142-150)
🪛 Ruff (0.14.4)
app/celery/tasks/agent_tasks.py

112-112: Do not catch blind exception: Exception

(BLE001)


114-114: Use explicit conversion flag

Replace with conversion flag

(RUF010)


180-180: Consider moving this statement to an else block

(TRY300)


293-293: Do not catch blind exception: Exception

(BLE001)


295-295: Use explicit conversion flag

Replace with conversion flag

(RUF010)


371-371: Consider moving this statement to an else block

(TRY300)

app/modules/conversations/conversation/conversation_service.py

1122-1122: Do not catch blind exception: Exception

(BLE001)


1123-1123: Use explicit conversion flag

Replace with conversion flag

(RUF010)


1136-1136: Do not catch blind exception: Exception

(BLE001)


1138-1138: Use explicit conversion flag

Replace with conversion flag

(RUF010)

app/modules/conversations/utils/redis_streaming.py

230-230: Do not catch blind exception: Exception

(BLE001)


231-233: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


232-232: Use explicit conversion flag

Replace with conversion flag

(RUF010)
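
For readers unfamiliar with these rule codes, the following sketch shows what the four suggestions amount to on an unrelated toy function. `parse_port` is a hypothetical example and is not taken from the files under review.

```python
import logging
from typing import Optional, Tuple

logger = logging.getLogger("ruff_rules_sketch")


def parse_port(raw: str) -> Tuple[Optional[int], Optional[str]]:
    """Return (port, None) on success or (None, error message) on failure."""
    try:
        port = int(raw)
    except ValueError as exc:  # BLE001: catch a specific exception type
        # RUF010: use the !s conversion flag instead of calling str(exc).
        message = f"invalid port: {exc!s}"
        # TRY400: logger.exception records the traceback automatically.
        logger.exception("Could not parse port")
        return None, message
    else:
        # TRY300: the success return lives in else, outside the try body.
        return port, None
```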

🔇 Additional comments (3)
app/modules/conversations/conversations_router.py (2)

358-372: Capturing and storing agent task ID is correct and aligns with stop flow

Using the AsyncResult from execute_agent_background.delay(...) and persisting task_result.id via redis_manager.set_task_id cleanly connects request initiation to the later revocation path in ConversationService.stop_generation.

Given that you already set status to "queued" and publish the queued event before starting the task, this fits the existing semantics with no extra failure modes.
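
The capture-and-store step can be sketched as follows. The names `FakeTask`, `TaskIdRecorder`, and `start_agent_run` are hypothetical; the only Celery behavior assumed is that `.delay(...)` returns a result object exposing an `id` attribute.

```python
import logging
import uuid
from types import SimpleNamespace
from typing import Dict, Tuple

logger = logging.getLogger("router_sketch")


class FakeTask:
    """Stand-in for a Celery task: delay() returns an object with an id."""

    def delay(self, *args, **kwargs) -> SimpleNamespace:
        return SimpleNamespace(id=str(uuid.uuid4()))


class TaskIdRecorder:
    """Hypothetical stand-in for the Redis manager's set_task_id."""

    def __init__(self) -> None:
        self.task_ids: Dict[Tuple[str, str], str] = {}

    def set_task_id(self, conversation_id: str, run_id: str, task_id: str) -> None:
        self.task_ids[(conversation_id, run_id)] = task_id


def start_agent_run(task: FakeTask, redis_manager: TaskIdRecorder,
                    conversation_id: str, run_id: str) -> str:
    # Keep the result object instead of discarding it, so the ID is known.
    task_result = task.delay(conversation_id=conversation_id, run_id=run_id)
    redis_manager.set_task_id(conversation_id, run_id, task_result.id)
    logger.info("Started task %s for %s:%s", task_result.id, conversation_id, run_id)
    return task_result.id
```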


493-505: Regenerate task ID tracking mirrors agent path appropriately

The regenerate endpoint now mirrors the agent endpoint by capturing the Celery AsyncResult and storing task_result.id in Redis. This keeps the stop/cancellation semantics consistent across both flows and reuses the same Redis keying scheme.

No functional issues here.

app/modules/conversations/conversation/conversation_service.py (1)

88-107: SessionService/RedisStreamManager DI looks sound and backward‑compatible

Injecting SessionService and RedisStreamManager via __init__ while still defaulting to concrete instances—and wiring them through ConversationService.create()—keeps the public construction path intact but makes testing and future customization much easier.

Storing self.celery_app here is a reasonable compromise to allow stop‑related logic inside the service without forcing every caller to pass a Celery app explicitly. I don’t see any obvious behavioral regressions from these changes.

Also applies to: 128-147
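
The testing benefit mentioned above can be illustrated with a small sketch of constructor injection with defaults. The class names are hypothetical stand-ins, and the real `ConversationService` constructor takes more parameters than shown here.

```python
from typing import Optional


class SessionServiceSketch:
    """Hypothetical default collaborator."""

    def get_active_session(self, conversation_id: str) -> Optional[str]:
        return None


class RedisManagerSketch:
    """Hypothetical default collaborator."""


class ConversationServiceSketch:
    def __init__(self,
                 session_service: Optional[SessionServiceSketch] = None,
                 redis_manager: Optional[RedisManagerSketch] = None) -> None:
        # Fall back to concrete instances so existing callers keep working.
        self.session_service = (
            session_service if session_service is not None else SessionServiceSketch()
        )
        self.redis_manager = (
            redis_manager if redis_manager is not None else RedisManagerSketch()
        )


class StubSessionService(SessionServiceSketch):
    """Test double returning a fixed run ID."""

    def get_active_session(self, conversation_id: str) -> Optional[str]:
        return "run-42"
```

A test can then pass `StubSessionService()` to the constructor, while production code continues to call the constructor with no arguments.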

@nndn nndn self-requested a review November 20, 2025 06:54
@nndn nndn merged commit 6027079 into main Nov 20, 2025
1 of 2 checks passed
3 participants