Skip to content

[Feat] /stream/v2 백엔드 SSE 어댑터#117

Merged
haein45 merged 3 commits into
devfrom
feat/116-stream-v2-adapter
Jun 2, 2026
Merged

[Feat] /stream/v2 백엔드 SSE 어댑터#117
haein45 merged 3 commits into
devfrom
feat/116-stream-v2-adapter

Conversation

@haein45

@haein45 haein45 commented Jun 2, 2026

Copy link
Copy Markdown
Collaborator

📌 관련 이슈

🏷️ PR 타입

  • ✨ 기능 추가 (Feature)
  • 🐛 버그 수정 (Bug Fix)
  • ♻️ 리팩토링 (Refactoring)
  • 📝 문서 수정 (Documentation)
  • 🎨 스타일 변경 (Style)
  • ✅ 테스트 추가 (Test)

📝 작업 내용

백엔드(Proovy-server)가 호출하는 POST /stream/v2 + GET /health 계약에 맞춰 SSE 어댑터를 추가했습니다. 내부 그래프·emitter·envelope 설계는 그대로 두고 경계에서만 변환합니다 — /solve(내부 envelope)는 dev/e2e용으로 유지.

  • events.py: to_stream_v2 추가 — token→llm.token.delta, done→run.completed, error→run.failed, 그 외 내부 이벤트는 None으로 drop. 모든 datathread_id 포함(백엔드가 첫 턴 threadId 영속화에 사용)
  • emitter.stream(serializer=...) 파라미터화 — 직렬화 결과가 None이면 skip. /solveto_sse, /stream/v2to_stream_v2
  • app/api/_runner.py: start_graph_task 공유 실행 헬퍼로 solve.py_run 추출 (/solve·/stream/v2 단일 소스, dead code 방지)
  • app/schemas/stream.py: StreamInput — 백엔드 camelCase(message/threadId/userId/...) 수용, 내부 매핑(message→problem 등). credit/auth/files 필드는 v1 수용만(백엔드 책임)
  • app/api/stream.py: POST /stream/v2 + GET /health, main.py에 prefix 없이 등록(ingress가 /ai strip 가정)
  • solve.py: 공유 runner로 리팩터 — 동작·기존 테스트 동일

백엔드 계약 매핑:

목적 event: data
토큰 스트림 llm.token.delta {"delta", "thread_id"}
성공 종료 run.completed {"thread_id"}
실패 종료 run.failed {"message", "thread_id"}

멀티턴(threadId 왕복 + Postgres 체크포인트)은 양쪽 설계가 호환 → 계약을 맞추면 기존 동작 그대로 됩니다.

📸 스크린샷

tests/app/test_stream_endpoint.py + test_events.py(to_stream_v2)  →  36 passed
전체 스위트(외부 서비스 마커 제외)                                  →  235 passed

실서버 /stream/v2 호출(백엔드와 동일 camelCase 요청) — 실제 LLM 스트림:

$ curl -N -X POST localhost:8011/stream/v2 \
    -d '{"message":"2x + 3 = 7 을 풀어줘.","userId":"u1","threadId":"v2-demo","chosenFeatures":["solve"]}'

event: llm.token.delta
data: {"delta": "네", "thread_id": "v2-demo"}
...
event: llm.token.delta
data: {"delta": "최종 답은 **x = 2** 입니다.", "thread_id": "v2-demo"}

event: run.completed
data: {"thread_id": "v2-demo"}

$ curl -s localhost:8011/health
{"status":"ok"}

내부 이벤트(page_start/tool_*/node_result/credit_settled)는 seq 건너뜀으로 drop 확인됨.

✅ 체크리스트

  • 코드 리뷰를 받을 준비가 완료되었습니다
  • 테스트를 작성하고 모두 통과했습니다
  • 문서를 업데이트했습니다 (필요한 경우)
  • 코드 스타일 가이드를 준수했습니다
  • 셀프 리뷰를 완료했습니다

