Skip to content

Commit 96019ed

Browse files
authored
Merge pull request #4 from alanprot/stream-workers
Allowing to set the number of stream workers
2 parents a334c20 + caba0fc commit 96019ed

File tree

1 file changed

+3
-0
lines changed

1 file changed

+3
-0
lines changed

server/server.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ type Config struct {
101101
GPRCServerMaxRecvMsgSize int `yaml:"grpc_server_max_recv_msg_size"`
102102
GRPCServerMaxSendMsgSize int `yaml:"grpc_server_max_send_msg_size"`
103103
GPRCServerMaxConcurrentStreams uint `yaml:"grpc_server_max_concurrent_streams"`
104+
GPRCServerNumStreamWorkers uint `yaml:"grpc_server_num_stream_workers"`
104105
GRPCServerMaxConnectionIdle time.Duration `yaml:"grpc_server_max_connection_idle"`
105106
GRPCServerMaxConnectionAge time.Duration `yaml:"grpc_server_max_connection_age"`
106107
GRPCServerMaxConnectionAgeGrace time.Duration `yaml:"grpc_server_max_connection_age_grace"`
@@ -159,6 +160,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
159160
f.IntVar(&cfg.GPRCServerMaxRecvMsgSize, "server.grpc-max-recv-msg-size-bytes", 4*1024*1024, "Limit on the size of a gRPC message this server can receive (bytes).")
160161
f.IntVar(&cfg.GRPCServerMaxSendMsgSize, "server.grpc-max-send-msg-size-bytes", 4*1024*1024, "Limit on the size of a gRPC message this server can send (bytes).")
161162
f.UintVar(&cfg.GPRCServerMaxConcurrentStreams, "server.grpc-max-concurrent-streams", 100, "Limit on the number of concurrent streams for gRPC calls (0 = unlimited)")
163+
f.UintVar(&cfg.GPRCServerNumStreamWorkers, "server.grpc_server-num-stream-workers", 0, "Number of worker goroutines that should be used to process incoming streams.Setting this 0 (default) will disable workers and spawn a new goroutine for each stream.")
162164
f.DurationVar(&cfg.GRPCServerMaxConnectionIdle, "server.grpc.keepalive.max-connection-idle", infinty, "The duration after which an idle connection should be closed. Default: infinity")
163165
f.DurationVar(&cfg.GRPCServerMaxConnectionAge, "server.grpc.keepalive.max-connection-age", infinty, "The duration for the maximum amount of time a connection may exist before it will be closed. Default: infinity")
164166
f.DurationVar(&cfg.GRPCServerMaxConnectionAgeGrace, "server.grpc.keepalive.max-connection-age-grace", infinty, "An additive period after max-connection-age after which the connection will be forcibly closed. Default: infinity")
@@ -354,6 +356,7 @@ func newServer(cfg Config, metrics *Metrics) (*Server, error) {
354356
grpcOptions := []grpc.ServerOption{
355357
grpc.ChainUnaryInterceptor(grpcMiddleware...),
356358
grpc.ChainStreamInterceptor(grpcStreamMiddleware...),
359+
grpc.NumStreamWorkers(uint32(cfg.GPRCServerNumStreamWorkers)),
357360
grpc.KeepaliveParams(grpcKeepAliveOptions),
358361
grpc.KeepaliveEnforcementPolicy(grpcKeepAliveEnforcementPolicy),
359362
grpc.MaxRecvMsgSize(cfg.GPRCServerMaxRecvMsgSize),

0 commit comments

Comments
 (0)