diff --git a/packages/grpc-js-xds/interop/xds-interop-server.ts b/packages/grpc-js-xds/interop/xds-interop-server.ts index b8af6e3aa..7c1aacb3a 100644 --- a/packages/grpc-js-xds/interop/xds-interop-server.ts +++ b/packages/grpc-js-xds/interop/xds-interop-server.ts @@ -232,12 +232,36 @@ function getIPv6Addresses(): string[] { return ipv6Addresses; } +interface ConfiguredMetrics { + qps?: number; + applicationUtilization?: number; + eps?: number; +} + +function createInBandMetricsInterceptor(metrics: ConfiguredMetrics) { + return function inBandMetricsInterceptor(methodDescriptor: grpc.ServerMethodDefinition, call: grpc.ServerInterceptingCallInterface): grpc.ServerInterceptingCall { + const metricsRecorder = call.getMetricsRecorder() + if (metrics.qps) { + metricsRecorder.recordQpsMetric(metrics.qps); + } + if (metrics.applicationUtilization) { + metricsRecorder.recordApplicationUtilizationMetric(metrics.applicationUtilization); + } + if (metrics.eps) { + metricsRecorder.recordEpsMetric(metrics.eps); + } + return new grpc.ServerInterceptingCall(call); + } +} + async function main() { const argv = yargs - .string(['port', 'maintenance_port', 'address_type', 'secure_mode']) + .string(['port', 'maintenance_port', 'address_type', 'secure_mode', 'metrics_mode']) + .number(['qps', 'application_utilization', 'eps']) .demandOption(['port']) .default('address_type', 'IPV4_IPV6') .default('secure_mode', 'false') + .default('metrics_mode', 'NONE') .parse() console.log('Starting xDS interop server. Args: ', argv); const healthImpl = new HealthImplementation({'': 'NOT_SERVING'}); @@ -253,7 +277,28 @@ async function main() { } const reflection = new ReflectionService(packageDefinition, { services: ['grpc.testing.TestService'] - }) + }); + let metricInterceptor: grpc.ServerInterceptor | null = null; + const metricsMode = argv.metrics_mode.toUpperCase(); + let metricRecorder: grpc.ServerMetricRecorder | null = null; + if (metricsMode === 'IN_BAND') { + metricInterceptor = createInBandMetricsInterceptor({ + qps: argv.qps, + applicationUtilization: argv.application_utilization, + eps: argv.eps + }); + } else if (metricsMode === 'OUT_OF_BAND') { + metricRecorder = new grpc.ServerMetricRecorder(); + if (argv.qps) { + metricRecorder.setQpsMetric(argv.qps); + } + if (argv.application_utilization) { + metricRecorder.setApplicationUtilizationMetric(argv.application_utilization); + } + if (argv.eps) { + metricRecorder.setEpsMetric(argv.eps); + } + } const addressType = argv.address_type.toUpperCase(); const secureMode = argv.secure_mode.toLowerCase() == 'true'; if (secureMode) { @@ -266,19 +311,33 @@ async function main() { reflection.addToServer(maintenanceServer); grpc.addAdminServicesToServer(maintenanceServer); - const server = new grpc_xds.XdsServer({interceptors: [testInfoInterceptor]}); + const interceptorList = [testInfoInterceptor]; + if (metricInterceptor) { + interceptorList.push(metricInterceptor); + } + const server = new grpc_xds.XdsServer({interceptors: interceptorList}); server.addService(loadedProto.grpc.testing.TestService.service, testServiceHandler); + if (metricRecorder) { + metricRecorder.addToServer(server); + } const xdsCreds = new grpc_xds.XdsServerCredentials(grpc.ServerCredentials.createInsecure()); await Promise.all([ serverBindPromise(maintenanceServer, `[::]:${argv.maintenance_port}`, grpc.ServerCredentials.createInsecure()), serverBindPromise(server, `0.0.0.0:${argv.port}`, xdsCreds) ]); } else { + const interceptorList = [unifiedInterceptor]; + if (metricInterceptor) { + interceptorList.push(metricInterceptor); + } const server = new grpc.Server({interceptors: [unifiedInterceptor]}); server.addService(loadedProto.grpc.testing.XdsUpdateHealthService.service, xdsUpdateHealthServiceImpl); healthImpl.addToServer(server); reflection.addToServer(server); grpc.addAdminServicesToServer(server); + if (metricRecorder) { + metricRecorder.addToServer(server); + } server.addService(loadedProto.grpc.testing.TestService.service, testServiceHandler); const creds = grpc.ServerCredentials.createInsecure(); switch (addressType) {