📎 기타 참고사항

  • 배포 정렬 필요: ingress가 https://.../ai/* → 앱 /* 매핑(/ai strip)하는지 확인. 그래야 백엔드 proovy.ai.host(.../ai)의 /stream/v2·/health가 본 라우터에 닿습니다.
  • 크레딧·인증(chosenFeatures/authToken)은 백엔드 책임 → AI는 v1에서 수용 후 무시.
  • Last-Event-ID 재연결/durable store는 백엔드(BFF) 관할로 본 PR 범위 밖.
  • 프론트/백 동시 배포: 백엔드는 이미 /stream/v2 기대 → 본 어댑터 배포 시점에 PROOVY_AI_HOST가 이 서비스를 가리키게.

Summary by CodeRabbit

  • 새로운 기능

    • 백엔드용 실시간 스트리밍 API(/stream/v2) 및 헬스 체크(/health) 엔드포인트 추가
  • 개선 사항

    • 스트리밍에서 토큰·완료·오류 이벤트만 전달해 불필요한 데이터 제거
    • 요청의 camelCase 입력 지원 및 userId 유효성 검사 강화 (: 포함 불가)
    • 클라이언트 연결 종료 시에도 백그라운드 작업이 중단되지 않도록 처리
  • 테스트

    • 스트리밍/헬스 엔드포인트 관련 자동화 테스트 추가

백엔드(Proovy-server)가 호출하는 /stream/v2 + /health 계약에 맞춰 SSE 어댑터를
추가한다. 내부 그래프·emitter·envelope은 그대로 두고 경계에서만 변환한다.

- events.py: to_stream_v2 — token→llm.token.delta, done→run.completed,
  error→run.failed, 그 외 drop. 모든 data에 thread_id 포함
- emitter.stream(serializer=) 파라미터화 — None 직렬화 결과는 skip
- _runner.start_graph_task — /solve·/stream/v2 공유 실행 헬퍼 (dead code 방지)
- schemas/stream.py StreamInput — camelCase(message/threadId/userId) 수용
- api/stream.py POST /stream/v2 + GET /health, main에 prefix 없이 등록
- solve.py는 공유 runner로 리팩터 (동작·기존 테스트 동일)

멀티턴은 threadId 왕복 + Postgres 체크포인트로 기존 동작 그대로.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@haein45 haein45 self-assigned this Jun 2, 2026
@coderabbitai

coderabbitai Bot commented Jun 2, 2026

Copy link
Copy Markdown

Review Change Stack

Warning

Review limit reached

@haein45, we couldn't start this review because you've reached your PR review rate limit.

More reviews will be available in 12 minutes and 46 seconds. Learn how PR review limits work.

Your organization has run out of usage credits. Purchase more in the billing tab.

⌛ How to resolve this issue?

After more reviews become available, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available.

Please see our Fair Usage Limits Policy for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 25e4338e-d192-4d2a-9334-7d374cfe7b1d

📥 Commits

Reviewing files that changed from the base of the PR and between a3f0a57 and 359cc87.

📒 Files selected for processing (2)
  • src/proovy_agent/app/schemas/stream.py
  • tests/app/test_stream_endpoint.py
📝 Walkthrough

Walkthrough

PR은 백엔드 /stream/v2 SSE 계약을 구현하기 위해, SSEEmitter에 직렬화 주입을 추가하고 shared start_graph_task()로 그래프 실행을 fire-and-forget 태스크로 관리한 뒤, camelCase 요청을 ProovyState로 매핑하고 백엔드 어휘(llm.token.delta/run.completed/run.failed)로 SSE를 스트리밍하며 기존 /solve를 리팩터링합니다.

Changes

Stream v2 어댑터 및 SSE 통합

Layer / File(s) Summary
SSE Emitter 직렬화 및 이벤트 매핑 확장
src/proovy_agent/common/sse/emitter.py, src/proovy_agent/common/sse/events.py
SSEEmitter.stream()serializer 파라미터로 확장하여 직렬화 함수를 주입 가능하게 하고, to_stream_v2()를 추가해 TokenEvent/DoneEvent/ErrorEvent를 백엔드 어휘(llm.token.delta/run.completed/run.failed)로 매핑하며 내부 이벤트는 drop합니다.
공유 실행 헬퍼 start_graph_task()
src/proovy_agent/app/api/_runner.py
SSEEmitter를 생성하고 그래프 실행을 fire-and-forget asyncio 태스크로 시작하는 start_graph_task()를 추가해 current_emitter 컨텍스트를 설정하고 user_id 네임스페이스로 thread_id를 구성하여 체크포인트에 주입하며, 완료/취소/예외 상황에서 payload emit과 emitter.close/reset, 태스크 레퍼런스 관리를 수행합니다.
백엔드 요청 스키마 StreamInput
src/proovy_agent/app/schemas/stream.py
message, threadId, userId, filesUrl, chosenFeatures, streamTokens, agentConfig, authToken 필드를 camelCase alias로 수용하는 Pydantic 모델을 추가하고, user_id에 ':' 문자가 포함되면 검증 오류를 발생시킵니다.
/stream/v2 SSE 엔드포인트 및 헬스 체크
src/proovy_agent/app/api/stream.py
GET /health 엔드포인트와 POST /stream/v2 엔드포인트를 추가하여 StreamInput 페이로드를 ProovyState로 변환하고 start_graph_task()를 호출한 뒤 EventSourceResponse(emitter.stream(to_stream_v2), ping=SSE_PING_INTERVAL)으로 SSE를 스트리밍합니다.
/solve 엔드포인트 리팩터링
src/proovy_agent/app/api/v1/solve.py
로컬 _run 태스크의 그래프 실행·done/error emit·취소 처리·emitter 종료·context reset·_active_tasks 관리 로직을 제거하고, start_graph_task()가 반환한 emitter.stream()을 EventSourceResponse로 연결하도록 단순화합니다.
애플리케이션 라우터 등록
src/proovy_agent/app/main.py
stream_router를 FastAPI 애플리케이션에 prefix 없이 등록하여 /health 및 /stream/v2 엔드포인트를 루트 레벨에 노출합니다.
엔드포인트 및 이벤트 매핑 테스트
tests/app/test_stream_endpoint.py, tests/common/sse/test_events.py
/health 응답, 토큰 누적 및 이벤트 매핑(llm.token.delta→run.completed), 내부 이벤트 drop, camelCase 수용 및 thread_id 자동 생성, 그래프 예외 처리(run.failed 매핑), 입력 검증(userId 콜론 금지, message 필수)을 검증하는 단위 및 통합 테스트를 추가합니다.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested labels

혜인

Suggested reviewers

  • chowon442
  • gaeunee2

Poem

🐇 귀여운 토큰이 춤추네,
작은 방울들 llm.token.delta,
완료는 run.completed로 노래하고,
실패면 run.failed이 살짝 울고,
토끼는 다시 숨을 고르며 응답을 전하네.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 65.71% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed PR 제목이 주요 변경 사항을 명확하게 반영하고 있으며, 백엔드 SSE 어댑터 추가라는 핵심 기능을 간결하게 표현합니다.
Description check ✅ Passed PR 설명이 템플릿을 따르고 관련 이슈, PR 타입, 작업 내용, 테스트 결과, 체크리스트를 모두 포함하여 완성도 높게 작성되었습니다.
Linked Issues check ✅ Passed PR의 모든 주요 변경 사항이 #116 이슈의 작업 항목을 완전히 충족합니다. to_stream_v2, emitter.stream(serializer=...), StreamInput, _runner.py, stream.py 추가, thread_id 포함, 테스트 작성이 모두 구현되었습니다.
Out of Scope Changes check ✅ Passed 모든 변경 사항이 #116 이슈 범위 내에 있습니다. 내부 envelope 유지, /solve 엔드포인트 보존, 공유 runner 추출 등 모두 이슈에서 지정한 범위를 따릅니다.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/116-stream-v2-adapter

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@cursor cursor Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

요약

백엔드 /stream/v2 vocab 어댑터와 _runner 공유 리팩터 방향은 적절하고, 계약 매핑·drop 동작 테스트가 잘 갖춰져 있습니다. 다만 StreamInput.userId가 빈 문자열을 허용해 /solve와 달리 체크포인트 키 :{thread_id}로 수렴할 수 있어 멀티턴 격리가 약해집니다. 루트 /health는 프로세스 생존만 확인하며, to_stream_v2의 SSE id는 내부 seq를 그대로 써 drop 시 gap이 남을 수 있습니다(백엔드가 id seq를 쓰는지는 추가 확인 필요).


[문제 1]

  • 심각도: High
  • 범주: 보안 / 정확성
  • 근거: StreamInput.user_id 기본값이 ""이고 필수 검증이 없어, userId 생략·빈 값 요청이 200으로 통과합니다. _runner의 체크포인트 키는 f"{user_id}:{thread_id}"이므로 user_id==""이면 키가 :{threadId} 형태가 되어, 서로 다른 호출자가 동일 threadId만 맞추면 같은 Postgres 체크포인트 스레드를 공유할 수 있습니다. /api/v1/solveSolveRequestuser_id를 필수(Field(...))로 두어 이 경로와 불일치합니다.
  • 수정안: SolveRequest와 동일하게 user_id: str = Field(..., alias="userId", min_length=1)로 필수화하거나, validator에서 빈 문자열을 422로 거부합니다. test_stream_v2_missing_user_id_returns_422 같은 회귀 테스트를 추가하는 것이 좋습니다.

[문제 2]

  • 심각도: Medium
  • 범주: 정확성 (추가 확인 필요)
  • 근거: to_stream_v2는 내부 이벤트를 None으로 drop하지만 SSE id는 여전히 f"{thread_id}:{event.seq}"(내부 단조 seq)를 사용합니다. 예: page_start(seq=0) drop 후 첫 토큰이 seq=3이면 wire idth-1:3이 되어 seq 1·2가 비어 보입니다. 백엔드/BFF가 Last-Event-ID의 seq로 gap·재전송을 판단하면 오탐 가능성이 있습니다. PR 설명상 재연결은 BFF 관할이라 영향 범위는 추가 확인 필요입니다.
  • 수정안: (1) 백엔드가 data.thread_id만 쓰고 id seq는 무시한다면 docstring/계약에 명시. (2) seq 기반 판단이 필요하면 v2 전용 wire seq를 별도 카운터로 부여하거나, drop된 이벤트는 id에 반영하지 않도록 매핑합니다.

[문제 3]

  • 심각도: Medium
  • 범주: 유지보수성 / 운영
  • 근거: 루트 GET /health는 항상 {"status":"ok"}만 반환합니다. lifespan에서 초기화한 checkpointer·Daytona·build_graph 실패 여부, DB 연결 불가 등은 반영되지 않아, 백엔드 사전 체크가 “트래픽 투입 가능”과 동치가 아닐 수 있습니다.
  • 수정안: 의도가 liveness만이면 summary/docstring에 “ready 아님”을 명시하거나, /api/v1/health처럼 최소 의존성(예: checkpointer ping)을 검사하는 readiness를 분리합니다.

[문제 4]

  • 심각도: Low
  • 범주: 정확성 (추가 확인 필요)
  • 근거: run.failed payload에 messagethread_id만 포함하고, 내부 ErrorPayload.code(credit_exhausted, timeout 등)는 전달하지 않습니다. 백엔드가 실패 유형별 분기를 code로 한다면 정보 손실입니다.
  • 수정안: 백엔드 계약에 code 필드가 있으면 datacode(및 필요 시 recoverable)를 추가하고 단위 테스트를 보강합니다. 계약에 없으면 현 상태 유지 가능합니다.
Open in Web View Automation 

Sent by Cursor Automation: Chowon Reviewer

Comment thread src/proovy_agent/app/schemas/stream.py Outdated

message: str = Field(..., description="사용자 입력(문제)")
thread_id: str | None = Field(None, alias="threadId", description="대화 맥락 ID (없으면 생성)")
user_id: str = Field("", alias="userId", description="사용자 ID")

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[High · 보안/정확성] user_id 기본값 "" + 비필수라 userId 생략 시 200이 됩니다. 체크포인트 키가 :{threadId}로 수렴되어 동일 threadId를 아는 호출자 간 멀티턴 상태가 공유될 수 있습니다 (/solveuser_id 필수).

수정안: Field(..., alias="userId", min_length=1) 또는 validator로 빈 문자열 422. 회귀 테스트 추가 권장.

Comment thread src/proovy_agent/common/sse/events.py Outdated
return None
return {
"event": name,
"id": f"{event.thread_id}:{event.seq}",

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Medium · 정확성 · 추가 확인 필요] 내부 이벤트 drop 후에도 id가 내부 event.seq를 그대로 사용해 wire상 seq gap이 생깁니다. BFF가 Last-Event-ID의 seq로 gap/재전송을 보면 오탐 가능성이 있습니다.

수정안: 백엔드가 id를 무시한다면 계약에 명시. seq 기반이면 v2 전용 wire seq 카운터를 두세요.

@router.get("/health", summary="헬스체크 (백엔드 사전 체크)")
async def health() -> dict[str, str]:
"""백엔드가 SSE 시작 전 호출하는 헬스 엔드포인트."""
return {"status": "ok"}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Medium · 운영] 항상 ok만 반환해 checkpointer/DB/Daytona/그래프 준비 상태와 무관합니다. 백엔드 사전 체크가 readiness로 쓰이면 장애 파드로 트래픽이 갈 수 있습니다.

수정안: liveness 전용이면 docstring에 명시하거나, 최소 의존성 검사(readiness)를 추가/분리하세요.

)

cursor 리뷰 3건 반영:
- StreamInput.user_id 필수+min_length=1 — 빈 userId 시 체크포인트 키가
  ":{threadId}"로 수렴해 threadId를 아는 호출자끼리 멀티턴 상태가 공유되는 문제
  차단 (userId 누락·빈문자열 422 회귀 테스트 추가)
- to_stream_v2: SSE id 제거 — 내부 이벤트 drop으로 wire seq가 불연속이라
  소비자가 gap 오탐할 수 있음. 백엔드는 서버-투-서버 Flux로 Last-Event-ID
  미사용이라 id 불필요 (재연결/replay는 백엔드 durable store 관할)
- /health: liveness 전용임을 docstring 명시 — DB/Daytona/그래프 readiness는
  검사하지 않음(백엔드 checkProovyAiHealth는 연결 가능 여부만 봄)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

@chowon442 chowon442 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/stream/v2 백엔드 계약 호환성 이슈 1건만 남깁니다.


# v1 미사용 — 수용만 (백엔드가 크레딧·인증·파일을 책임)
files_url: list[str] = Field(default_factory=list, alias="filesUrl")
chosen_features: list[str] = Field(default_factory=list, alias="chosenFeatures")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[High · 백엔드 계약 호환성] chosenFeatures: null 요청이 422로 거부됩니다. Proovy-server dev 기준 ConversationRequest.chosenFeatures는 optional이고, ChatServiceImpl에서 그대로 ProovyAiRequest.chosenFeatures에 넣습니다. Jackson 기본 null 직렬화라면 사용자가 기능을 선택하지 않은 일반 요청이 AI /stream/v2 경계에서 실패할 수 있습니다.

수정안: chosen_featureslist[str] | None으로 받거나 validator로 None -> [] 처리하고, 서버 payload 형태(chosenFeatures: null)의 회귀 테스트를 추가해 주세요.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

반영했습니다 (359cc87) 🙏 — chosenFeatures뿐 아니라 filesUrl/agentConfig/streamTokens까지 mode="before" validator로 null → 기본값([], {}, True) 흡수하도록 했고, 백엔드 null 직렬화 페이로드(chosenFeatures: null 등) 회귀 테스트도 추가했습니다.

chowon442 리뷰 반영: 백엔드(Proovy-server)는 기능 미선택 시
chosenFeatures/filesUrl/agentConfig/streamTokens를 null로 직렬화하는데,
기존 타입(list/dict/bool)이 null을 거부해 일반 요청이 422로 깨질 수 있었다.
mode="before" validator로 None→기본값([], {}, True) 흡수. null 페이로드
회귀 테스트 추가.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

@chowon442 chowon442 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

빠른 반영 감사합니다~ 🐇🐇

@haein45 haein45 merged commit a01322d into dev Jun 2, 2026
1 check passed
@coderabbitai coderabbitai Bot mentioned this pull request Jun 2, 2026
11 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feat] /stream/v2 백엔드 SSE 어댑터

2 participants