Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
3dc8573
feat: surface response data when receiving an unexpected status code …
chengxilo Feb 22, 2026
3cd1bf5
refactor: remove nonGRPCDataCollectionState enum, use collecting(bool…
chengxilo Feb 23, 2026
95cf20b
Merge branch 'grpc:master' into surface-response-body
chengxilo Feb 26, 2026
038dd99
refactor: stop exporting constant nonGRPCDataMaxLen
chengxilo Feb 26, 2026
6dd9c4f
refactor: remove unnecessary timer
chengxilo Feb 26, 2026
4e270af
refactor: replace unnecessary mutex with atmoic.Bool
chengxilo Feb 26, 2026
b59203a
style: inline errMsg to simplify error message joining
chengxilo Feb 26, 2026
685f4ef
Revert "refactor: replace unnecessary mutex with atmoic.Bool"
chengxilo Feb 26, 2026
b6f3bf2
Merge branch 'master' into surface-response-body
chengxilo Mar 30, 2026
1effa8e
Merge branch 'master' into surface-response-body
chengxilo Mar 31, 2026
e60496c
doc: update stale comment
chengxilo Mar 31, 2026
8725f9f
fix: hold collectionMu in Status()
chengxilo Mar 31, 2026
3ae8eaf
Merge branch 'master' into surface-response-body
chengxilo Apr 2, 2026
a83d60e
Revert "fix: hold collectionMu in Status()"
chengxilo Apr 9, 2026
6e579f2
refactor: refactor the process to finalize the data collected from no…
chengxilo Apr 9, 2026
7895aef
doc: add comment for collectionMu
chengxilo Apr 9, 2026
e3d1434
refactor: introduce nonGRPCStatus field and inline finalization in cl…
chengxilo Apr 25, 2026
2069ce9
refactor: defer headerChan closure until stream closes for non-gRPC r…
chengxilo Apr 25, 2026
15e67d0
test: split headerChan test into non-gRPC response and malformed head…
chengxilo Apr 25, 2026
db6ea10
chore: change invalidHeaderField to invalidContentType for readability
chengxilo Apr 25, 2026
6afa8f8
refactor: inline closeNonGRPCStream and provide comment to clarify
chengxilo Apr 25, 2026
df12e44
docs: provide explanation for nonGRPCStatus finalize
chengxilo Apr 25, 2026
f5c04f2
typo: if is not -> if not
chengxilo Apr 25, 2026
44e4144
chore: nix this newline
chengxilo May 6, 2026
0d23730
style: avoid checking the code again in wantErr
chengxilo May 6, 2026
df306a0
style: improve the Errorf output
chengxilo May 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion internal/transport/client_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package transport

import (
"sync"
"sync/atomic"

"golang.org/x/net/http2"
Expand All @@ -28,6 +29,12 @@ import (
"google.golang.org/grpc/status"
)

// nonGRPCDataMaxLen is the maximum length of nonGRPCDataBuf.
//
// NOTE: If you change this value, you MUST update the corresponding test in:
// - /test/end2end_test.go:TestHTTPServerSendsNonGRPCHeaderSurfaceFurtherData
const nonGRPCDataMaxLen = 1024

// ClientStream implements streaming functionality for a gRPC client.
type ClientStream struct {
Stream // Embed for common stream functionality.
Expand All @@ -46,14 +53,51 @@ type ClientStream struct {
// headerValid indicates whether a valid header was received. Only
// meaningful after headerChan is closed (always call waitOnHeader() before
// reading its value).
headerValid bool
headerValid bool

collectionMu sync.Mutex // protects nonGRPCStatus and nonGRPCDataBuf during the non-gRPC data collection lifecycle.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we may not need the mutex here. The three places where the nonGRPC fields are accessed are:

  1. operateHeaders
  2. handleData
  3. closeStream

Methods 1 and 2 are called serially by the goroutine running the controlbuf loop.

Method 3 reads the fields to override the arguments passed to it. This seems undesirable because it ignores the status passed by the caller.

The way to gracefully close streams in HTTP/2 is by using the "end of stream" (EOS) flag in the frame header. Since this flag is only present in HEADERS and DATA frames, we should have operateHeaders and handleData check the nonGRPCStatus, call closeStream with the resolved status, and remove the extra logic from closeStream. This approach should also eliminate the need for the mutex.

nonGRPCStatus *status.Status // the initial status from the non-gRPC response header, finalized with collected data before closing.
nonGRPCDataBuf []byte // stores the data of a non-gRPC response.

noHeaders bool // set if the client never received headers (set only after the stream is done).
headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
bytesReceived atomic.Bool // indicates whether any bytes have been received on this stream
unprocessed atomic.Bool // set if the server sends a refused stream or GOAWAY including this stream
statsHandler stats.Handler // nil for internal streams (e.g., health check, ORCA) where telemetry is not supported.
}

// startNonGRPCDataCollection moves the stream into the non-gRPC data
// collection lifecycle: it records st as the pending status and allocates an
// empty body buffer with capacity nonGRPCDataMaxLen. If a status has already
// been recorded, the call is a no-op and the existing status and buffer are
// left untouched.
func (s *ClientStream) startNonGRPCDataCollection(st *status.Status) {
	s.collectionMu.Lock()
	defer s.collectionMu.Unlock()

	// Only the first call takes effect; a non-nil nonGRPCStatus means the
	// collection lifecycle is already under way.
	if s.nonGRPCStatus == nil {
		s.nonGRPCStatus = st
		s.nonGRPCDataBuf = make([]byte, 0, nonGRPCDataMaxLen)
	}
}

// tryHandleNonGRPCData appends the payload of f to the non-gRPC response body
// buffer when the stream is in the non-gRPC data collection lifecycle. The
// buffer never grows beyond nonGRPCDataMaxLen bytes; payload bytes that do not
// fit are not copied.
//
// handle reports whether the frame was consumed as non-gRPC response body.
// end reports whether the stream should be closed after this frame, which is
// the case once the buffer is full or the frame ended the stream.
func (s *ClientStream) tryHandleNonGRPCData(f *parsedDataFrame) (handle bool, end bool) {
	s.collectionMu.Lock()
	defer s.collectionMu.Unlock()

	collecting := s.nonGRPCStatus != nil
	if !collecting {
		// Not in the non-gRPC data collection lifecycle; leave the frame to
		// the caller.
		return false, false
	}

	// Copy only as many bytes as still fit in the buffer.
	room := nonGRPCDataMaxLen - len(s.nonGRPCDataBuf)
	take := min(f.data.Len(), room)
	s.nonGRPCDataBuf = append(s.nonGRPCDataBuf, f.data.ReadOnlyData()[:take]...)

	full := len(s.nonGRPCDataBuf) >= nonGRPCDataMaxLen
	return true, full || f.StreamEnded()
}

// Read reads an n byte message from the input stream.
func (s *ClientStream) Read(n int) (mem.BufferSlice, error) {
b, err := s.Stream.read(n)
Expand Down
42 changes: 38 additions & 4 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -944,9 +944,21 @@ func (t *http2Client) closeStream(s *ClientStream, err error, rst bool, rstCode
<-s.done
return
}
// status and trailers can be updated here without any synchronization because the stream goroutine will
// only read it after it sees an io.EOF error from read or write and we'll write those errors
// only after updating this.

// If the stream is in the non-gRPC data collection lifecycle, use the
// nonGRPCStatus and nonGRPCDataBuf to construct the final status and
// error to return to the user. This is to ensure that non-gRPC data
// collected is included in the final status message returned to the user.
s.collectionMu.Lock()
if s.nonGRPCStatus != nil {
data := "\ndata: " + strconv.Quote(string(s.nonGRPCDataBuf))
st = status.New(s.nonGRPCStatus.Code(), s.nonGRPCStatus.Message()+data)
err = st.Err()
// Clear the nonGRPCStatus to indicate the non-grpc data collection is done.
s.nonGRPCStatus = nil
}
s.collectionMu.Unlock()

s.status = st
if len(mdata) > 0 {
s.trailer = mdata
Expand Down Expand Up @@ -1224,6 +1236,23 @@ func (t *http2Client) handleData(f *parsedDataFrame) {
t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
return
}

handle, end := s.tryHandleNonGRPCData(f)
if handle {
if w := s.fc.onRead(size); w > 0 {
t.controlBuf.put(&outgoingWindowUpdate{
streamID: s.id,
increment: w,
})
}
if end {
// Close the stream; closeStream will finalize the nonGRPCStatus and nonGRPCDataBuf,
// and provide them as err and st.
t.closeStream(s, nil, true, http2.ErrCodeProtocol, nil, nil, true)
}
return
}

dataLen := f.data.Len()
if f.Header().Flags.Has(http2.FlagDataPadded) {
if w := s.fc.onRead(size - uint32(dataLen)); w > 0 {
Expand Down Expand Up @@ -1568,7 +1597,12 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
}

se := status.New(grpcErrorCode, strings.Join(errs, "; "))
t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
if endStream {
t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, true)
return
}

s.startNonGRPCDataCollection(se)
return
}

Expand Down
74 changes: 66 additions & 8 deletions internal/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ const (
notifyCall
misbehaved
encodingRequiredStatus
invalidHeaderField
invalidContentType
malformedHeader
delayRead
pingpong
)
Expand Down Expand Up @@ -220,7 +221,7 @@ func (h *testStreamHandler) handleStreamEncodingRequiredStatus(s *ServerStream)
s.Read(math.MaxInt)
}

func (h *testStreamHandler) handleStreamInvalidHeaderField(s *ServerStream) {
func (h *testStreamHandler) handleStreamInvalidContentType(s *ServerStream) {
headerFields := []hpack.HeaderField{}
headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: expectedInvalidHeaderField})
h.t.controlBuf.put(&headerFrame{
Expand All @@ -230,6 +231,19 @@ func (h *testStreamHandler) handleStreamInvalidHeaderField(s *ServerStream) {
})
}

// handleStreamMalformedHeader enqueues a header frame for stream s that carries
// a 200 status and a gRPC content-type together with an "x-bad-bin" field whose
// value is not base64. The frame does not end the stream.
func (h *testStreamHandler) handleStreamMalformedHeader(s *ServerStream) {
	frame := &headerFrame{
		streamID: s.id,
		hf: []hpack.HeaderField{
			{Name: ":status", Value: "200"},
			{Name: "content-type", Value: "application/grpc"},
			{Name: "x-bad-bin", Value: "!!!invalid-base64!!!"},
		},
		endStream: false,
	}
	h.t.controlBuf.put(frame)
}

// handleStreamDelayRead delays reads so that the other side has to halt on
// stream-level flow control.
// This handler assumes dynamic flow control is turned off and assumes window
Expand Down Expand Up @@ -425,12 +439,23 @@ func (s *server) start(t *testing.T, port int, serverConfig *ServerConfig, ht hT
})
wg.Done()
}()
case invalidHeaderField:
case invalidContentType:
go func() {
transport.HandleStreams(ctx, func(s *ServerStream) {
wg.Add(1)
go func() {
h.handleStreamInvalidHeaderField(s)
h.handleStreamInvalidContentType(s)
wg.Done()
}()
})
wg.Done()
}()
case malformedHeader:
go func() {
transport.HandleStreams(ctx, func(s *ServerStream) {
wg.Add(1)
go func() {
h.handleStreamMalformedHeader(s)
wg.Done()
}()
})
Expand Down Expand Up @@ -1638,8 +1663,8 @@ func (s) TestEncodingRequiredStatus(t *testing.T) {
s.Read(math.MaxInt)
}

func (s) TestInvalidHeaderField(t *testing.T) {
server, ct, cancel := setUp(t, 0, invalidHeaderField)
func (s) TestInvalidContentType(t *testing.T) {
server, ct, cancel := setUp(t, 0, invalidContentType)
defer cancel()
callHdr := &CallHdr{
Host: "localhost",
Expand All @@ -1660,8 +1685,32 @@ func (s) TestInvalidHeaderField(t *testing.T) {
server.stop()
}

// TestHeaderChanClosedAfterReceivingNonGRPCResponse checks that, against a
// server configured with the invalidContentType handler, Header() on the client
// stream returns an error and that headerChan is closed by the time Header()
// has returned.
func (s) TestHeaderChanClosedAfterReceivingNonGRPCResponse(t *testing.T) {
	server, ct, cancel := setUp(t, 0, invalidContentType)
	defer cancel()
	defer server.stop()
	defer ct.Close(fmt.Errorf("closed manually by test"))
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	s, err := ct.NewStream(ctx, &CallHdr{Host: "localhost", Method: "foo"}, nil)
	if err != nil {
		// Report the underlying error so a setup failure can be diagnosed.
		t.Fatalf("failed to create the stream: %v", err)
	}
	// The server sends a non-gRPC response without ending the stream, so the
	// stream enters data collection mode. headerChan is not closed until the
	// stream itself closes.
	if _, err := s.Header(); err == nil {
		t.Fatalf("Header() succeeded, want error")
	}
	// Non-blocking receive: it only succeeds if headerChan is already closed.
	select {
	case <-s.headerChan:
	default:
		t.Errorf("s.headerChan: got open, want closed")
	}
}

func (s) TestHeaderChanClosedAfterReceivingAnInvalidHeader(t *testing.T) {
server, ct, cancel := setUp(t, 0, invalidHeaderField)
server, ct, cancel := setUp(t, 0, malformedHeader)
defer cancel()
defer server.stop()
defer ct.Close(fmt.Errorf("closed manually by test"))
Expand Down Expand Up @@ -2685,6 +2734,7 @@ func (s) TestClientDecodeHeader(t *testing.T) {
name string
metaHeaderFrame *http2.MetaHeadersFrame
wantStatus *status.Status
isNonGRPCStatus bool
}{
{
name: "valid_header",
Expand All @@ -2708,6 +2758,7 @@ func (s) TestClientDecodeHeader(t *testing.T) {
codes.Unknown,
"unexpected HTTP status code received from server: 200 (OK); malformed header: missing HTTP content-type",
),
isNonGRPCStatus: true,
},
{
name: "invalid_grpc_status",
Expand All @@ -2734,6 +2785,7 @@ func (s) TestClientDecodeHeader(t *testing.T) {
codes.Internal,
"malformed header: missing HTTP status; transport: received unexpected content-type \"application/json\"",
),
isNonGRPCStatus: true,
},
{
name: "invalid_content_type_with_http_status_504",
Expand All @@ -2747,6 +2799,7 @@ func (s) TestClientDecodeHeader(t *testing.T) {
codes.Unavailable,
"unexpected HTTP status code received from server: 504 (Gateway Timeout); transport: received unexpected content-type \"application/json\"",
),
isNonGRPCStatus: true,
},
{
name: "http_fallback_and_invalid_http_status",
Expand Down Expand Up @@ -2803,7 +2856,12 @@ func (s) TestClientDecodeHeader(t *testing.T) {
}

s.operateHeaders(tc.metaHeaderFrame)
got := cs.status
var got *status.Status
if tc.isNonGRPCStatus {
got = cs.nonGRPCStatus
} else {
got = cs.status
}
want := tc.wantStatus
if got.Code() != want.Code() || got.Message() != want.Message() {
t.Errorf("operateHeaders(%v) got status %q, want %q", tc.metaHeaderFrame, got, want)
Expand Down
Loading
Loading