diff --git a/api/BUILD b/api/BUILD index 49ecae5f3e52d..64d8983096759 100644 --- a/api/BUILD +++ b/api/BUILD @@ -195,6 +195,7 @@ proto_library( "//envoy/extensions/filters/http/file_system_buffer/v3:pkg", "//envoy/extensions/filters/http/gcp_authn/v3:pkg", "//envoy/extensions/filters/http/geoip/v3:pkg", + "//envoy/extensions/filters/http/grpc_compressor/v3:pkg", "//envoy/extensions/filters/http/grpc_field_extraction/v3:pkg", "//envoy/extensions/filters/http/grpc_http1_bridge/v3:pkg", "//envoy/extensions/filters/http/grpc_http1_reverse_bridge/v3:pkg", diff --git a/api/envoy/extensions/filters/http/grpc_compressor/v3/BUILD b/api/envoy/extensions/filters/http/grpc_compressor/v3/BUILD new file mode 100644 index 0000000000000..09a37ad16b837 --- /dev/null +++ b/api/envoy/extensions/filters/http/grpc_compressor/v3/BUILD @@ -0,0 +1,12 @@ +# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py. + +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") + +licenses(["notice"]) # Apache 2 + +api_proto_package( + deps = [ + "//envoy/config/core/v3:pkg", + "@com_github_cncf_xds//udpa/annotations:pkg", + ], +) diff --git a/api/envoy/extensions/filters/http/grpc_compressor/v3/compressor.proto b/api/envoy/extensions/filters/http/grpc_compressor/v3/compressor.proto new file mode 100644 index 0000000000000..71d133dd175f3 --- /dev/null +++ b/api/envoy/extensions/filters/http/grpc_compressor/v3/compressor.proto @@ -0,0 +1,67 @@ +syntax = "proto3"; + +package envoy.extensions.filters.http.grpc_compressor.v3; + +import "envoy/config/core/v3/base.proto"; +import "envoy/config/core/v3/extension.proto"; + +import "google/protobuf/wrappers.proto"; + +import "udpa/annotations/status.proto"; +import "validate/validate.proto"; + +option java_package = "io.envoyproxy.envoy.extensions.filters.http.grpc_compressor.v3"; +option java_outer_classname = "CompressorProto"; +option java_multiple_files = true; +option go_package = "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/grpc_compressor/v3;grpc_compressorv3"; +option (udpa.annotations.file_status).package_version_status = ACTIVE; + +// [#protodoc-title: gRPC Compressor] +// gRPC Compressor :ref:`configuration overview `. +// [#extension: envoy.filters.http.grpc_compressor] + +message Compressor { + // Common configuration for filter behavior on both the request and response direction. + message CommonDirectionConfig { + // Runtime flag that controls whether compression is enabled for the direction this + // common config is applied to. When this field is ``false``, the filter will operate as a + // pass-through filter in the chosen direction, unless overridden by ``CompressorPerRoute``. + // If this field is not specified, the filter will be enabled. + config.core.v3.RuntimeFeatureFlag enabled = 1; + + // If set to true, the filter modifies the grpc-accept-encoding header by removing the compressor_library's encoding. + // This can help induce the peer not to compress the message to delegate compression to the filter if compression in + // the other direction is enabled. + // Defaults to false. + google.protobuf.BoolValue remove_grpc_accept_encoding = 2; + + // Minimum message length, in bytes, that will trigger compression. Defaults to 30. + google.protobuf.UInt32Value min_message_length = 3; + + // Maximum message length, in bytes. Defaults to 10240. If a message is larger than this value, + // Envoy will reset the stream. This is to ensure the filter does not buffer excessive data + // while waiting for a message to be fully received. + google.protobuf.UInt32Value max_message_length = 4; + } + + // Configuration for filter behavior on the request direction. + message RequestDirectionConfig { + CommonDirectionConfig common_config = 1; + } + + // Configuration for filter behavior on the response direction. + message ResponseDirectionConfig { + CommonDirectionConfig common_config = 1; + } + + // A compressor library to use for compression. + // [#extension-category: envoy.compression.compressor] + config.core.v3.TypedExtensionConfig compressor_library = 1 + [(validate.rules).message = {required: true}]; + + // Configuration for request compression. If this field is not specified, request compression is disabled. + RequestDirectionConfig request_direction_config = 2; + + // Configuration for response compression. If this field is not specified, response compression is enabled. + ResponseDirectionConfig response_direction_config = 3; +} diff --git a/api/versioning/BUILD b/api/versioning/BUILD index 1be54a0698bb4..389a39e3494b4 100644 --- a/api/versioning/BUILD +++ b/api/versioning/BUILD @@ -134,6 +134,7 @@ proto_library( "//envoy/extensions/filters/http/file_system_buffer/v3:pkg", "//envoy/extensions/filters/http/gcp_authn/v3:pkg", "//envoy/extensions/filters/http/geoip/v3:pkg", + "//envoy/extensions/filters/http/grpc_compressor/v3:pkg", "//envoy/extensions/filters/http/grpc_field_extraction/v3:pkg", "//envoy/extensions/filters/http/grpc_http1_bridge/v3:pkg", "//envoy/extensions/filters/http/grpc_http1_reverse_bridge/v3:pkg", diff --git a/docs/root/configuration/http/http_filters/grpc_compression_filter.rst b/docs/root/configuration/http/http_filters/grpc_compression_filter.rst new file mode 100644 index 0000000000000..00919ccc01391 --- /dev/null +++ b/docs/root/configuration/http/http_filters/grpc_compression_filter.rst @@ -0,0 +1,11 @@ +.. _config_http_filters_grpc_compression: + +gRPC Compression +================ + +* gRPC :ref:`architecture overview ` +* This filter should be configured with the type URL ``type.googleapis.com/envoy.extensions.filters.http.grpc_compressor.v3.Compressor``. +* :ref:`v3 API reference ` + +The gRPC compression filter enables compression of messages following the `gRPC over HTTP/2 specification +`_. diff --git a/envoy/compression/compressor/factory.h b/envoy/compression/compressor/factory.h index 4587e3a297b36..c140afb3a407e 100644 --- a/envoy/compression/compressor/factory.h +++ b/envoy/compression/compressor/factory.h @@ -10,6 +10,7 @@ class CompressorFactory { public: virtual ~CompressorFactory() = default; + // Create a new compressor instance. This should be thread-safe. virtual CompressorPtr createCompressor() PURE; virtual const std::string& statsPrefix() const PURE; virtual const std::string& contentEncoding() const PURE; diff --git a/source/extensions/extensions_build_config.bzl b/source/extensions/extensions_build_config.bzl index c3337ef7932d9..a6ff5515094db 100644 --- a/source/extensions/extensions_build_config.bzl +++ b/source/extensions/extensions_build_config.bzl @@ -176,6 +176,7 @@ EXTENSIONS = { "envoy.filters.http.file_system_buffer": "//source/extensions/filters/http/file_system_buffer:config", "envoy.filters.http.gcp_authn": "//source/extensions/filters/http/gcp_authn:config", "envoy.filters.http.geoip": "//source/extensions/filters/http/geoip:config", + "envoy.filters.http.grpc_compressor": "//source/extensions/filters/http/grpc_compressor:config", "envoy.filters.http.grpc_field_extraction": "//source/extensions/filters/http/grpc_field_extraction:config", "envoy.filters.http.grpc_http1_bridge": "//source/extensions/filters/http/grpc_http1_bridge:config", "envoy.filters.http.grpc_http1_reverse_bridge": "//source/extensions/filters/http/grpc_http1_reverse_bridge:config", diff --git a/source/extensions/extensions_metadata.yaml b/source/extensions/extensions_metadata.yaml index 8d5b04d781e0f..a5a0fd2dec9d7 100644 --- a/source/extensions/extensions_metadata.yaml +++ b/source/extensions/extensions_metadata.yaml @@ -369,6 +369,13 @@ envoy.filters.http.compressor: type_urls: - envoy.extensions.filters.http.compressor.v3.Compressor - envoy.extensions.filters.http.compressor.v3.CompressorPerRoute +envoy.filters.http.grpc_compressor: + categories: + - envoy.filters.http + security_posture: unknown + status: wip + type_urls: + - envoy.extensions.filters.http.grpc_compressor.v3.Compressor envoy.filters.http.connect_grpc_bridge: categories: - envoy.filters.http diff --git a/source/extensions/filters/http/grpc_compressor/BUILD b/source/extensions/filters/http/grpc_compressor/BUILD new file mode 100644 index 0000000000000..447fedde16cdb --- /dev/null +++ b/source/extensions/filters/http/grpc_compressor/BUILD @@ -0,0 +1,44 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_extension", + "envoy_cc_library", + "envoy_extension_package", +) + +licenses(["notice"]) # Apache 2 + +# HTTP L7 filter that performs compression with configurable compression libraries +# Public docs: https://envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/compressor_filter + +envoy_extension_package() + +envoy_cc_library( + name = "compressor_filter_lib", + srcs = ["compressor_filter.cc"], + hdrs = ["compressor_filter.h"], + deps = [ + "//envoy/compression/compressor:compressor_config_interface", + "//envoy/compression/compressor:compressor_factory_interface", + "//envoy/registry", + "//envoy/stats:stats_macros", + "//source/common/config:utility_lib", + "//source/common/grpc:codec_lib", + "//source/common/runtime:runtime_lib", + "//source/extensions/filters/http/common:pass_through_filter_lib", + "@envoy_api//envoy/extensions/filters/http/grpc_compressor/v3:pkg_cc_proto", + ], +) + +envoy_cc_extension( + name = "config", + srcs = ["config.cc"], + hdrs = ["config.h"], + deps = [ + ":compressor_filter_lib", + "//envoy/compression/compressor:compressor_config_interface", + "//source/common/config:utility_lib", + "//source/extensions/filters/http/common:factory_base_lib", + "//source/server:generic_factory_context_lib", + "@envoy_api//envoy/extensions/filters/http/grpc_compressor/v3:pkg_cc_proto", + ], +) diff --git a/source/extensions/filters/http/grpc_compressor/compressor_filter.cc b/source/extensions/filters/http/grpc_compressor/compressor_filter.cc new file mode 100644 index 0000000000000..c01c5a05127e4 --- /dev/null +++ b/source/extensions/filters/http/grpc_compressor/compressor_filter.cc @@ -0,0 +1,282 @@ +#include "source/extensions/filters/http/grpc_compressor/compressor_filter.h" + +#include "source/common/config/utility.h" +#include "source/common/grpc/common.h" +#include "source/common/http/headers.h" +#include "source/common/http/utility.h" +#include "source/common/protobuf/utility.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace GrpcCompressor { + +namespace { + +Http::RegisterCustomInlineHeader + request_grpc_accept_encoding_handle(Http::CustomHeaders::get().GrpcAcceptEncoding); +Http::RegisterCustomInlineHeader + response_grpc_accept_encoding_handle(Http::CustomHeaders::get().GrpcAcceptEncoding); +Http::RegisterCustomInlineHeader + request_grpc_encoding_handle(Http::CustomHeaders::get().GrpcEncoding); +Http::RegisterCustomInlineHeader + response_grpc_encoding_handle(Http::CustomHeaders::get().GrpcEncoding); + +// Default minimum length of an upstream response that allows compression. +const uint32_t DefaultMinimumMessageLength = 30; + +void compressAndUpdateStats(Compression::Compressor::CompressorFactory& compressor_factory, + const CommonCompressorStats& stats, Grpc::Frame& frame) { + auto compressor = + compressor_factory.createCompressor(); // each message is compressed individually + stats.total_uncompressed_bytes_.add(frame.data_->length()); + compressor->compress(*frame.data_, Envoy::Compression::Compressor::State::Finish); + stats.total_compressed_bytes_.add(frame.data_->length()); + frame.flags_ = frame.flags_ | Grpc::GRPC_FH_COMPRESSED; + frame.length_ = frame.data_->length(); +} + +void flushAndResetFrames(Buffer::Instance& output, std::vector& frames, + Compression::Compressor::CompressorFactory& compressor_factory, + Grpc::Encoder& encoder, const CommonCompressorStats& stats) { + for (auto& frame : frames) { + compressAndUpdateStats(compressor_factory, stats, frame); + encoder.prependFrameHeader(frame.flags_, *frame.data_); + output.move(*frame.data_); + } + frames.clear(); +} + +} // namespace + +CompressorFilterConfig::DirectionConfig::DirectionConfig( + const envoy::extensions::filters::http::grpc_compressor::v3::Compressor::CommonDirectionConfig& + proto_config, + const std::string& stats_prefix, Stats::Scope& scope, Runtime::Loader& runtime) + : compression_enabled_(proto_config.enabled(), runtime), + remove_grpc_accept_encoding_( + PROTOBUF_GET_WRAPPED_OR_DEFAULT(proto_config, remove_grpc_accept_encoding, false)), + min_message_length_{PROTOBUF_GET_WRAPPED_OR_DEFAULT(proto_config, min_message_length, + DefaultMinimumMessageLength)}, + max_message_length_{PROTOBUF_GET_OPTIONAL_WRAPPED(proto_config, max_message_length)}, + stats_{generateStats(stats_prefix, scope)} {} + +CompressorFilterConfig::RequestDirectionConfig::RequestDirectionConfig( + const envoy::extensions::filters::http::grpc_compressor::v3::Compressor& proto_config, + const std::string& stats_prefix, Stats::Scope& scope, Runtime::Loader& runtime) + : DirectionConfig(proto_config.request_direction_config().common_config(), + stats_prefix + "request.", scope, runtime), + is_set_{proto_config.has_request_direction_config()} {} + +CompressorFilterConfig::ResponseDirectionConfig::ResponseDirectionConfig( + const envoy::extensions::filters::http::grpc_compressor::v3::Compressor& proto_config, + const std::string& stats_prefix, Stats::Scope& scope, Runtime::Loader& runtime) + : DirectionConfig(proto_config.response_direction_config().common_config(), + stats_prefix + "response.", scope, runtime), + is_set_{proto_config.has_response_direction_config()} {} + +CompressorFilterConfig::CompressorFilterConfig( + const envoy::extensions::filters::http::grpc_compressor::v3::Compressor& proto_config, + const std::string& stats_prefix, Stats::Scope& scope, Runtime::Loader& runtime, + Compression::Compressor::CompressorFactoryPtr compressor_factory) + : common_stats_prefix_(fmt::format("{}grpc_compressor.{}.{}", stats_prefix, + proto_config.compressor_library().name(), + compressor_factory->statsPrefix())), + request_direction_config_(proto_config, common_stats_prefix_, scope, runtime), + response_direction_config_(proto_config, common_stats_prefix_, scope, runtime), + grpc_encoding_( + compressor_factory + ->contentEncoding()), // so far, grpc matches content encoding value for http + compressor_factory_(std::move(compressor_factory)) {} + +CompressorFilter::CompressorFilter(const CompressorFilterConfigSharedPtr config) + : config_(std::move(config)) { + if (config_->requestDirectionConfig().maximumMessageLength().has_value()) { + request_decoder_.setMaxFrameLength( + config_->requestDirectionConfig().maximumMessageLength().value()); + } + if (config_->responseDirectionConfig().maximumMessageLength().has_value()) { + response_decoder_.setMaxFrameLength( + config_->responseDirectionConfig().maximumMessageLength().value()); + } +} + +Http::FilterHeadersStatus CompressorFilter::decodeHeaders(Http::RequestHeaderMap& headers, bool) { + is_grpc_ = Grpc::Common::isGrpcRequestHeaders(headers); + if (!is_grpc_) { + return Http::FilterHeadersStatus::Continue; + } + + const auto& request_config = config_->requestDirectionConfig(); + const auto& response_config = config_->responseDirectionConfig(); + + // Handle response direction + const Http::HeaderEntry* accept_encoding = + headers.getInline(request_grpc_accept_encoding_handle.handle()); + if (accept_encoding != nullptr) { + // Capture the value of the "Accept-Encoding" request header to use it later when making + // decision on compressing the corresponding HTTP response. + request_grpc_accept_encoding_ = accept_encoding->value().getStringView(); + } + + // Handle request direction + // Check removing encoding from accept encoding header if response decompression is enabled. + if (compressionEnabled(response_config) && request_config.removeGrpcAcceptEncoding()) { + removeGrpcAcceptEncoding(headers); + } + + request_compression_enabled_ = compressionEnabled(request_config) && + isGrpcEncodingAllowed(headers) && connectionAllowsGrpcEncoding(); + if (request_compression_enabled_) { + headers.setInline(request_grpc_encoding_handle.handle(), getGrpcEncoding()); + request_config.stats().compressed_rpc_.inc(); + } else { + request_config.stats().not_compressed_rpc_.inc(); + } + return Http::FilterHeadersStatus::Continue; +} + +Http::FilterDataStatus CompressorFilter::decodeData(Buffer::Instance& data, bool) { + if (!request_compression_enabled_) { + return Http::FilterDataStatus::Continue; + } + + const auto& request_config = config_->requestDirectionConfig(); + absl::Status status = request_decoder_.decode(data, request_frames_); + if (!status.ok()) { + // Sending a response is not guaranteed to be sent to the client, since the response may already + // be in flight. This will in that case simply reset the stream. + if (status.code() == absl::StatusCode::kResourceExhausted) { + request_config.stats().message_length_too_large_.inc(); + decoder_callbacks_->sendLocalReply( + Http::Code::PayloadTooLarge, "Request message length too large", nullptr, + Grpc::Status::WellKnownGrpcStatus::ResourceExhausted, "grpc_compressor_rq_msg_too_large"); + } else { + request_config.stats().message_decoding_error_.inc(); + decoder_callbacks_->sendLocalReply( + Http::Code::InternalServerError, "Could not decode message in request direction", nullptr, + Grpc::Status::WellKnownGrpcStatus::Internal, "grpc_compressor_rq_msg_decode_error"); + } + + return Http::FilterDataStatus::StopIterationNoBuffer; + } + + flushAndResetFrames(data, request_frames_, config_->compressorFactory(), request_encoder_, + request_config.stats()); + + return Http::FilterDataStatus::Continue; +} + +Http::FilterTrailersStatus CompressorFilter::decodeTrailers(Http::RequestTrailerMap&) { + return Http::FilterTrailersStatus::Continue; +} + +Http::FilterHeadersStatus CompressorFilter::encodeHeaders(Http::ResponseHeaderMap& headers, bool) { + if (!is_grpc_) { + return Http::FilterHeadersStatus::Continue; + } + + const auto& request_config = config_->requestDirectionConfig(); + const auto& response_config = config_->responseDirectionConfig(); + + // Check adding encoding to accept encoding header if request compression is enabled. + if (compressionEnabled(request_config) && response_config.removeGrpcAcceptEncoding()) { + removeGrpcAcceptEncoding(headers); + } + + // Handle response direction + response_compression_enabled_ = + compressionEnabled(response_config) && isGrpcEncodingAllowed(headers) && + isGrpcAcceptEncodingAllowed(request_grpc_accept_encoding_.value_or("")); + if (response_compression_enabled_) { + headers.setInline(response_grpc_encoding_handle.handle(), getGrpcEncoding()); + response_config.stats().compressed_rpc_.inc(); + } else { + response_config.stats().not_compressed_rpc_.inc(); + } + return Http::FilterHeadersStatus::Continue; +} + +Http::FilterDataStatus CompressorFilter::encodeData(Buffer::Instance& data, bool) { + if (!response_compression_enabled_) { + return Http::FilterDataStatus::Continue; + } + + const auto& response_config = config_->responseDirectionConfig(); + absl::Status status = response_decoder_.decode(data, response_frames_); + if (!status.ok()) { + // Since the response is already in flight, we can only reset the stream. + if (status.code() == absl::StatusCode::kResourceExhausted) { + response_config.stats().message_length_too_large_.inc(); + decoder_callbacks_->resetStream(Http::StreamResetReason::LocalReset, + "Response message length too large"); + } else { + response_config.stats().message_decoding_error_.inc(); + decoder_callbacks_->resetStream(Http::StreamResetReason::LocalReset, + "Could not decode message in response direction"); + } + + return Http::FilterDataStatus::StopIterationNoBuffer; + } + + flushAndResetFrames(data, response_frames_, config_->compressorFactory(), response_encoder_, + response_config.stats()); + + return Http::FilterDataStatus::Continue; +} + +Http::FilterTrailersStatus CompressorFilter::encodeTrailers(Http::ResponseTrailerMap&) { + return Http::FilterTrailersStatus::Continue; +} + +bool CompressorFilter::compressionEnabled(const DirectionConfigOptRef config) const { + return config->compressionEnabled(); +} + +std::string CompressorFilter::getGrpcEncoding() const { return config_->grpcEncoding(); } + +bool CompressorFilter::connectionAllowsGrpcEncoding() const { + // TODO(wtzhang23): Handle request direction caching grpc-accept-encoding header for subsequent + // requests made on the same connection. + return true; +} + +bool CompressorFilter::isGrpcAcceptEncodingAllowed( + const absl::string_view grpc_accept_encoding) const { + for (absl::string_view header_value : StringUtil::splitToken(grpc_accept_encoding, ",")) { + const auto trimmed_value = StringUtil::trim(header_value); + if (absl::EqualsIgnoreCase(trimmed_value, getGrpcEncoding())) { + return true; + } + } + return false; +} + +template <> +Http::CustomInlineHeaderRegistry::Handle +CompressorFilter::getGrpcEncodingHandle() { + return request_grpc_encoding_handle.handle(); +} + +template <> +Http::CustomInlineHeaderRegistry::Handle +CompressorFilter::getGrpcEncodingHandle() { + return response_grpc_encoding_handle.handle(); +} + +template <> +Http::CustomInlineHeaderRegistry::Handle +CompressorFilter::getGrpcAcceptEncodingHandle() { + return request_grpc_accept_encoding_handle.handle(); +} + +template <> +Http::CustomInlineHeaderRegistry::Handle +CompressorFilter::getGrpcAcceptEncodingHandle() { + return response_grpc_accept_encoding_handle.handle(); +} + +} // namespace GrpcCompressor +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/grpc_compressor/compressor_filter.h b/source/extensions/filters/http/grpc_compressor/compressor_filter.h new file mode 100644 index 0000000000000..1cd4806e0b80c --- /dev/null +++ b/source/extensions/filters/http/grpc_compressor/compressor_filter.h @@ -0,0 +1,224 @@ +#pragma once + +#include "envoy/compression/compressor/factory.h" +#include "envoy/extensions/filters/http/grpc_compressor/v3/compressor.pb.h" +#include "envoy/server/factory_context.h" +#include "envoy/stats/stats_macros.h" + +#include "source/common/common/logger.h" +#include "source/common/grpc/codec.h" +#include "source/common/http/headers.h" +#include "source/common/protobuf/protobuf.h" +#include "source/common/runtime/runtime_protos.h" +#include "source/extensions/filters/http/common/pass_through_filter.h" + +#include "absl/types/optional.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace GrpcCompressor { + +/** + * gRPC compressor filter stats common for responses and requests. @see stats_macros.h + * "total_uncompressed_bytes" only includes bytes from requests or responses that were marked for + * compression. If the request (or response) was not marked for compression, the filter increments + * "not_compressed", but does not add to "total_uncompressed_bytes". This way, the user can + * measure the memory performance of the compression. + */ +#define COMMON_GRPC_COMPRESSOR_STATS(COUNTER) \ + COUNTER(compressed_rpc) \ + COUNTER(not_compressed_rpc) \ + COUNTER(compressed_msg) \ + COUNTER(total_uncompressed_bytes) \ + COUNTER(total_compressed_bytes) \ + COUNTER(message_length_too_large) \ + COUNTER(message_decoding_error) + +/** + * Struct definition for gRPC compressor stats. @see stats_macros.h + */ +struct CommonCompressorStats { + COMMON_GRPC_COMPRESSOR_STATS(GENERATE_COUNTER_STRUCT) +}; + +/** + * Configuration for the gRPC compressor filter. + */ +class CompressorFilterConfig { +public: + class DirectionConfig { + public: + DirectionConfig(const envoy::extensions::filters::http::grpc_compressor::v3::Compressor:: + CommonDirectionConfig& proto_config, + const std::string& stats_prefix, Stats::Scope& scope, Runtime::Loader& runtime); + + virtual ~DirectionConfig() = default; + + virtual bool compressionEnabled() const PURE; + + const CommonCompressorStats& stats() const { return stats_; } + bool removeGrpcAcceptEncoding() const { return remove_grpc_accept_encoding_; } + uint32_t minimumMessageLength() const { return min_message_length_; } + absl::optional maximumMessageLength() const { return max_message_length_; } + + protected: + const Runtime::FeatureFlag compression_enabled_; + + private: + static CommonCompressorStats generateStats(const std::string& prefix, Stats::Scope& scope) { + return CommonCompressorStats{ + COMMON_GRPC_COMPRESSOR_STATS(POOL_COUNTER_PREFIX(scope, prefix))}; + } + + const bool remove_grpc_accept_encoding_; + const uint32_t min_message_length_; + const absl::optional max_message_length_; + const CommonCompressorStats stats_; + }; + + class RequestDirectionConfig : public DirectionConfig { + public: + RequestDirectionConfig( + const envoy::extensions::filters::http::grpc_compressor::v3::Compressor& proto_config, + const std::string& stats_prefix, Stats::Scope& scope, Runtime::Loader& runtime); + + bool compressionEnabled() const override { return is_set_ && compression_enabled_.enabled(); } + + private: + const bool is_set_; + }; + + class ResponseDirectionConfig : public DirectionConfig { + public: + ResponseDirectionConfig( + const envoy::extensions::filters::http::grpc_compressor::v3::Compressor& proto_config, + const std::string& stats_prefix, Stats::Scope& scope, Runtime::Loader& runtime); + + bool compressionEnabled() const override { return is_set_ && compression_enabled_.enabled(); } + + private: + const bool is_set_; + }; + + CompressorFilterConfig( + const envoy::extensions::filters::http::grpc_compressor::v3::Compressor& proto_config, + const std::string& stats_prefix, Stats::Scope& scope, Runtime::Loader& runtime, + Envoy::Compression::Compressor::CompressorFactoryPtr compressor_factory); + + const std::string grpcEncoding() const { return grpc_encoding_; }; + const RequestDirectionConfig& requestDirectionConfig() { return request_direction_config_; } + const ResponseDirectionConfig& responseDirectionConfig() { return response_direction_config_; } + Envoy::Compression::Compressor::CompressorFactory& compressorFactory() const { + return *compressor_factory_; + } + +private: + const std::string common_stats_prefix_; + const RequestDirectionConfig request_direction_config_; + const ResponseDirectionConfig response_direction_config_; + const std::string grpc_encoding_; + const Envoy::Compression::Compressor::CompressorFactoryPtr compressor_factory_; +}; +using CompressorFilterConfigSharedPtr = std::shared_ptr; + +using CompressorFactoryConstOptRef = + OptRef; + +using DirectionConfigOptRef = OptRef; + +/** + * A filter that optionallycompresses data both in request and response directions. + */ +class CompressorFilter : public Http::PassThroughFilter, + public Logger::Loggable { +public: + explicit CompressorFilter(const CompressorFilterConfigSharedPtr config); + + // Http::StreamDecoderFilter + Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& headers, + bool end_stream) override; + Http::FilterDataStatus decodeData(Buffer::Instance& data, bool end_stream) override; + Http::FilterTrailersStatus decodeTrailers(Http::RequestTrailerMap& trailers) override; + + // Http::StreamEncoderFilter + Http::FilterHeadersStatus encodeHeaders(Http::ResponseHeaderMap& headers, + bool end_stream) override; + Http::FilterDataStatus encodeData(Buffer::Instance& data, bool end_stream) override; + Http::FilterTrailersStatus encodeTrailers(Http::ResponseTrailerMap& trailers) override; + +private: + bool compressionEnabled(const DirectionConfigOptRef config) const; + + // Returns the appropriate grpc encoding for the current route. + std::string getGrpcEncoding() const; + + template + static Http::CustomInlineHeaderRegistry::Handle getGrpcEncodingHandle(); + + /** + * grpc-encoding matches if the header is not present or the value is the identity encoding. + */ + template bool isGrpcEncodingAllowed(HeaderType& headers) const { + const auto handle = getGrpcEncodingHandle(); + const auto grpc_encoding = headers.getInline(handle); + if (grpc_encoding == nullptr) { + return true; + } + return absl::EqualsIgnoreCase(grpc_encoding->value().getStringView(), + Http::CustomHeaders::get().AcceptEncodingValues.Identity); + } + bool connectionAllowsGrpcEncoding() const; + + /** + * grpc-accept-encoding matches if one of the values in the header is the configured encoding. + */ + bool isGrpcAcceptEncodingAllowed(const absl::string_view grpc_accept_encoding) const; + + /** + * Remove this filter's encoding from the grpc-accept-encoding header if it exists. + */ + template + static Http::CustomInlineHeaderRegistry::Handle getGrpcAcceptEncodingHandle(); + template void removeGrpcAcceptEncoding(HeaderType& headers) const { + const auto handle = getGrpcAcceptEncodingHandle(); + const auto grpc_accept_encoding = headers.getInline(handle); + if (grpc_accept_encoding == nullptr) { + return; + } + std::vector remaining_values; + for (absl::string_view header_value : + StringUtil::splitToken(grpc_accept_encoding->value().getStringView(), ",")) { + const auto trimmed_value = StringUtil::trim(header_value); + if (!absl::EqualsIgnoreCase(trimmed_value, getGrpcEncoding())) { + remaining_values.push_back(trimmed_value); + } + } + if (!remaining_values.empty()) { + headers.setInline(handle, absl::StrJoin(remaining_values, ",")); + } else { + headers.removeInline(handle); + } + } + + const CompressorFilterConfigSharedPtr config_; + bool is_grpc_{}; + // This header sourced from the request headers determines the encoding allowed for the response. + absl::optional request_grpc_accept_encoding_; + + bool response_compression_enabled_{}; + Grpc::Decoder request_decoder_; + Grpc::Encoder request_encoder_; + // Keep vector of frames around to preserve vector allocation and avoid reallocation. + std::vector request_frames_; + + bool request_compression_enabled_{}; + Grpc::Decoder response_decoder_; + Grpc::Encoder response_encoder_; + // Keep vector of frames around to preserve vector allocation and avoid reallocation. + std::vector response_frames_; +}; +} // namespace GrpcCompressor +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/grpc_compressor/config.cc b/source/extensions/filters/http/grpc_compressor/config.cc new file mode 100644 index 0000000000000..2ecd66cb0a232 --- /dev/null +++ b/source/extensions/filters/http/grpc_compressor/config.cc @@ -0,0 +1,48 @@ +#include "source/extensions/filters/http/grpc_compressor/config.h" + +#include "envoy/compression/compressor/config.h" + +#include "source/common/config/utility.h" +#include "source/extensions/filters/http/grpc_compressor/compressor_filter.h" +#include "source/server/generic_factory_context.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace GrpcCompressor { + +absl::StatusOr +GrpcCompressorFilterFactory::createFilterFactoryFromProtoTyped( + const envoy::extensions::filters::http::grpc_compressor::v3::Compressor& proto_config, + const std::string& stats_prefix, Server::Configuration::FactoryContext& context) { + const std::string type{TypeUtil::typeUrlToDescriptorFullName( + proto_config.compressor_library().typed_config().type_url())}; + Compression::Compressor::NamedCompressorLibraryConfigFactory* const config_factory = + Registry::FactoryRegistry< + Compression::Compressor::NamedCompressorLibraryConfigFactory>::getFactoryByType(type); + if (config_factory == nullptr) { + return absl::InvalidArgumentError( + fmt::format("Didn't find a registered implementation for type: '{}'", type)); + } + ProtobufTypes::MessagePtr message = Config::Utility::translateAnyToFactoryConfig( + proto_config.compressor_library().typed_config(), context.messageValidationVisitor(), + *config_factory); + Compression::Compressor::CompressorFactoryPtr compressor_factory = + config_factory->createCompressorFactoryFromProto(*message, context); + CompressorFilterConfigSharedPtr config = std::make_shared( + proto_config, stats_prefix, context.scope(), context.serverFactoryContext().runtime(), + std::move(compressor_factory)); + return [config](Http::FilterChainFactoryCallbacks& callbacks) -> void { + callbacks.addStreamFilter(std::make_shared(config)); + }; +} + +/** + * Static registration for the gRPC compressor filter. @see NamedHttpFilterConfigFactory. + */ +REGISTER_FACTORY(GrpcCompressorFilterFactory, Server::Configuration::NamedHttpFilterConfigFactory); + +} // namespace GrpcCompressor +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/grpc_compressor/config.h b/source/extensions/filters/http/grpc_compressor/config.h new file mode 100644 index 0000000000000..1c364fab4e02d --- /dev/null +++ b/source/extensions/filters/http/grpc_compressor/config.h @@ -0,0 +1,33 @@ +#pragma once + +#include "envoy/extensions/filters/http/grpc_compressor/v3/compressor.pb.h" +#include "envoy/extensions/filters/http/grpc_compressor/v3/compressor.pb.validate.h" + +#include "source/extensions/filters/http/common/factory_base.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace GrpcCompressor { + +/** + * Config registration for the gRPC compressor filter. @see NamedHttpFilterConfigFactory. + */ +class GrpcCompressorFilterFactory + : public Common::ExceptionFreeFactoryBase< + envoy::extensions::filters::http::grpc_compressor::v3::Compressor> { +public: + GrpcCompressorFilterFactory() : ExceptionFreeFactoryBase("envoy.filters.http.grpc_compressor") {} + +private: + absl::StatusOr createFilterFactoryFromProtoTyped( + const envoy::extensions::filters::http::grpc_compressor::v3::Compressor& proto_config, + const std::string& stats_prefix, Server::Configuration::FactoryContext& context) override; +}; + +DECLARE_FACTORY(GrpcCompressorFilterFactory); + +} // namespace GrpcCompressor +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy