Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions test/extensions/filters/http/on_demand/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ envoy_extension_cc_test(
"//source/extensions/filters/http/on_demand:on_demand_update_lib",
"//test/config:v2_link_hacks",
"//test/integration:http_integration_lib",
"//test/integration:http_protocol_integration_lib",
"//test/integration:scoped_rds_lib",
"//test/integration:vhds_lib",
"@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto",
Expand Down
207 changes: 207 additions & 0 deletions test/extensions/filters/http/on_demand/on_demand_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "test/common/grpc/grpc_client_integration.h"
#include "test/config/v2_link_hacks.h"
#include "test/integration/http_integration.h"
#include "test/integration/http_protocol_integration.h"
#include "test/integration/scoped_rds.h"
#include "test/integration/vhds.h"
#include "test/test_common/printers.h"
Expand Down Expand Up @@ -761,5 +762,211 @@ TEST_P(OnDemandVhdsIntegrationTest, AttemptAddingDuplicateDomainNames) {
cleanupUpstreamAndDownstream();
}

// Test class for VHDS on-demand updates with request bodies
class OnDemandVhdsWithBodyIntegrationTest
: public testing::TestWithParam<
std::tuple<HttpProtocolTestParams, std::tuple<Network::Address::IpVersion,
Grpc::ClientType, Grpc::LegacyOrUnified>>>,
public HttpIntegrationTest {
public:
using ParamType =
std::tuple<HttpProtocolTestParams,
std::tuple<Network::Address::IpVersion, Grpc::ClientType, Grpc::LegacyOrUnified>>;

const HttpProtocolTestParams& httpProtocolParams() const { return std::get<0>(GetParam()); }

OnDemandVhdsWithBodyIntegrationTest()
: HttpIntegrationTest(httpProtocolParams().downstream_protocol, httpProtocolParams().version,
ConfigHelper::httpProxyConfig(
/*downstream_is_quic=*/httpProtocolParams().downstream_protocol ==
Http::CodecType::HTTP3)),
use_universal_header_validator_(httpProtocolParams().use_universal_header_validator) {
setupHttp2ImplOverrides(httpProtocolParams().http2_implementation);
config_helper_.addRuntimeOverride("envoy.reloadable_features.enable_universal_header_validator",
use_universal_header_validator_ ? "true" : "false");
use_lds_ = false;
config_helper_.addRuntimeOverride(
"envoy.reloadable_features.unified_mux",
std::get<2>(std::get<1>(GetParam())) == Grpc::LegacyOrUnified::Unified ? "true" : "false");
config_helper_.prependFilter(R"EOF(
name: envoy.filters.http.on_demand
)EOF");
}

void SetUp() override {
setDownstreamProtocol(httpProtocolParams().downstream_protocol);
setUpstreamProtocol(httpProtocolParams().upstream_protocol);
}

void TearDown() override { cleanUpXdsConnection(); }

std::string virtualHostYaml(const std::string& name, const std::string& domain) {
return fmt::format(R"EOF(
name: {}
domains: [{}]
routes:
- match: {{ prefix: "/" }}
route: {{ cluster: "cluster_0" }}
)EOF",
name, domain);
}

std::string vhdsRequestResourceName(const std::string& host_header) {
return "my_route/" + host_header;
}

envoy::config::route::v3::VirtualHost buildVirtualHost(const std::string& name,
const std::string& domain) {
return TestUtility::parseYaml<envoy::config::route::v3::VirtualHost>(
virtualHostYaml(name, domain));
}

void initialize() override {
setUpstreamCount(2); // xds_cluster and cluster_0
setUpstreamProtocol(Http::CodecType::HTTP2); // xDS uses gRPC uses HTTP2

const auto ip_version = httpProtocolParams().version;
config_helper_.addConfigModifier([ip_version](
envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
// Add xds_cluster for VHDS
auto* xds_cluster = bootstrap.mutable_static_resources()->add_clusters();
xds_cluster->MergeFrom(ConfigHelper::buildStaticCluster(
"xds_cluster", /*port=*/0, Network::Test::getLoopbackAddressString(ip_version)));
ConfigHelper::setHttp2(*xds_cluster);

auto* listener = bootstrap.mutable_static_resources()->mutable_listeners(0);
auto* filter_chain = listener->mutable_filter_chains(0);
auto* config_blob = filter_chain->mutable_filters(0)->mutable_typed_config();

ASSERT_TRUE(config_blob->Is<envoy::extensions::filters::network::http_connection_manager::v3::
HttpConnectionManager>());
auto hcm_config = MessageUtil::anyConvert<
envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager>(
*config_blob);

// Use RDS instead of static route_config so Envoy connects to xDS server
hcm_config.clear_route_config();
auto* rds_config = hcm_config.mutable_rds();
rds_config->set_route_config_name("my_route");
auto* rds_config_source = rds_config->mutable_config_source();
rds_config_source->mutable_api_config_source()->set_api_type(
envoy::config::core::v3::ApiConfigSource::GRPC);
rds_config_source->mutable_api_config_source()
->add_grpc_services()
->mutable_envoy_grpc()
->set_cluster_name("xds_cluster");

config_blob->PackFrom(hcm_config);
});

HttpIntegrationTest::initialize();

test_server_->waitUntilListenersReady();
registerTestServerPorts({"http"});

// Set up xDS connection (xds_cluster is the second cluster, so it maps to fake_upstreams_[1])
auto result = fake_upstreams_[1]->waitForHttpConnection(*dispatcher_, xds_connection_);
RELEASE_ASSERT(result, result.message());
result = xds_connection_->waitForNewStream(*dispatcher_, xds_stream_);
RELEASE_ASSERT(result, result.message());
xds_stream_->startGrpcStream();

EXPECT_TRUE(compareSotwDiscoveryRequest(Config::TestTypeUrl::get().RouteConfiguration, "",
{"my_route"}, true));
envoy::config::route::v3::RouteConfiguration route_config;
route_config.set_name("my_route");
auto* vhds_config_source = route_config.mutable_vhds()->mutable_config_source();
vhds_config_source->mutable_api_config_source()->set_api_type(
envoy::config::core::v3::ApiConfigSource::DELTA_GRPC);
vhds_config_source->mutable_api_config_source()
->add_grpc_services()
->mutable_envoy_grpc()
->set_cluster_name("xds_cluster");
sendSotwDiscoveryResponse<envoy::config::route::v3::RouteConfiguration>(
Config::TestTypeUrl::get().RouteConfiguration, {route_config}, "1");

result = xds_connection_->waitForNewStream(*dispatcher_, vhds_stream_);
RELEASE_ASSERT(result, result.message());
vhds_stream_->startGrpcStream();

EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TestTypeUrl::get().VirtualHost, {}, {},
vhds_stream_.get()));
}

