client: use singleflight to reduce request press#10632
client: use singleflight to reduce request press#10632bufferflies wants to merge 3 commits intotikv:masterfrom
Conversation
Signed-off-by: tongjian <1045931706@qq.com>
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
📝 WalkthroughWalkthroughAdds Changes
Sequence Diagram(s)sequenceDiagram
participant Caller as Caller
participant SD as ServiceDiscovery
participant SF as singleflight.Group
participant PD as PD gRPC
Caller->>SD: Request GetClusterInfo (concurrent)
SD->>SF: Do("GetClusterInfo:<url>", fn)
alt First caller
SF->>PD: Invoke GetClusterInfo RPC
PD-->>SF: Response
SF-->>SD: result (shared)
SD-->>Caller: Parsed response
else Concurrent callers
SF-->>SD: Wait for shared result
SD-->>Caller: Parsed response (from shared result)
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 inconclusive)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Review rate limit: 7/8 reviews remaining, refill in 7 minutes and 30 seconds.Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@client/servicediscovery/service_discovery.go`:
- Around line 448-449: singleflight usage currently collapses calls by verb only
(flight.Do("GetClusterInfo") / flight.Do("GetMembers")), causing result leaks
across PD URLs and losing caller deadlines; update the calls in getMembers and
GetClusterInfo to include the target URL (e.g., key := fmt.Sprintf("%s:%s",
verb, url)) or better, replace Do with DoChan and wait on both the DoChan result
and the caller's ctx.Done() so each caller preserves its own deadline and only
receives results for the requested URL; make these changes around the methods
getMembers, GetClusterInfo and their callers that loop over c.GetServiceURLs(),
and apply the same pattern at the other locations noted in the comment.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 2f7a0e2f-a5a7-4a42-8a41-e9e4781188ab
⛔ Files ignored due to path filters (1)
client/go.sumis excluded by!**/*.sum
📒 Files selected for processing (2)
client/go.modclient/servicediscovery/service_discovery.go
|
/retest |
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## master #10632 +/- ##
==========================================
+ Coverage 78.96% 79.02% +0.05%
==========================================
Files 532 532
Lines 71883 71969 +86
==========================================
+ Hits 56766 56875 +109
+ Misses 11093 11087 -6
+ Partials 4024 4007 -17
Flags with carried forward coverage won't be shown. Click here to find out more. 🚀 New features to boost your workflow:
|
| start := time.Now() | ||
| defer func() { metrics.InternalCmdDurationGetMembers.Observe(time.Since(start).Seconds()) }() | ||
| members, err := pdpb.NewPDClient(cc).GetMembers(ctx, &pdpb.GetMembersRequest{}) | ||
| res, err, _ := c.flight.Do("GetMembers", func() (interface{}, error) { |
There was a problem hiding this comment.
Same issue here: please include the target URL in the singleflight key. Otherwise a GetMembers call for one PD endpoint can share the response or error from another endpoint.
There was a problem hiding this comment.
done, add url into the key
Signed-off-by: tongjian <1045931706@qq.com>
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
There was a problem hiding this comment.
♻️ Duplicate comments (1)
client/servicediscovery/service_discovery.go (1)
926-928:⚠️ Potential issue | 🟠 MajorPreserve each caller’s timeout while deduplicating requests.
Line 926 and Line 952 use
singleflight.Group.Do, so duplicate callers block until the first call finishes and cannot honor their ownctx.Done()while waiting. This can exceed caller timeout budgets during contention.Suggested fix (keep constant keys, switch to DoChan + caller-context select)
@@ - res, err, _ := c.flight.Do("GetClusterInfo", func() (any, error) { - return pdpb.NewPDClient(cc).GetClusterInfo(ctx, &pdpb.GetClusterInfoRequest{}) - }) - if err != nil { + ch := c.flight.DoChan("GetClusterInfo", func() (any, error) { + return pdpb.NewPDClient(cc).GetClusterInfo(ctx, &pdpb.GetClusterInfoRequest{}) + }) + var r singleflight.Result + select { + case <-ctx.Done(): + metrics.InternalCmdFailedDurationGetClusterInfo.Observe(time.Since(start).Seconds()) + attachErr := errors.Errorf("error:%s target:%s status:%s", ctx.Err(), cc.Target(), cc.GetState().String()) + return nil, errs.ErrClientGetClusterInfo.Wrap(attachErr).GenWithStackByCause() + case r = <-ch: + } + if r.Err != nil { metrics.InternalCmdFailedDurationGetClusterInfo.Observe(time.Since(start).Seconds()) - attachErr := errors.Errorf("error:%s target:%s status:%s", err, cc.Target(), cc.GetState().String()) + attachErr := errors.Errorf("error:%s target:%s status:%s", r.Err, cc.Target(), cc.GetState().String()) return nil, errs.ErrClientGetClusterInfo.Wrap(attachErr).GenWithStackByCause() } - clusterInfo := res.(*pdpb.GetClusterInfoResponse) + clusterInfo := r.Val.(*pdpb.GetClusterInfoResponse) @@ - res, err, _ := c.flight.Do("GetMembers", func() (any, error) { - return pdpb.NewPDClient(cc).GetMembers(ctx, &pdpb.GetMembersRequest{}) - }) - if err != nil { + ch := c.flight.DoChan("GetMembers", func() (any, error) { + return pdpb.NewPDClient(cc).GetMembers(ctx, &pdpb.GetMembersRequest{}) + }) + var r singleflight.Result + select { + case <-ctx.Done(): + metrics.InternalCmdFailedDurationGetMembers.Observe(time.Since(start).Seconds()) + attachErr := errors.Errorf("error:%s target:%s status:%s", ctx.Err(), cc.Target(), cc.GetState().String()) + return nil, errs.ErrClientGetMember.Wrap(attachErr).GenWithStackByCause() + case r = <-ch: + } + if r.Err != nil { metrics.InternalCmdFailedDurationGetMembers.Observe(time.Since(start).Seconds()) - attachErr := errors.Errorf("error:%s target:%s status:%s", err, cc.Target(), cc.GetState().String()) + attachErr := errors.Errorf("error:%s target:%s status:%s", r.Err, cc.Target(), cc.GetState().String()) return nil, errs.ErrClientGetMember.Wrap(attachErr).GenWithStackByCause() } - members := res.(*pdpb.GetMembersResponse) + members := r.Val.(*pdpb.GetMembersResponse)In golang.org/x/sync/singleflight (v0.19.0), does Group.Do allow duplicate callers to stop waiting when their context is canceled, or is DoChan+select on ctx.Done() required to preserve per-caller deadlines?As per coding guidelines, "Use context-aware timeouts and backoff for retries" and "Prevent goroutine leaks: pair with cancellation; consider errgroup".
Also applies to: 952-954
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@client/servicediscovery/service_discovery.go` around lines 926 - 928, The current use of c.flight.Do(...) (e.g., the GetClusterInfo call that wraps pdpb.NewPDClient(cc).GetClusterInfo) blocks duplicate callers until the first finishes and prevents honoring each caller’s ctx.Done(); replace Group.Do with Group.DoChan and, after calling c.flight.DoChan("GetClusterInfo", func() (any, error) { ... }), select between the result channel and the caller's ctx.Done() so a canceled/deadlined caller returns promptly, and apply the same pattern to the other occurrence (the call at the other duplicated key around lines 952–954); ensure the key strings remain constant and that you cancel/ignore the result if ctx.Done() fires to avoid goroutine/resource leaks.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@client/servicediscovery/service_discovery.go`:
- Around line 926-928: The current use of c.flight.Do(...) (e.g., the
GetClusterInfo call that wraps pdpb.NewPDClient(cc).GetClusterInfo) blocks
duplicate callers until the first finishes and prevents honoring each caller’s
ctx.Done(); replace Group.Do with Group.DoChan and, after calling
c.flight.DoChan("GetClusterInfo", func() (any, error) { ... }), select between
the result channel and the caller's ctx.Done() so a canceled/deadlined caller
returns promptly, and apply the same pattern to the other occurrence (the call
at the other duplicated key around lines 952–954); ensure the key strings remain
constant and that you cancel/ignore the result if ctx.Done() fires to avoid
goroutine/resource leaks.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: db1b73ef-f8b5-49de-9e6e-995d4013ed3f
📒 Files selected for processing (1)
client/servicediscovery/service_discovery.go
Signed-off-by: tongjian <1045931706@qq.com>
|
@bufferflies: The following test failed, say
Full PR test history. Your PR dashboard. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here. |
| defer func() { metrics.InternalCmdDurationGetClusterInfo.Observe(time.Since(start).Seconds()) }() | ||
| clusterInfo, err := pdpb.NewPDClient(cc).GetClusterInfo(ctx, &pdpb.GetClusterInfoRequest{}) | ||
| key := "GetClusterInfo-" + url | ||
| res, err, _ := c.flight.Do(key, func() (any, error) { |
There was a problem hiding this comment.
singleflight.Do still makes later callers wait for the first same-URL RPC, so their own timeout/cancel may not be respected.
Could we use DoChan + select ctx.Done() to preserve caller deadline semantics?
What problem does this PR solve?
Issue Number: Close #10633
What is changed and how does it work?
Check List
Tests
Code changes
Side effects
Related changes
pingcap/docs/pingcap/docs-cn:pingcap/tiup:Release note
Summary by CodeRabbit
Bug Fixes
Chores