Skip to content

Commit bbdc169

Browse files
committed
feat: Add WebSocket, gRPC support and fix single backend buffering
- Change default retry policy to fail-fast for better single backend performance - Add tryBackendDirect for zero-copy streaming without response buffering - Add WebSocket support with Hijacker interface implementation - Add gRPC support with h2c (HTTP/2 cleartext) handler - Add request kind detection for websocket and grpc in metrics - Add Flusher interface to responseWriterWrapper for streaming
1 parent 664ab07 commit bbdc169

File tree

2 files changed

+132
-12
lines changed

2 files changed

+132
-12
lines changed

examples/proxies.yaml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,3 +110,17 @@ services:
110110
- url: "http://localhost:8545"
111111
strip_path: false
112112
strip_query: false
113+
114+
# gRPC test service - Pocket Network Beta testnet
115+
- name: grpc_test
116+
backends:
117+
- url: "https://grpc-seed-one-beta.infra.pocket.network"
118+
strip_path: false
119+
strip_query: false
120+
121+
# WebSocket test service - Pocket Network Beta testnet RPC
122+
- name: ws_test
123+
backends:
124+
- url: "https://rpc-seed-one-beta.infra.pocket.network"
125+
strip_path: false
126+
strip_query: false

main.go

Lines changed: 118 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"bufio"
45
"context"
56
"encoding/json"
67
"errors"
@@ -34,6 +35,7 @@ import (
3435
"github.com/redis/go-redis/v9"
3536
"github.com/robfig/cron/v3"
3637
"golang.org/x/net/http2"
38+
"golang.org/x/net/http2/h2c"
3739
"gopkg.in/yaml.v3"
3840
)
3941

@@ -493,19 +495,43 @@ func normalizeBackendLabel(backendURL string) string {
493495
return hostname
494496
}
495497