FakeStreamPtr vhds_stream_;

protected:
const bool use_universal_header_validator_;
};

INSTANTIATE_TEST_SUITE_P(
ProtocolsAndGrpcTypes, OnDemandVhdsWithBodyIntegrationTest,
testing::Combine(
testing::ValuesIn(HttpProtocolIntegrationTest::getProtocolTestParamsWithoutHTTP3()),
UNIFIED_LEGACY_GRPC_CLIENT_INTEGRATION_PARAMS),
[](const testing::TestParamInfo<
std::tuple<HttpProtocolTestParams, std::tuple<Network::Address::IpVersion, Grpc::ClientType,
Grpc::LegacyOrUnified>>>& info) {
return absl::StrCat(
HttpProtocolIntegrationTest::protocolTestParamsToString(
testing::TestParamInfo<HttpProtocolTestParams>(std::get<0>(info.param), 0)),
"_",
Grpc::UnifiedOrLegacyMuxIntegrationParamTest::protocolTestParamsToString(
testing::TestParamInfo<
std::tuple<Network::Address::IpVersion, Grpc::ClientType, Grpc::LegacyOrUnified>>(
std::get<1>(info.param), 0)));
});

// Test VHDS on-demand update with a request body
TEST_P(OnDemandVhdsWithBodyIntegrationTest, VhdsOnDemandUpdateWithBody) {
// TODO(wdauchy): Fix Unified mux to properly handle on-demand VHDS updates.
const bool is_unified_mux =
std::get<2>(std::get<1>(GetParam())) == Grpc::LegacyOrUnified::Unified;
if (is_unified_mux) {
GTEST_SKIP() << "Unified mux times out when processing on-demand VHDS updates";
}
initialize();
// Make a request with body to an unknown virtual host
codec_client_ = makeHttpConnection(lookupPort("http"));
Http::TestRequestHeaderMapImpl request_headers{{":method", "POST"},
{":path", "/"},
{":scheme", "http"},
{":authority", "vhost.with.body"},
{"x-lyft-user-id", "123"}};
const std::string request_body = "test request body";
IntegrationStreamDecoderPtr response =
codec_client_->makeRequestWithBody(request_headers, request_body, true);

// Verify VHDS on-demand request is sent
EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TestTypeUrl::get().VirtualHost,
{vhdsRequestResourceName("vhost.with.body")}, {},
vhds_stream_.get()));

// Send VHDS response with the virtual host
sendDeltaDiscoveryResponse<envoy::config::route::v3::VirtualHost>(
Config::TestTypeUrl::get().VirtualHost,
{buildVirtualHost("my_route/vhost_with_body", "vhost.with.body")}, {}, "2",
vhds_stream_.get(), {"my_route/vhost.with.body"});

// Wait for VHDS ACK to ensure the response is processed
EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TestTypeUrl::get().VirtualHost, {}, {},
vhds_stream_.get()));

// Wait for upstream request (cluster_0 is the first cluster, so it maps to fake_upstreams_[0])
waitForNextUpstreamRequest(0);

// Verify the request body was received correctly
EXPECT_TRUE(upstream_request_->complete());
EXPECT_EQ(request_body, upstream_request_->body().toString());

// Send response
upstream_request_->encodeHeaders(default_response_headers_, true);

response->waitForHeaders();
EXPECT_EQ("200", response->headers().getStatusValue());

cleanupUpstreamAndDownstream();
}

} // namespace
} // namespace Envoy