
grpc: support per-message compression and enforce embedding in ServerTransportStream implementations#8972

Open
Dostonlv wants to merge 11 commits into grpc:master from Dostonlv:master

Conversation


@Dostonlv Dostonlv commented Mar 13, 2026

Fixes #8662

RELEASE NOTES:

  • grpc: add support for configuring per-message compression using the new SetServerStreamMessageCompression and SetClientStreamMessageCompression functions.
  • grpc: add an internal method to the experimental ServerTransportStream interface to force implementors to embed a delegate implementation. This helps gRPC add new methods to the interface without breaking builds for users of this interface.
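
The second note refers to the standard Go trick of putting an unexported method on an interface. A minimal sketch of the pattern, with hypothetical names (Delegate, mustEmbedDelegate) standing in for the actual identifiers in the PR:

package transport

// ServerTransportStream can only be satisfied by embedding Delegate, because
// mustEmbedDelegate is unexported and therefore unimplementable outside this
// package. When a new method is later added to both the interface and
// Delegate, existing embedders keep compiling.
type ServerTransportStream interface {
	Method() string
	mustEmbedDelegate()
}

// Delegate provides the unexported marker method (and, over time, default
// implementations of newly added interface methods).
type Delegate struct{}

func (Delegate) mustEmbedDelegate() {}

An external implementation then looks like type myStream struct{ transport.Delegate } plus its own Method; the promoted method satisfies the interface even though mustEmbedDelegate is unexported, because it is defined inside the transport package.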


codecov Bot commented Mar 13, 2026

Codecov Report

❌ Patch coverage is 91.48936% with 4 lines in your changes missing coverage. Please review.
✅ Project coverage is 83.09%. Comparing base (8360b4c) to head (04e6bdc).
⚠️ Report is 81 commits behind head on master.

Files with missing lines    Patch %    Lines
stream.go                   90.00%     2 Missing and 2 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #8972      +/-   ##
==========================================
- Coverage   83.23%   83.09%   -0.15%     
==========================================
  Files         410      413       +3     
  Lines       32572    33514     +942     
==========================================
+ Hits        27111    27848     +737     
- Misses       4066     4239     +173     
- Partials     1395     1427      +32     
Files with missing lines                Coverage Δ
internal/internal.go                    60.00% <ø> (ø)
internal/transport/handler_server.go    93.85% <100.00%> (+0.02%) ⬆️
internal/transport/http2_server.go      90.52% <100.00%> (-0.67%) ⬇️
internal/transport/server_stream.go     100.00% <100.00%> (+3.38%) ⬆️
server.go                               82.73% <100.00%> (+0.58%) ⬆️
stream.go                               82.15% <90.00%> (+0.73%) ⬆️

... and 74 files with indirect coverage changes


@easwars easwars self-assigned this Mar 16, 2026
@easwars easwars added the Type: API Change and Area: RPC Features labels Mar 16, 2026
@easwars easwars added this to the 1.81 Release milestone Mar 16, 2026
Comment thread stream.go Outdated
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func SetMessageCompression(ctx context.Context, enable bool) error {
opts, ok := ctx.Value(compressKey{}).(*compressOptions)
Contributor

There are a few things here that I'm not convinced about:

  • We are adding the compressKey to every stream context, whether or not conditional compression enabling/disabling will be used. I'm not sure how much of a performance overhead this would be. @arjan-bal : Do you have any thoughts?
  • We are modifying a value stored in a context here, which breaks the guarantee that contexts are immutable in Go. I agree that we already have this pattern in a few places in our codebase, but if we can avoid it, that would be great. One option to avoid it would be to store compressOptions by value rather than as a pointer, and change this function to return a new context containing compressKey with the given value. This would mean that callers on the client and the server would have to use the newly returned context going forward. @dfawley : Do you have any objections to changing this function to return a context instead?

Author

Thanks for the suggestion! I thought about this approach too, but returning a new context doesn't work for streams.
The reason is that serverStream.SendMsg reads from ss.ctx — the context that was captured when the stream was created. Even if SetMessageCompression returns a new context, ss.ctx inside the stream object doesn't change, so SendMsg would never pick it up. The same issue applies to clientStream and addrConnStream.
Since there's no way to update the stream's internal context from the outside, mutating the pointer was the approach I went with. That said, I'm open to other ideas if there's a cleaner way to handle this — happy to adjust if you have something in mind.
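
To make the constraint concrete, here is a stripped-down sketch (not the PR's actual code; serverStream and compressOptions are simplified stand-ins):

package sketch

import (
	"context"
	"errors"
)

type compressKey struct{}

type compressOptions struct{ disabled bool }

// serverStream captures its context exactly once, at stream creation.
type serverStream struct{ ctx context.Context }

// SendMsg always consults ss.ctx, so a new context returned by a setter
// would never be visible here.
func (ss *serverStream) SendMsg(m any) error {
	compress := true
	if opts, ok := ss.ctx.Value(compressKey{}).(*compressOptions); ok {
		compress = !opts.disabled
	}
	_ = compress // ... marshal, optionally compress, and send m ...
	return nil
}

// SetMessageCompression therefore flips the flag through the shared pointer;
// the stream observes the change because both sides hold the same
// *compressOptions.
func SetMessageCompression(ctx context.Context, enable bool) error {
	opts, ok := ctx.Value(compressKey{}).(*compressOptions)
	if !ok {
		return errors.New("no compression options in context")
	}
	opts.disabled = !enable
	return nil
}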

Author

Following up on the compressKey overhead you mentioned: since SetMessageCompression is already a no-op without a compressor, I considered adding the key only when a compressor is actually set. But the tricky part is when SetSendCompressor is called first — the compressor isn't known at stream creation time. So I kept it as is for now. Happy to explore this further if you think it's worth optimizing.

Author

One more thought — to address the mutability concern, I'm planning to
switch from plain bool to atomic.Bool in compressOptions. Would that work?

Contributor

@arjan-bal arjan-bal Mar 17, 2026


Presently we have ~24 allocs/op for streaming RPCs and ~140 allocs/op for unary RPCs for client and server side combined. Even a single extra heap allocation does show up as a <1% QPS drop in our benchmarks when sending very small messages. Here is how to run the benchmarks for unary RPCs:

$ git checkout master
$ RUN_NAME=unary-before
$ go run benchmark/benchmain/main.go -benchtime=60s -workloads=unary \
   -compression=off -maxConcurrentCalls=200 -trace=off \
   -reqSizeBytes=100 -respSizeBytes=100 -networkMode=Local -resultFile="${RUN_NAME}"

$ git checkout feature-branch
$ RUN_NAME=unary-after
$ go run benchmark/benchmain/main.go -benchtime=60s -workloads=unary \
   -compression=off -maxConcurrentCalls=200 -trace=off \
   -reqSizeBytes=100 -respSizeBytes=100 -networkMode=Local -resultFile="${RUN_NAME}"

# Compare the results
$ go run benchmark/benchresult/main.go unary-before unary-after

We should ensure that users not utilizing the new SetMessageCompression API avoid the extra allocation. Could we treat the absence of compressKey{} as "compression enabled" to match current behavior? This should avoid heap allocations for existing use-cases.
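
For illustration, the fast-path lookup could look like this (a sketch; compressionEnabled is a hypothetical helper reusing the PR's compressKey/compressOptions names):

// compressionEnabled reports whether the next message should be compressed.
// Streams that never saw SetMessageCompression carry no compressKey value,
// so they take this fast path with zero extra heap allocations.
func compressionEnabled(ctx context.Context) bool {
	opts, ok := ctx.Value(compressKey{}).(*compressOptions)
	if !ok {
		return true // no key stored: keep current behavior, compress
	}
	return !opts.disabled
}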

Author

I ran the benchmarks and the results confirm your concern:

  • Allocs/op: 143 → 147 (+2.79%)
  • 99th percentile latency: 2.636ms → 3.051ms (+15.74%)

I'll fix this by treating the absence of compressKey as "compression enabled", so users not using SetMessageCompression avoid the extra allocation entirely.

Author

Updated the implementation to address the allocation concern.

Server side: moved doNotCompress into transport.ServerStream directly, no context allocation needed.
Client side: compressKey is added to context only when a compressor is set, so users not using SetMessageCompression get zero extra allocations.

Benchmark results (unary, compression=off):

           Title       Before        After Percentage
        TotalOps      8318091      8070904    -2.97%
        Bytes/op      9927.29      9883.28    -0.44%
       Allocs/op       143.22       143.03     0.00%
        50th-Lat   1.210583ms   1.255084ms     3.68%
        90th-Lat   2.083459ms   2.034416ms    -2.35%
        99th-Lat   4.868417ms   4.718292ms    -3.08%
         Avg-Lat   1.440822ms   1.483993ms     3.00%

Benchmark results (unary, compression=gzip):

           Title       Before        After Percentage
        TotalOps      2591595      2578328    -0.51%
        Bytes/op     14332.39     14312.50    -0.14%
       Allocs/op       183.36       185.21    +1.09%
        50th-Lat   4.095041ms     4.1095ms     0.35%
        90th-Lat   6.164916ms   6.204125ms     0.64%
        99th-Lat  15.452875ms   15.49425ms     0.27%
         Avg-Lat   4.624957ms   4.648262ms     0.50%

The +2 allocs/op in the gzip case are expected, since compressKey is only added when a compressor is set.

@arjan-bal @easwars
This is one of my first contributions to the project, so any guidance or feedback would be greatly appreciated.

Contributor

Thanks for optimizing this. The results seem good.

Comment thread test/compressor_test.go Outdated
Comment thread stream.go Outdated
@arjan-bal arjan-bal assigned arjan-bal and unassigned easwars Apr 7, 2026
Comment thread internal/transport/server_stream.go Outdated
Comment thread stream.go Outdated


Dostonlv commented Apr 7, 2026

@arjan-bal Peace be upon you. Updated the PR based on the feedback. Would appreciate another review.

@arjan-bal arjan-bal self-assigned this Apr 9, 2026
@arjan-bal arjan-bal changed the title stream: respect DoNotCompress option in serverStream and addrConnStream grpc: support per-message compression and enforce embedding in ServerTransportStream implementations Apr 10, 2026
@arjan-bal arjan-bal previously approved these changes Apr 10, 2026
Contributor

@arjan-bal arjan-bal left a comment


LGTM, just one minor comment to address. I'm also adding a second reviewer.

Comment thread stream.go Outdated
@arjan-bal arjan-bal assigned easwars and Dostonlv and unassigned Dostonlv Apr 10, 2026
Comment thread stream.go Outdated
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func SetClientStreamMessageCompression(ctx context.Context, enable bool) error {
Contributor

How will this be used on the client-side? The reason I'm asking is that I want to see if we can avoid mutating the context. This is an anti-pattern that we already have enough occurrences of, and it would be nice to avoid adding more.

What I want to see is whether we could have a function like:

func NewClientStreamWithCompression(cs ClientStream, enable bool) ClientStream {
  // Create a new context with the compression enabled flag set to the value of `enable`
  // Wrap the passed in stream with the new one, with all methods except Context being a passthrough
  // The Context() method will return the newly created context that has the correct value of `enable`
}

Contributor

I'm coming back to how this will be used. I'm wondering if an approach similar to the one on the server side can be used, where this function will accept a ClientStream instead of a context. The implementation will then propagate the value of the enable flag down to the client stream in the transport whenever appropriate (the clientStream struct in the grpc package has a reference to a csAttempt which has a ref to the underlying ClientStream in the transport).

Contributor

@arjan-bal

I still want to explore the possibility of not having to mutate the context on the client side.

One option is to mirror what we have on the server side. So:

  • Introduce a new ClientTransportStream interface
  • Have functions to set and get the above interface from the stream context
  • Have a method on this interface to enable/disable compression. This method would be implemented by the clientStream type in the grpc package. The implementation would reach into the attempt and grab the transport stream from there and call a corresponding method to enable/disable compression on the client transport stream

Do you have other options?

If you think doing that in a separate PR and keeping this PR only for server-side changes would be better, I'm fine with that too. I'm good with the current server-side changes in this PR.
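
For concreteness, a rough sketch of the mirrored approach described in the bullets above; every identifier here is hypothetical, not a committed API:

package grpc // hypothetical placement; names are illustrative

import "context"

// ClientTransportStream would be the client-side analog of the existing
// ServerTransportStream interface.
type ClientTransportStream interface {
	// SetMessageCompression enables or disables compression for messages
	// sent after this call returns.
	SetMessageCompression(enable bool)
}

type ctsKey struct{}

// NewContextWithClientTransportStream stores the stream in the stream's
// context at creation time, mirroring SetServerTransportStream.
func NewContextWithClientTransportStream(ctx context.Context, cts ClientTransportStream) context.Context {
	return context.WithValue(ctx, ctsKey{}, cts)
}

// ClientTransportStreamFromContext retrieves it; callers must nil-check.
func ClientTransportStreamFromContext(ctx context.Context) ClientTransportStream {
	cts, _ := ctx.Value(ctsKey{}).(ClientTransportStream)
	return cts
}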

Contributor

Just thinking out loud.

Wrapping the ClientStream (as mentioned in the initial comment) may not work. Even if the application uses the wrapper, gRPC's internals would continue using the inner stream, completely bypassing the overridden methods.

Technically speaking, we're currently mutating a value in the context for both the server and the client. On the server side, we mutate ctx -> ServerStream -> enableCompression, while on the client side, we set ctx -> compressKey directly. The ServerTransportStream interface was added in PR #1904 specifically to abstract different transport implementations. Exposing a new API on the client side would increase our public API surface without actually solving the mutable state-in-context problem.

Our core issue stems from the fact that the streaming interfaces are stable. The ideal solution would allow additional parameters to be set while sending messages. Similar to how DialOptions apply to a channel and CallOptions apply to an RPC, we could introduce MessageOptions that apply to individual messages being read or written.

We could have introduced an unstable extension interface with additional methods, for example:

type ExperimentalStream interface {
    grpc.ServerStream // or ClientStream
    SendWithParams(m interface{}, opts ...MessageOption) error
}

However, since we don't enforce that users embed the existing stream within their custom interceptors, the delegation chain can easily break. If an interceptor wraps the stream without embedding it, the application won't be able to upcast the stream to ExperimentalStream, making the experimental methods unreachable.

Without breaking backward compatibility, the only path forward is to piggyback on an existing parameter. If SendMsg took a struct, we could simply add a new field. Since it doesn't, the only existing avenue to configure additional parameters is the stream's Context().

We could have the context return an ExperimentalStream, but that would allow the application to send messages directly to the underlying transport, entirely bypassing the interceptor chain.

Given all this, it seems we are cornered into needing a setter of some kind on the context. Alternatively, we could invert the control flow and introduce a call option that provides a getter for the compression flag. The user could pass this getter as a call option, and gRPC would invoke it right before sending each message. This appears to solve the mutable state problem.
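
A small sketch of that last idea, using the real grpc.EmptyCallOption marker type but otherwise hypothetical names:

package example

import "google.golang.org/grpc"

// compressionGetterOption carries a user-supplied getter that gRPC would
// consult immediately before sending each message.
type compressionGetterOption struct {
	grpc.EmptyCallOption // marker: carries data without altering call config
	get                  func() bool
}

// WithCompressionGetter returns the hypothetical call option.
func WithCompressionGetter(get func() bool) grpc.CallOption {
	return compressionGetterOption{get: get}
}

The application would own the flag, e.g. a sync/atomic.Bool, and pass its Load method: client.FullDuplexCall(ctx, WithCompressionGetter(flag.Load)). gRPC would call get() right before compressing each outbound message, so no mutable state is stored in the context.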

Comment thread internal/transport/server_stream.go

easwars commented Apr 15, 2026

@Dostonlv : Apologies for the delay in getting back.

@github-actions

This PR is labeled as requiring an update from the reporter, and no update has been received after 6 days. If no update is provided in the next 7 days, this issue will be automatically closed.

@github-actions github-actions Bot added and later removed the stale label Apr 21, 2026
Comment thread internal/transport/server_stream.go Outdated
Comment on lines +62 to +64
// SetEnableCompression sets whether per-message compression is enabled for
// subsequent messages sent on this stream.
func (s *ServerStream) SetEnableCompression(v bool) { s.enableCompression = v }
Contributor

Nit: Let's name this EnableCompression instead.

Comment thread stream.go Outdated
// server handler. Compression is enabled by default and is a no-op if no
// compressor is configured on the stream (e.g. via SetSendCompressor).
//
// This method must not be called concurrently with SendMsg.
Contributor

Can we instead have the enableCompression field in transport.ServerStream be an atomic.Bool and remove this restriction that this method not be called concurrently with SendMsg?

I don't expect people to invoke this method repeatedly to cause any performance concerns. Therefore, simplifying the API seems like a better win.

Member

I don't think this is advisable. If a call to SendMessage is in flight and someone toggles compression during that call, we don't want to non-deterministically have compression for that message. I think it's better to leave the messaging about not calling it concurrently with SendMessage, and it's even better without the atomic so that tsan can detect the race for you.

Sorry for the conflicting advice.

Comment thread server.go
Comment on lines +1473 to +1476
compV0, compV1 := cp, comp
if !stream.IsCompressionEnabled() {
compV0, compV1 = nil, nil
}
Contributor

This code is executed before the server handler is invoked. This would mean that stream.IsCompressionEnabled would always return true. Am I missing something here? Do we really need this check here?

And if we really need this check, why is it missing for the streaming case?

Comment thread encoding/compressor_test.go Outdated
Comment on lines +322 to +323
t.Fatalf("After call 1 (compression enabled): got compress=%d decompress=%d, want compress=1 decompress=1",
sh.compress.Load(), sh.decompress.Load())
Contributor

Nit: Here and elsewhere in this test, please have these calls to t.Fatal be on a single line. Thanks.

Comment thread encoding/compressor_test.go Outdated
Comment on lines +384 to +387
backendConn, err := grpc.NewClient(
backend.Address,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)

Comment thread encoding/compressor_test.go Outdated
Comment on lines +393 to +395
clientStream, err := testgrpc.NewTestServiceClient(backendConn).FullDuplexCall(
serverStream.Context(), grpc.UseCompressor("gzip"),
)
Contributor

Here too

@arjan-bal arjan-bal removed their assignment May 11, 2026
@arjan-bal arjan-bal self-requested a review May 11, 2026 06:36
@arjan-bal arjan-bal dismissed their stale review May 11, 2026 06:37

Having more discussions on the approach.


Dostonlv commented May 12, 2026

Apologies for the delay in getting back to this. @arjan-bal @easwars

Addressed the review feedback:

  • Renamed SetEnableCompression to EnableCompression on transport.ServerStream
  • Made enableCompression an atomic.Bool in transport.ServerStream, removing the "must not be called concurrently with SendMsg" restriction
  • Fixed SetClientStreamMessageCompression to accept a ClientStream instead of a context.Context, avoiding context mutation. Internally, *clientStream is stored in the stream context at creation time (only when a compressor is configured) and retrieved via cs.Context().Value(clientStreamKey{}) — this mirrors the server-side pattern and works transparently with generated stream wrappers, since Context() is a promoted method through embedding (see the sketch below).
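
A condensed sketch of that retrieval path (inside the grpc package; the function body and the internal setter name are illustrative, not the exact diff):

type clientStreamKey struct{}

// At newClientStream time, and only when a compressor is configured:
//
//	ctx = context.WithValue(ctx, clientStreamKey{}, cs)

// SetClientStreamMessageCompression recovers the concrete stream from any
// wrapper, because Context() is a promoted method through embedding and
// still returns the underlying stream's context.
func SetClientStreamMessageCompression(cs ClientStream, enable bool) error {
	inner, ok := cs.Context().Value(clientStreamKey{}).(*clientStream)
	if !ok {
		return errors.New("stream has no compressor configured; per-message compression unavailable")
	}
	inner.setMessageCompression(enable) // hypothetical internal setter
	return nil
}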

Benchmark results (unary, compression=off):

       Title       Before        After Percentage
    TotalOps     9882132     10352477     +4.76%
    Bytes/op      9919.44      9908.54     -0.11%
   Allocs/op       143.20       143.02      0.00%
    50th-Lat   1.120875ms   1.089625ms     -2.79%
    90th-Lat   1.641583ms    1.52775ms     -6.93%
    99th-Lat   2.592333ms   2.334041ms     -9.96%
     Avg-Lat   1.212849ms   1.157112ms     -4.60%

Benchmark results (unary, compression=gzip):

       Title       Before        After Percentage
    TotalOps     2952080      2947236      -0.16%
    Bytes/op    14216.16     14232.90      +0.11%
   Allocs/op      183.33       184.19      +0.55%
    50th-Lat   3.955625ms   3.984167ms     +0.72%
    90th-Lat   5.338583ms   5.374125ms     +0.67%
    99th-Lat   8.599667ms   8.116083ms     -5.62%
     Avg-Lat   4.062617ms    4.06868ms     +0.15%


Labels

Area: RPC Features, Status: Requires Reporter Clarification, Type: API Change


Development

Successfully merging this pull request may close these issues.

Conditional compression

5 participants