496-
// getRequestKind determines the kind of request based on the URL path
497-
// Returns: "health", "ready", "metrics", or "rpc"
498-
func getRequestKind(path string) string {
499-
switch path {
498+
// getRequestKind determines the kind of request based on the URL path and headers
499+
// Returns: "health", "ready", "metrics", "websocket", "grpc", or "rpc"
500+
func getRequestKind(r *http.Request) string {
501+
// Check path-based kinds first
502+
switch r.URL.Path {
500503
case "/health", "/healthz":
501504
return "health"
502505
case "/ready", "/readyz":
503506
return "ready"
504507
case "/metrics":
505508
return "metrics"
506-
default:
507-
return "rpc"
508509
}
510+
511+
// Check for WebSocket upgrade
512+
if strings.EqualFold(r.Header.Get("Upgrade"), "websocket") {
513+
return "websocket"
514+
}
515+
516+
// Check for gRPC
517+
contentType := r.Header.Get("Content-Type")
518+
if strings.HasPrefix(contentType, "application/grpc") {
519+
return "grpc"
520+
}
521+
522+
return "rpc"
523+
}
524+
525+
// isStreamingRequest checks if the request is a streaming type (websocket or grpc)
526+
// These requests should bypass response buffering and retry logic
527+
func isStreamingRequest(r *http.Request) bool {
528+
// WebSocket upgrade
529+
if strings.EqualFold(r.Header.Get("Upgrade"), "websocket") {
530+
return true
531+
}
532+
533+
// gRPC (uses HTTP/2 streaming)
534+
return strings.HasPrefix(r.Header.Get("Content-Type"), "application/grpc")
509535
}
510536

511537
// boolToString converts a boolean to "true" or "false" string for metric labels
@@ -1248,10 +1274,10 @@ func (s *ProxyService) HandleProxy(w http.ResponseWriter, r *http.Request) {
12481274
}
12491275
}
12501276

1251-
// Check retry policy from the header (default: retry-all)
1277+
// Check retry policy from the header (default: fail-fast)
12521278
retryPolicy := r.Header.Get("Retry-Policy")
12531279
if retryPolicy == "" {
1254-
retryPolicy = "retry-all"
1280+
retryPolicy = "fail-fast"
12551281
} else {
12561282
retryPolicy = strings.ToLower(strings.TrimSpace(retryPolicy))
12571283
}
@@ -1265,6 +1291,15 @@ func (s *ProxyService) HandleProxy(w http.ResponseWriter, r *http.Request) {
12651291
return
12661292
}
12671293

1294+
// Streaming requests (WebSocket, gRPC) must use direct proxy - no buffering, no retry
1295+
// This ensures proper HTTP upgrade handling and streaming semantics
1296+
if isStreamingRequest(r) {
1297+
rule := lb.Next()
1298+
s.tryBackendDirect(w, r, subdomain, rule, start, host, clientIP)
1299+
return
1300+
}
1301+
1302+
// Retry-all with multiple backends: use buffered retry logic
12681303
if retryPolicy == "retry-all" && len(backends) > 1 {
12691304
triedURLs := make(map[string]bool) // Track which backends we've tried
12701305
attemptCount := 0
@@ -1309,9 +1344,10 @@ func (s *ProxyService) HandleProxy(w http.ResponseWriter, r *http.Request) {
13091344
return
13101345
}
13111346

1312-
// Default: fail-fast - use single backend via round-robin
1347+
// Default: fail-fast or single backend - use direct proxy (no buffering)
1348+
// This provides true zero-copy streaming for optimal performance
13131349
rule := lb.Next()
1314-
s.tryBackend(w, r, subdomain, rule, start, host, clientIP, true)
1350+
s.tryBackendDirect(w, r, subdomain, rule, start, host, clientIP)
13151351
}
13161352

13171353
// tryBackend attempts to proxy to a single backend
@@ -1432,6 +1468,56 @@ func (s *ProxyService) tryBackend(w http.ResponseWriter, r *http.Request, subdom
14321468
return success, rec.Code, true
14331469
}
14341470

1471+
// tryBackendDirect proxies to a backend without response buffering
1472+
// This is used for streaming requests (WebSocket, gRPC), fail-fast policy, and single backend scenarios
1473+
// It provides true zero-copy streaming and supports HTTP upgrades
1474+
func (s *ProxyService) tryBackendDirect(w http.ResponseWriter, r *http.Request, subdomain string, rule ProxyRule, start time.Time, host string, clientIP string) {
1475+
// Parse backend URL
1476+
targetURL, err := url.Parse(rule.ProxyTo)
1477+
if err != nil {
1478+
log.Printf("ERROR: Invalid proxy_to URL for subdomain '%s': %v", subdomain, err)
1479+
http.Error(w, "Internal Server Error: Invalid backend URL", http.StatusInternalServerError)
1480+
return
1481+
}
1482+
1483+
backend := targetURL.Host
1484+
1485+
// Store all metadata in the request context
1486+
ctx := context.WithValue(r.Context(), proxyMetadataField, proxyMetadata{
1487+
subdomain: subdomain,
1488+
startTime: start,
1489+
rule: rule,
1490+
targetURL: targetURL,
1491+
backend: backend,
1492+
scheme: targetURL.Scheme,
1493+
host: host,
1494+
clientIP: clientIP,
1495+
originalPath: r.URL.Path,
1496+
originalQuery: r.URL.RawQuery,
1497+
})
1498+
r = r.WithContext(ctx)
1499+
1500+
// Update metricsContext with backend info
1501+
if mctx, ok := r.Context().Value(metricsContextKey).(*metricsContext); ok {
1502+
mctx.backend = backend
1503+
}
1504+
1505+
// Get or create a cached proxy for this backend
1506+
cacheKey := targetURL.Scheme + "://" + targetURL.Host
1507+
var proxy *httputil.ReverseProxy
1508+
1509+
if cached, ok := s.proxies.Load(cacheKey); ok {
1510+
proxy = cached.(*httputil.ReverseProxy)
1511+
} else {
1512+
// Create a new reverse proxy
1513+
proxy = s.createReverseProxy()
1514+
s.proxies.Store(cacheKey, proxy)
1515+
}
1516+
1517+
// Direct proxy - no buffering, supports streaming, WebSocket, gRPC
1518+
proxy.ServeHTTP(w, r)
1519+
}
1520+
14351521
// createReverseProxy creates a new httputil.ReverseProxy with custom Director, ModifyResponse, and ErrorHandler
14361522
func (s *ProxyService) createReverseProxy() *httputil.ReverseProxy {
14371523
return &httputil.ReverseProxy{
@@ -1612,6 +1698,21 @@ func (rw *responseWriterWrapper) Write(b []byte) (int, error) {
16121698
return rw.ResponseWriter.Write(b)
16131699
}
16141700

1701+
// Hijack implements http.Hijacker interface for WebSocket support
1702+
func (rw *responseWriterWrapper) Hijack() (net.Conn, *bufio.ReadWriter, error) {
1703+
if hijacker, ok := rw.ResponseWriter.(http.Hijacker); ok {
1704+
return hijacker.Hijack()
1705+
}
1706+
return nil, nil, fmt.Errorf("underlying ResponseWriter does not support hijacking")
1707+
}
1708+
1709+
// Flush implements http.Flusher interface for streaming support
1710+
func (rw *responseWriterWrapper) Flush() {
1711+
if flusher, ok := rw.ResponseWriter.(http.Flusher); ok {
1712+
flusher.Flush()
1713+
}
1714+
}
1715+
16151716
// metricsWrapper wraps an HTTP handler to guarantee metrics recording for ALL requests
16161717
// It creates a metricsContext that travels with the request, handlers populate it,
16171718
// and the wrapper records metrics exactly once at the end via defer
@@ -1621,7 +1722,7 @@ func metricsWrapper(next http.Handler) http.Handler {
16211722
mctx := &metricsContext{
16221723
subdomain: extractSubdomainFromHost(r.Host),
16231724
backend: "unknown",
1624-
kind: getRequestKind(r.URL.Path),
1725+
kind: getRequestKind(r),
16251726
retried: false,
16261727
startTime: time.Now(),
16271728
statusCode: http.StatusOK,
@@ -2003,9 +2104,14 @@ func main() {
20032104

20042105
// Configure HTTP server with VERY generous settings for streaming/long-running requests
20052106
// We don't control what backends or clients expect, so timeouts are minimal
2107+
2108+
// Wrap handler with h2c (HTTP/2 cleartext) support for gRPC
2109+
// This allows both HTTP/1.1 and HTTP/2 on the same port
2110+
h2cHandler := h2c.NewHandler(service.Router(), &http2.Server{})
2111+
20062112
server := &http.Server{
20072113
Addr: ":" + port,
2008-
Handler: service.Router(),
2114+
Handler: h2cHandler,
20092115
// ReadTimeout covers: time to read request headers + body
20102116
// Set to 0 to support long-running uploads (e.g., large file uploads, streaming requests)
20112117
ReadTimeout: 0,

0 commit comments

Comments
 (0)