diff --git a/changelogs/current.yaml b/changelogs/current.yaml index e3ce4f49b3626..c852f7a2cbf6c 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -218,6 +218,12 @@ new_features: change: | Enhanced dynamic module ABIs to support headers addition and body size retrieval. See the latest ABI header file for more details. +- area: dynamic modules + change: | + Added support for streamable HTTP callouts in dynamic modules. Modules can now create + streaming HTTP connections to upstream clusters using ``start_http_stream``, send request + data and trailers incrementally, and receive streaming response headers, data, and trailers + through dedicated callbacks. - area: udp_sink change: | Enhanced the UDP sink to support tapped messages larger than 64 KB. diff --git a/source/extensions/dynamic_modules/abi.h b/source/extensions/dynamic_modules/abi.h index 381224a068cea..1e91378110fe1 100644 --- a/source/extensions/dynamic_modules/abi.h +++ b/source/extensions/dynamic_modules/abi.h @@ -486,6 +486,31 @@ typedef enum envoy_dynamic_module_type_http_callout_result { envoy_dynamic_module_type_http_callout_result_ExceedResponseBufferLimit, } envoy_dynamic_module_type_http_callout_result; +/** + * envoy_dynamic_module_type_http_stream_envoy_ptr is a handle to an HTTP stream for streamable + * callouts. This represents an ongoing HTTP stream initiated via + * envoy_dynamic_module_callback_http_filter_start_http_stream. + * + * OWNERSHIP: Envoy owns the stream. The module must not store this pointer beyond the lifetime of + * the stream callbacks. + */ +typedef void* envoy_dynamic_module_type_http_stream_envoy_ptr; + +/** + * envoy_dynamic_module_type_http_stream_reset_reason represents the reason for a stream reset. + * This corresponds to `AsyncClient::StreamResetReason::*` in envoy/http/async_client.h. + */ +typedef enum envoy_dynamic_module_type_http_stream_reset_reason { + envoy_dynamic_module_type_http_stream_reset_reason_ConnectionFailure, + envoy_dynamic_module_type_http_stream_reset_reason_ConnectionTermination, + envoy_dynamic_module_type_http_stream_reset_reason_LocalReset, + envoy_dynamic_module_type_http_stream_reset_reason_LocalRefusedStreamReset, + envoy_dynamic_module_type_http_stream_reset_reason_Overflow, + envoy_dynamic_module_type_http_stream_reset_reason_RemoteReset, + envoy_dynamic_module_type_http_stream_reset_reason_RemoteRefusedStreamReset, + envoy_dynamic_module_type_http_stream_reset_reason_ProtocolError, +} envoy_dynamic_module_type_http_stream_reset_reason; + /** * envoy_dynamic_module_type_metrics_result represents the result of the metrics operation. * Success means the operation was successful. @@ -750,6 +775,108 @@ void envoy_dynamic_module_on_http_filter_http_callout_done( envoy_dynamic_module_type_envoy_http_header* headers, size_t headers_size, envoy_dynamic_module_type_envoy_buffer* body_chunks, size_t body_chunks_size); +/** + * envoy_dynamic_module_on_http_filter_http_stream_headers is called when response headers are + * received for a streamable HTTP callout stream. + * + * @param filter_envoy_ptr is the pointer to the DynamicModuleHttpFilter object of the + * corresponding HTTP filter. + * @param filter_module_ptr is the pointer to the in-module HTTP filter created by + * envoy_dynamic_module_on_http_filter_new. + * @param stream_ptr is the handle to the HTTP stream. + * @param headers is the headers of the response. + * @param headers_size is the size of the headers. + * @param end_stream is true if this is the last data in the stream (no body or trailers will + * follow). + * + * headers are owned by Envoy and are guaranteed to be valid until the end of this event hook. + */ +void envoy_dynamic_module_on_http_filter_http_stream_headers( + envoy_dynamic_module_type_http_filter_envoy_ptr filter_envoy_ptr, + envoy_dynamic_module_type_http_filter_module_ptr filter_module_ptr, + envoy_dynamic_module_type_http_stream_envoy_ptr stream_ptr, + envoy_dynamic_module_type_envoy_http_header* headers, size_t headers_size, bool end_stream); + +/** + * envoy_dynamic_module_on_http_filter_http_stream_data is called when a chunk of response body is + * received for a streamable HTTP callout stream. This may be called multiple times for a single + * stream as body chunks arrive. + * + * @param filter_envoy_ptr is the pointer to the DynamicModuleHttpFilter object of the + * corresponding HTTP filter. + * @param filter_module_ptr is the pointer to the in-module HTTP filter created by + * envoy_dynamic_module_on_http_filter_new. + * @param stream_ptr is the handle to the HTTP stream. + * @param data is the pointer to the array of buffers containing the body chunk. + * @param data_count is the number of buffers. + * @param end_stream is true if this is the last data in the stream (no trailers will follow). + * + * data is owned by Envoy and is guaranteed to be valid until the end of this event hook. + */ +void envoy_dynamic_module_on_http_filter_http_stream_data( + envoy_dynamic_module_type_http_filter_envoy_ptr filter_envoy_ptr, + envoy_dynamic_module_type_http_filter_module_ptr filter_module_ptr, + envoy_dynamic_module_type_http_stream_envoy_ptr stream_ptr, + const envoy_dynamic_module_type_envoy_buffer* data, size_t data_count, bool end_stream); + +/** + * envoy_dynamic_module_on_http_filter_http_stream_trailers is called when response trailers are + * received for a streamable HTTP callout stream. This is called after headers and any data chunks. + * + * @param filter_envoy_ptr is the pointer to the DynamicModuleHttpFilter object of the + * corresponding HTTP filter. + * @param filter_module_ptr is the pointer to the in-module HTTP filter created by + * envoy_dynamic_module_on_http_filter_new. + * @param stream_ptr is the handle to the HTTP stream. + * @param trailers is the trailers of the response. + * @param trailers_size is the size of the trailers. + * + * trailers are owned by Envoy and are guaranteed to be valid until the end of this event hook. + */ +void envoy_dynamic_module_on_http_filter_http_stream_trailers( + envoy_dynamic_module_type_http_filter_envoy_ptr filter_envoy_ptr, + envoy_dynamic_module_type_http_filter_module_ptr filter_module_ptr, + envoy_dynamic_module_type_http_stream_envoy_ptr stream_ptr, + envoy_dynamic_module_type_envoy_http_header* trailers, size_t trailers_size); + +/** + * envoy_dynamic_module_on_http_filter_http_stream_complete is called when a streamable HTTP + * callout stream completes successfully. This is called after all headers, data, and trailers have + * been received. + * + * @param filter_envoy_ptr is the pointer to the DynamicModuleHttpFilter object of the + * corresponding HTTP filter. + * @param filter_module_ptr is the pointer to the in-module HTTP filter created by + * envoy_dynamic_module_on_http_filter_new. + * @param stream_ptr is the handle to the HTTP stream. + * + * After this callback, the stream is automatically cleaned up and stream_ptr becomes invalid. + */ +void envoy_dynamic_module_on_http_filter_http_stream_complete( + envoy_dynamic_module_type_http_filter_envoy_ptr filter_envoy_ptr, + envoy_dynamic_module_type_http_filter_module_ptr filter_module_ptr, + envoy_dynamic_module_type_http_stream_envoy_ptr stream_ptr); + +/** + * envoy_dynamic_module_on_http_filter_http_stream_reset is called when a streamable HTTP callout + * stream is reset or fails. This may be called instead of the complete callback if the stream + * encounters an error. + * + * @param filter_envoy_ptr is the pointer to the DynamicModuleHttpFilter object of the + * corresponding HTTP filter. + * @param filter_module_ptr is the pointer to the in-module HTTP filter created by + * envoy_dynamic_module_on_http_filter_new. + * @param stream_ptr is the handle to the HTTP stream. + * @param reason is the reason for the stream reset. + * + * After this callback, the stream is automatically cleaned up and stream_ptr becomes invalid. + */ +void envoy_dynamic_module_on_http_filter_http_stream_reset( + envoy_dynamic_module_type_http_filter_envoy_ptr filter_envoy_ptr, + envoy_dynamic_module_type_http_filter_module_ptr filter_module_ptr, + envoy_dynamic_module_type_http_stream_envoy_ptr stream_ptr, + envoy_dynamic_module_type_http_stream_reset_reason reason); + /** * envoy_dynamic_module_on_http_filter_scheduled is called when the HTTP filter is scheduled * to be executed on the worker thread where the HTTP filter is running with @@ -1982,6 +2109,107 @@ envoy_dynamic_module_callback_http_filter_http_callout( envoy_dynamic_module_type_buffer_module_ptr body, size_t body_size, uint64_t timeout_milliseconds); +/** + * envoy_dynamic_module_callback_http_filter_start_http_stream is called by the module to start + * a streamable HTTP callout to a specified cluster. Unlike the one-shot HTTP callout, this allows + * the module to receive response headers, body chunks, and trailers through separate event hooks, + * enabling true streaming behavior. + * + * The stream will trigger the following event hooks in order: + * 1. envoy_dynamic_module_on_http_filter_http_stream_headers - when response headers arrive + * 2. envoy_dynamic_module_on_http_filter_http_stream_data - for each body chunk (may be called + * multiple times or not at all) + * 3. envoy_dynamic_module_on_http_filter_http_stream_trailers - when trailers arrive (optional) + * 4. envoy_dynamic_module_on_http_filter_http_stream_complete - when stream completes successfully + * OR + * envoy_dynamic_module_on_http_filter_http_stream_reset - if stream fails + * + * @param filter_envoy_ptr is the pointer to the DynamicModuleHttpFilter object of the + * corresponding HTTP filter. + * @param stream_ptr_out is a pointer to a variable where the stream handle will be stored. The + * module can use this handle to reset the stream via + * envoy_dynamic_module_callback_http_filter_reset_http_stream. + * @param cluster_name is the name of the cluster to which the stream is sent. + * @param cluster_name_length is the length of the cluster name. + * @param headers is the headers of the request. It must contain :method, :path and host headers. + * @param headers_size is the size of the headers. + * @param body is the pointer to the buffer of the body of the request. + * @param body_size is the length of the body. + * @param end_stream is true if the request stream should be ended after sending headers and body. + * If true and body_size > 0, the body will be sent with end_stream=true. + * If true and body_size is 0, headers will be sent with end_stream=true. + * If false, the module can send additional data or trailers using send_http_stream_data() or + * send_http_stream_trailers(). + * @param timeout_milliseconds is the timeout for the stream in milliseconds. If 0, no timeout is + * set. + * @return envoy_dynamic_module_type_http_callout_init_result is the result of the stream + * initialization. + */ +envoy_dynamic_module_type_http_callout_init_result +envoy_dynamic_module_callback_http_filter_start_http_stream( + envoy_dynamic_module_type_http_filter_envoy_ptr filter_envoy_ptr, + envoy_dynamic_module_type_http_stream_envoy_ptr* stream_ptr_out, + envoy_dynamic_module_type_buffer_module_ptr cluster_name, size_t cluster_name_length, + envoy_dynamic_module_type_module_http_header* headers, size_t headers_size, + envoy_dynamic_module_type_buffer_module_ptr body, size_t body_size, bool end_stream, + uint64_t timeout_milliseconds); + +/** + * envoy_dynamic_module_callback_http_filter_reset_http_stream is called by the module to reset + * or cancel an ongoing streamable HTTP callout. This causes the stream to be terminated and the + * envoy_dynamic_module_on_http_filter_http_stream_reset event hook to be called. + * + * This can be called at any point after the stream is started and before it completes. After + * calling this function, the stream handle becomes invalid. + * + * @param filter_envoy_ptr is the pointer to the DynamicModuleHttpFilter object of the + * corresponding HTTP filter. + * @param stream_ptr is the handle to the HTTP stream to reset. + */ +void envoy_dynamic_module_callback_http_filter_reset_http_stream( + envoy_dynamic_module_type_http_filter_envoy_ptr filter_envoy_ptr, + envoy_dynamic_module_type_http_stream_envoy_ptr stream_ptr); + +/** + * envoy_dynamic_module_callback_http_stream_send_data is called by the module to send request + * body data on an active streamable HTTP callout. This can be called multiple times to stream + * the request body in chunks. + * + * This must be called after the stream is started and headers have been sent. It can be called + * multiple times until end_stream is set to true. + * + * @param filter_envoy_ptr is the pointer to the DynamicModuleHttpFilter object of the + * corresponding HTTP filter. + * @param stream_ptr is the handle to the HTTP stream. + * @param data is the pointer to the buffer of the data to send. + * @param data_length is the length of the data. + * @param end_stream is true if this is the last data (no trailers will follow). + * @return true if the operation is successful, false otherwise. + */ +bool envoy_dynamic_module_callback_http_stream_send_data( + envoy_dynamic_module_type_http_filter_envoy_ptr filter_envoy_ptr, + envoy_dynamic_module_type_http_stream_envoy_ptr stream_ptr, + envoy_dynamic_module_type_buffer_module_ptr data, size_t data_length, bool end_stream); + +/** + * envoy_dynamic_module_callback_http_stream_send_trailers is called by the module to send + * request trailers on an active streamable HTTP callout. This implicitly ends the stream. + * + * This must be called after the stream is started and all request data has been sent. + * After calling this, no more data can be sent on the stream. + * + * @param filter_envoy_ptr is the pointer to the DynamicModuleHttpFilter object of the + * corresponding HTTP filter. + * @param stream_ptr is the handle to the HTTP stream. + * @param trailers is the trailers to send. + * @param trailers_size is the size of the trailers. + * @return true if the operation is successful, false otherwise. + */ +bool envoy_dynamic_module_callback_http_stream_send_trailers( + envoy_dynamic_module_type_http_filter_envoy_ptr filter_envoy_ptr, + envoy_dynamic_module_type_http_stream_envoy_ptr stream_ptr, + envoy_dynamic_module_type_module_http_header* trailers, size_t trailers_size); + /** * envoy_dynamic_module_callback_http_filter_continue_decoding is called by the module to continue * decoding the HTTP request. diff --git a/source/extensions/dynamic_modules/abi_version.h b/source/extensions/dynamic_modules/abi_version.h index 36095f057511c..58f19d2c3ed15 100644 --- a/source/extensions/dynamic_modules/abi_version.h +++ b/source/extensions/dynamic_modules/abi_version.h @@ -6,7 +6,7 @@ namespace DynamicModules { #endif // This is the ABI version calculated as a sha256 hash of the ABI header files. When the ABI // changes, this value must change, and the correctness of this value is checked by the test. -const char* kAbiVersion = "e7c1b3b48b6ef759ad0766916b5124ea01ca7117db79bc5b13d8cdd294deb9fc"; +const char* kAbiVersion = "1ba1ea660354920fa486a9b6fa80917bd6df046ae05c7ebd7a26d4a7155ed7b7"; #ifdef __cplusplus } // namespace DynamicModules diff --git a/source/extensions/dynamic_modules/sdk/rust/src/lib.rs b/source/extensions/dynamic_modules/sdk/rust/src/lib.rs index 032326f712b53..6e4c4cf1124cd 100644 --- a/source/extensions/dynamic_modules/sdk/rust/src/lib.rs +++ b/source/extensions/dynamic_modules/sdk/rust/src/lib.rs @@ -356,6 +356,73 @@ pub trait HttpFilter { ) { } + /// This is called when response headers are received from an HTTP stream callout. + /// + /// * `envoy_filter` can be used to interact with the underlying Envoy filter object. + /// * `stream_handle` is the opaque handle to the HTTP stream. + /// * `response_headers` is a list of key-value pairs of the response headers. + /// * `end_stream` indicates whether this is the final frame of the response. + fn on_http_stream_headers( + &mut self, + _envoy_filter: &mut EHF, + _stream_handle: abi::envoy_dynamic_module_type_http_stream_envoy_ptr, + _response_headers: &[(EnvoyBuffer, EnvoyBuffer)], + _end_stream: bool, + ) { + } + + /// This is called when response data is received from an HTTP stream callout. + /// + /// * `envoy_filter` can be used to interact with the underlying Envoy filter object. + /// * `stream_handle` is the opaque handle to the HTTP stream. + /// * `response_data` is the response body data chunks. + /// * `end_stream` indicates whether this is the final frame of the response. + fn on_http_stream_data( + &mut self, + _envoy_filter: &mut EHF, + _stream_handle: abi::envoy_dynamic_module_type_http_stream_envoy_ptr, + _response_data: &[EnvoyBuffer], + _end_stream: bool, + ) { + } + + /// This is called when response trailers are received from an HTTP stream callout. + /// + /// * `envoy_filter` can be used to interact with the underlying Envoy filter object. + /// * `stream_handle` is the opaque handle to the HTTP stream. + /// * `response_trailers` is a list of key-value pairs of the response trailers. + fn on_http_stream_trailers( + &mut self, + _envoy_filter: &mut EHF, + _stream_handle: abi::envoy_dynamic_module_type_http_stream_envoy_ptr, + _response_trailers: &[(EnvoyBuffer, EnvoyBuffer)], + ) { + } + + /// This is called when an HTTP stream callout completes successfully. + /// + /// * `envoy_filter` can be used to interact with the underlying Envoy filter object. + /// * `stream_handle` is the opaque handle to the HTTP stream (no longer valid after this call). + fn on_http_stream_complete( + &mut self, + _envoy_filter: &mut EHF, + _stream_handle: abi::envoy_dynamic_module_type_http_stream_envoy_ptr, + ) { + } + + /// This is called when an HTTP stream callout is reset (failed or cancelled). + /// + /// * `envoy_filter` can be used to interact with the underlying Envoy filter object. + /// * `stream_handle` is the opaque handle to the HTTP stream (no longer valid after this call). + /// * `reset_reason` indicates why the stream was reset. + fn on_http_stream_reset( + &mut self, + _envoy_filter: &mut EHF, + _stream_handle: abi::envoy_dynamic_module_type_http_stream_envoy_ptr, + _reset_reason: abi::envoy_dynamic_module_type_http_stream_reset_reason, + ) { + } + /// This is called when the new event is scheduled via the [`EnvoyHttpFilterScheduler::commit`] /// for this [`HttpFilter`]. /// @@ -1066,6 +1133,76 @@ pub trait EnvoyHttpFilter { _timeout_milliseconds: u64, ) -> abi::envoy_dynamic_module_type_http_callout_init_result; + /// Start a streamable HTTP callout to the given cluster with the given headers and optional + /// body. Multiple concurrent streams can be created from the same filter. + /// + /// Headers must contain the `:method`, `:path`, and `host` headers. + /// + /// This returns a tuple of (status, stream_handle): + /// * Success + valid stream_handle: The stream was started successfully. + /// * MissingRequiredHeaders + null: One of the required headers is missing. + /// * ClusterNotFound + null: The cluster with the given name was not found. + /// * CannotCreateRequest + null: The stream could not be created (e.g., no healthy upstream). + /// + /// After starting the stream, use the returned stream_handle to send data/trailers or reset. + /// Stream events will be delivered to the [`HttpFilter::on_http_stream_*`] methods. + /// + /// When the HTTP stream ends (either successfully or via reset), no further callbacks will + /// be invoked for that stream. + fn start_http_stream<'a>( + &mut self, + _cluster_name: &'a str, + _headers: Vec<(&'a str, &'a [u8])>, + _body: Option<&'a [u8]>, + _end_stream: bool, + _timeout_milliseconds: u64, + ) -> ( + abi::envoy_dynamic_module_type_http_callout_init_result, + abi::envoy_dynamic_module_type_http_stream_envoy_ptr, + ); + + /// Send data on an active HTTP stream. + /// + /// # Safety + /// + /// * `stream_handle` must be a valid handle returned from [`EnvoyHttpFilter::start_http_stream`]. + /// * `data` is the data to send. + /// * `end_stream` indicates whether this is the final frame of the request. + /// + /// Returns true if the data was sent successfully, false otherwise. + unsafe fn send_http_stream_data( + &mut self, + _stream_handle: abi::envoy_dynamic_module_type_http_stream_envoy_ptr, + _data: &[u8], + _end_stream: bool, + ) -> bool; + + /// Send trailers on an active HTTP stream (implicitly ends the stream). + /// + /// # Safety + /// + /// * `stream_handle` must be a valid handle returned from [`EnvoyHttpFilter::start_http_stream`]. + /// * `trailers` is a list of key-value pairs for the trailers. + /// + /// Returns true if the trailers were sent successfully, false otherwise. + unsafe fn send_http_stream_trailers<'a>( + &mut self, + _stream_handle: abi::envoy_dynamic_module_type_http_stream_envoy_ptr, + _trailers: Vec<(&'a str, &'a [u8])>, + ) -> bool; + + /// Reset (cancel) an active HTTP stream. + /// + /// # Safety + /// + /// * `stream_handle` must be a valid handle returned from [`EnvoyHttpFilter::start_http_stream`]. + /// + /// This will trigger the [`HttpFilter::on_http_stream_reset`] callback. + unsafe fn reset_http_stream( + &mut self, + _stream_handle: abi::envoy_dynamic_module_type_http_stream_envoy_ptr, + ); + /// Get the most specific route configuration for the current route. /// /// Returns None if no per-route configuration is present on this route. Otherwise, @@ -2018,6 +2155,83 @@ impl EnvoyHttpFilter for EnvoyHttpFilterImpl { } } + fn start_http_stream<'a>( + &mut self, + cluster_name: &'a str, + headers: Vec<(&'a str, &'a [u8])>, + body: Option<&'a [u8]>, + end_stream: bool, + timeout_milliseconds: u64, + ) -> ( + abi::envoy_dynamic_module_type_http_callout_init_result, + abi::envoy_dynamic_module_type_http_stream_envoy_ptr, + ) { + let body_ptr = body.map(|s| s.as_ptr()).unwrap_or(std::ptr::null()); + let body_length = body.map(|s| s.len()).unwrap_or(0); + let headers_ptr = headers.as_ptr() as *const abi::envoy_dynamic_module_type_module_http_header; + let mut stream_ptr: abi::envoy_dynamic_module_type_http_stream_envoy_ptr = std::ptr::null_mut(); + + let result = unsafe { + abi::envoy_dynamic_module_callback_http_filter_start_http_stream( + self.raw_ptr, + &mut stream_ptr, + cluster_name.as_ptr() as *const _ as *mut _, + cluster_name.len(), + headers_ptr as *const _ as *mut _, + headers.len(), + body_ptr as *const _ as *mut _, + body_length, + end_stream, + timeout_milliseconds, + ) + }; + + (result, stream_ptr) + } + + unsafe fn send_http_stream_data( + &mut self, + stream_handle: abi::envoy_dynamic_module_type_http_stream_envoy_ptr, + data: &[u8], + end_stream: bool, + ) -> bool { + unsafe { + abi::envoy_dynamic_module_callback_http_stream_send_data( + self.raw_ptr, + stream_handle, + data.as_ptr() as *const _ as *mut _, + data.len(), + end_stream, + ) + } + } + + unsafe fn send_http_stream_trailers<'a>( + &mut self, + stream_handle: abi::envoy_dynamic_module_type_http_stream_envoy_ptr, + trailers: Vec<(&'a str, &'a [u8])>, + ) -> bool { + let trailers_ptr = + trailers.as_ptr() as *const abi::envoy_dynamic_module_type_module_http_header; + unsafe { + abi::envoy_dynamic_module_callback_http_stream_send_trailers( + self.raw_ptr, + stream_handle, + trailers_ptr as *const _ as *mut _, + trailers.len(), + ) + } + } + + unsafe fn reset_http_stream( + &mut self, + stream_handle: abi::envoy_dynamic_module_type_http_stream_envoy_ptr, + ) { + unsafe { + abi::envoy_dynamic_module_callback_http_filter_reset_http_stream(self.raw_ptr, stream_handle); + } + } + fn get_most_specific_route_config(&self) -> Option> { unsafe { let filter_config_ptr = @@ -2746,6 +2960,107 @@ unsafe extern "C" fn envoy_dynamic_module_on_http_filter_downstream_below_write_ filter.on_downstream_below_write_buffer_low_watermark(&mut EnvoyHttpFilterImpl::new(envoy_ptr)); } +#[no_mangle] +unsafe extern "C" fn envoy_dynamic_module_on_http_filter_http_stream_headers( + envoy_ptr: abi::envoy_dynamic_module_type_http_filter_envoy_ptr, + filter_ptr: abi::envoy_dynamic_module_type_http_filter_module_ptr, + stream_handle: abi::envoy_dynamic_module_type_http_stream_envoy_ptr, + headers: *const abi::envoy_dynamic_module_type_envoy_http_header, + headers_size: usize, + end_stream: bool, +) { + let filter = filter_ptr as *mut *mut dyn HttpFilter; + let filter = &mut **filter; + let headers = if headers_size > 0 { + unsafe { + std::slice::from_raw_parts(headers as *const (EnvoyBuffer, EnvoyBuffer), headers_size) + } + } else { + &[] + }; + filter.on_http_stream_headers( + &mut EnvoyHttpFilterImpl::new(envoy_ptr), + stream_handle, + headers, + end_stream, + ); +} + +#[no_mangle] +unsafe extern "C" fn envoy_dynamic_module_on_http_filter_http_stream_data( + envoy_ptr: abi::envoy_dynamic_module_type_http_filter_envoy_ptr, + filter_ptr: abi::envoy_dynamic_module_type_http_filter_module_ptr, + stream_handle: abi::envoy_dynamic_module_type_http_stream_envoy_ptr, + data: *const abi::envoy_dynamic_module_type_envoy_buffer, + data_count: usize, + end_stream: bool, +) { + let filter = filter_ptr as *mut *mut dyn HttpFilter; + let filter = &mut **filter; + let data = if data_count > 0 { + unsafe { std::slice::from_raw_parts(data as *const EnvoyBuffer, data_count) } + } else { + &[] + }; + filter.on_http_stream_data( + &mut EnvoyHttpFilterImpl::new(envoy_ptr), + stream_handle, + data, + end_stream, + ); +} + +#[no_mangle] +unsafe extern "C" fn envoy_dynamic_module_on_http_filter_http_stream_trailers( + envoy_ptr: abi::envoy_dynamic_module_type_http_filter_envoy_ptr, + filter_ptr: abi::envoy_dynamic_module_type_http_filter_module_ptr, + stream_handle: abi::envoy_dynamic_module_type_http_stream_envoy_ptr, + trailers: *const abi::envoy_dynamic_module_type_envoy_http_header, + trailers_size: usize, +) { + let filter = filter_ptr as *mut *mut dyn HttpFilter; + let filter = &mut **filter; + let trailers = if trailers_size > 0 { + unsafe { + std::slice::from_raw_parts(trailers as *const (EnvoyBuffer, EnvoyBuffer), trailers_size) + } + } else { + &[] + }; + filter.on_http_stream_trailers( + &mut EnvoyHttpFilterImpl::new(envoy_ptr), + stream_handle, + trailers, + ); +} + +#[no_mangle] +unsafe extern "C" fn envoy_dynamic_module_on_http_filter_http_stream_complete( + envoy_ptr: abi::envoy_dynamic_module_type_http_filter_envoy_ptr, + filter_ptr: abi::envoy_dynamic_module_type_http_filter_module_ptr, + stream_handle: abi::envoy_dynamic_module_type_http_stream_envoy_ptr, +) { + let filter = filter_ptr as *mut *mut dyn HttpFilter; + let filter = &mut **filter; + filter.on_http_stream_complete(&mut EnvoyHttpFilterImpl::new(envoy_ptr), stream_handle); +} + +#[no_mangle] +unsafe extern "C" fn envoy_dynamic_module_on_http_filter_http_stream_reset( + envoy_ptr: abi::envoy_dynamic_module_type_http_filter_envoy_ptr, + filter_ptr: abi::envoy_dynamic_module_type_http_filter_module_ptr, + stream_handle: abi::envoy_dynamic_module_type_http_stream_envoy_ptr, + reset_reason: abi::envoy_dynamic_module_type_http_stream_reset_reason, +) { + let filter = filter_ptr as *mut *mut dyn HttpFilter; + let filter = &mut **filter; + filter.on_http_stream_reset( + &mut EnvoyHttpFilterImpl::new(envoy_ptr), + stream_handle, + reset_reason, + ); +} + impl From for Result<(), envoy_dynamic_module_type_metrics_result> { diff --git a/source/extensions/filters/http/dynamic_modules/abi_impl.cc b/source/extensions/filters/http/dynamic_modules/abi_impl.cc index d7774ea352024..b466fcb915f2c 100644 --- a/source/extensions/filters/http/dynamic_modules/abi_impl.cc +++ b/source/extensions/filters/http/dynamic_modules/abi_impl.cc @@ -618,6 +618,9 @@ void envoy_dynamic_module_callback_http_send_response( envoy_dynamic_module_type_buffer_module_ptr body_ptr, size_t body_length, envoy_dynamic_module_type_buffer_module_ptr details, size_t details_length) { DynamicModuleHttpFilter* filter = static_cast(filter_envoy_ptr); + if (filter->isDestroyed()) { + return; + } std::function modify_headers = nullptr; if (headers_vector != nullptr && headers_vector_size != 0) { @@ -650,6 +653,9 @@ void envoy_dynamic_module_callback_http_send_response_headers( envoy_dynamic_module_type_module_http_header* headers_vector, size_t headers_vector_size, bool end_stream) { DynamicModuleHttpFilter* filter = static_cast(filter_envoy_ptr); + if (filter->isDestroyed()) { + return; + } std::unique_ptr headers = ResponseHeaderMapImpl::create(); for (size_t i = 0; i < headers_vector_size; i++) { @@ -667,6 +673,9 @@ void envoy_dynamic_module_callback_http_send_response_data( envoy_dynamic_module_type_http_filter_envoy_ptr filter_envoy_ptr, envoy_dynamic_module_type_buffer_module_ptr data, size_t length, bool end_stream) { DynamicModuleHttpFilter* filter = static_cast(filter_envoy_ptr); + if (filter->isDestroyed()) { + return; + } Buffer::OwnedImpl buffer(static_cast(data), length); filter->decoder_callbacks_->encodeData(buffer, end_stream); @@ -676,6 +685,9 @@ void envoy_dynamic_module_callback_http_send_response_trailers( envoy_dynamic_module_type_http_filter_envoy_ptr filter_envoy_ptr, envoy_dynamic_module_type_module_http_header* trailers_vector, size_t trailers_vector_size) { DynamicModuleHttpFilter* filter = static_cast(filter_envoy_ptr); + if (filter->isDestroyed()) { + return; + } std::unique_ptr trailers = ResponseTrailerMapImpl::create(); for (size_t i = 0; i < trailers_vector_size; i++) { @@ -1574,6 +1586,81 @@ envoy_dynamic_module_callback_http_filter_http_callout( timeout_milliseconds); } +envoy_dynamic_module_type_http_callout_init_result +envoy_dynamic_module_callback_http_filter_start_http_stream( + envoy_dynamic_module_type_http_filter_envoy_ptr filter_envoy_ptr, + envoy_dynamic_module_type_http_stream_envoy_ptr* stream_ptr_out, + envoy_dynamic_module_type_buffer_module_ptr cluster_name, size_t cluster_name_length, + envoy_dynamic_module_type_module_http_header* headers, size_t headers_size, + envoy_dynamic_module_type_buffer_module_ptr body, size_t body_size, bool end_stream, + uint64_t timeout_milliseconds) { + auto filter = static_cast(filter_envoy_ptr); + + // Try to get the cluster from the cluster manager for the given cluster name. + absl::string_view cluster_name_view(cluster_name, cluster_name_length); + + // Construct the request message, starting with the headers, checking for required headers, and + // adding the body if present. + std::unique_ptr hdrs = Http::RequestHeaderMapImpl::create(); + for (size_t i = 0; i < headers_size; i++) { + const auto& header = &headers[i]; + const absl::string_view key(static_cast(header->key_ptr), header->key_length); + const absl::string_view value(static_cast(header->value_ptr), + header->value_length); + hdrs->addCopy(Http::LowerCaseString(key), value); + } + Http::RequestMessagePtr message(new Http::RequestMessageImpl(std::move(hdrs))); + if (message->headers().Path() == nullptr || message->headers().Method() == nullptr || + message->headers().Host() == nullptr) { + return envoy_dynamic_module_type_http_callout_init_result_MissingRequiredHeaders; + } + if (body_size > 0) { + message->body().add(absl::string_view(static_cast(body), body_size)); + } + return filter->startHttpStream(stream_ptr_out, cluster_name_view, std::move(message), end_stream, + timeout_milliseconds); +} + +void envoy_dynamic_module_callback_http_filter_reset_http_stream( + envoy_dynamic_module_type_http_filter_envoy_ptr filter_envoy_ptr, + envoy_dynamic_module_type_http_stream_envoy_ptr stream_ptr) { + auto filter = static_cast(filter_envoy_ptr); + filter->resetHttpStream(stream_ptr); +} + +bool envoy_dynamic_module_callback_http_stream_send_data( + envoy_dynamic_module_type_http_filter_envoy_ptr filter_envoy_ptr, + envoy_dynamic_module_type_http_stream_envoy_ptr stream_ptr, + envoy_dynamic_module_type_buffer_module_ptr data, size_t data_length, bool end_stream) { + auto filter = static_cast(filter_envoy_ptr); + + // Create a buffer and send the data. + Buffer::OwnedImpl buffer; + if (data_length > 0) { + buffer.add(absl::string_view(static_cast(data), data_length)); + } + return filter->sendStreamData(stream_ptr, buffer, end_stream); +} + +bool envoy_dynamic_module_callback_http_stream_send_trailers( + envoy_dynamic_module_type_http_filter_envoy_ptr filter_envoy_ptr, + envoy_dynamic_module_type_http_stream_envoy_ptr stream_ptr, + envoy_dynamic_module_type_module_http_header* trailers, size_t trailers_size) { + auto filter = static_cast(filter_envoy_ptr); + + // Construct the trailers. + std::unique_ptr trailer_map = Http::RequestTrailerMapImpl::create(); + for (size_t i = 0; i < trailers_size; i++) { + const auto& trailer = &trailers[i]; + const absl::string_view key(static_cast(trailer->key_ptr), trailer->key_length); + const absl::string_view value(static_cast(trailer->value_ptr), + trailer->value_length); + trailer_map->addCopy(Http::LowerCaseString(key), value); + } + + return filter->sendStreamTrailers(stream_ptr, std::move(trailer_map)); +} + envoy_dynamic_module_type_http_filter_scheduler_module_ptr envoy_dynamic_module_callback_http_filter_scheduler_new( envoy_dynamic_module_type_http_filter_envoy_ptr filter_envoy_ptr) { diff --git a/source/extensions/filters/http/dynamic_modules/filter.cc b/source/extensions/filters/http/dynamic_modules/filter.cc index 200405a9408d8..6bcda7364491e 100644 --- a/source/extensions/filters/http/dynamic_modules/filter.cc +++ b/source/extensions/filters/http/dynamic_modules/filter.cc @@ -19,22 +19,57 @@ void DynamicModuleHttpFilter::onStreamComplete() { config_->on_http_filter_stream_complete_(thisAsVoidPtr(), in_module_filter_); } -void DynamicModuleHttpFilter::onDestroy() { destroy(); }; +void DynamicModuleHttpFilter::onDestroy() { + destroyed_ = true; + destroy(); +}; void DynamicModuleHttpFilter::destroy() { if (in_module_filter_ == nullptr) { return; } + + Event::Dispatcher* dispatcher = nullptr; + if (!http_stream_callouts_.empty()) { + if (decoder_callbacks_ != nullptr) { + dispatcher = &decoder_callbacks_->dispatcher(); + } else if (encoder_callbacks_ != nullptr) { + dispatcher = &encoder_callbacks_->dispatcher(); + } + } + config_->on_http_filter_destroy_(in_module_filter_); in_module_filter_ = nullptr; - decoder_callbacks_ = nullptr; - encoder_callbacks_ = nullptr; - for (auto& callout : http_callouts_) { - if (callout.second->request_) { - callout.second->request_->cancel(); + + while (!http_callouts_.empty()) { + auto it = http_callouts_.begin(); + if (it->second->request_) { + it->second->request_->cancel(); + } + if (!http_callouts_.empty() && http_callouts_.begin() == it) { + http_callouts_.erase(it); + } + } + + while (!http_stream_callouts_.empty()) { + auto it = http_stream_callouts_.begin(); + if (it->second->stream_) { + it->second->stream_->reset(); + } + // Do not delete the callback inline because AsyncClient may invoke it synchronously from reset. + if (dispatcher != nullptr) { + std::unique_ptr deletable(it->second.release()); + dispatcher->deferredDelete(std::move(deletable)); + } else { + it->second.reset(); + } + if (!http_stream_callouts_.empty() && http_stream_callouts_.begin() == it) { + http_stream_callouts_.erase(it); } } - http_callouts_.clear(); + + decoder_callbacks_ = nullptr; + encoder_callbacks_ = nullptr; } FilterHeadersStatus DynamicModuleHttpFilter::decodeHeaders(RequestHeaderMap&, bool end_of_stream) { @@ -255,6 +290,253 @@ void DynamicModuleHttpFilter::onBelowWriteBufferLowWatermark() { in_module_filter_); } +envoy_dynamic_module_type_http_callout_init_result DynamicModuleHttpFilter::startHttpStream( + envoy_dynamic_module_type_http_stream_envoy_ptr* stream_ptr_out, absl::string_view cluster_name, + Http::RequestMessagePtr&& message, bool end_stream, uint64_t timeout_milliseconds) { + // Get the cluster. + Upstream::ThreadLocalCluster* cluster = + config_->cluster_manager_.getThreadLocalCluster(cluster_name); + if (cluster == nullptr) { + return envoy_dynamic_module_type_http_callout_init_result_ClusterNotFound; + } + // Check required headers are present. + if (!message->headers().Path() || !message->headers().Method() || !message->headers().Host()) { + return envoy_dynamic_module_type_http_callout_init_result_MissingRequiredHeaders; + } + // Create the callback. + auto callback = + std::make_unique(shared_from_this()); + DynamicModuleHttpFilter::HttpStreamCalloutCallback& callback_ref = *callback; + // Store the callback first so if start fails inline, we can clean it up properly. + http_stream_callouts_[callback->this_as_void_ptr_] = std::move(callback); + + Http::AsyncClient::StreamOptions options; + options.setTimeout(std::chrono::milliseconds(timeout_milliseconds)); + + Http::AsyncClient::Stream* async_stream = cluster->httpAsyncClient().start(callback_ref, options); + if (!async_stream) { + // Failed to create the stream, clean up. + http_stream_callouts_.erase(callback_ref.this_as_void_ptr_); + return envoy_dynamic_module_type_http_callout_init_result_CannotCreateRequest; + } + + callback_ref.stream_ = async_stream; + callback_ref.request_message_ = std::move(message); + *stream_ptr_out = callback_ref.this_as_void_ptr_; + + // Send headers. The end_stream flag controls whether headers alone end the stream. + // If body is provided, send it immediately. + bool has_initial_body = callback_ref.request_message_->body().length() > 0; + if (has_initial_body) { + // Send headers without end_stream, then send body with the end_stream flag. + callback_ref.stream_->sendHeaders(callback_ref.request_message_->headers(), + false /* end_stream */); + + // The stream might reset inline while sending headers. Bail out if that happened. + if (callback_ref.stream_ == nullptr) { + return envoy_dynamic_module_type_http_callout_init_result_Success; + } + + callback_ref.stream_->sendData(callback_ref.request_message_->body(), end_stream); + if (callback_ref.stream_ == nullptr) { + return envoy_dynamic_module_type_http_callout_init_result_Success; + } + } else { + // No body, so end_stream applies to headers. + callback_ref.stream_->sendHeaders(callback_ref.request_message_->headers(), end_stream); + if (callback_ref.stream_ == nullptr) { + return envoy_dynamic_module_type_http_callout_init_result_Success; + } + } + + return envoy_dynamic_module_type_http_callout_init_result_Success; +} + +void DynamicModuleHttpFilter::resetHttpStream( + envoy_dynamic_module_type_http_stream_envoy_ptr stream_ptr) { + auto it = http_stream_callouts_.find(stream_ptr); + if (it != http_stream_callouts_.end() && it->second->stream_) { + it->second->stream_->reset(); + } +} + +bool DynamicModuleHttpFilter::sendStreamData( + envoy_dynamic_module_type_http_stream_envoy_ptr stream_ptr, Buffer::Instance& data, + bool end_stream) { + auto it = http_stream_callouts_.find(stream_ptr); + if (it == http_stream_callouts_.end() || !it->second->stream_) { + return false; + } + it->second->stream_->sendData(data, end_stream); + return true; +} + +bool DynamicModuleHttpFilter::sendStreamTrailers( + envoy_dynamic_module_type_http_stream_envoy_ptr stream_ptr, + Http::RequestTrailerMapPtr trailers) { + auto it = http_stream_callouts_.find(stream_ptr); + if (it == http_stream_callouts_.end() || !it->second->stream_) { + return false; + } + + // Store the trailers in the callback to keep them alive, since AsyncStream stores a pointer. + it->second->request_trailers_ = std::move(trailers); + it->second->stream_->sendTrailers(*it->second->request_trailers_); + return true; +} + +void DynamicModuleHttpFilter::HttpStreamCalloutCallback::onHeaders(ResponseHeaderMapPtr&& headers, + bool end_stream) { + // Check if the filter is destroyed before the stream completes. + if (!filter_->in_module_filter_) { + return; + } + + absl::InlinedVector headers_vector; + headers_vector.reserve(headers->size()); + headers->iterate([&headers_vector](const Http::HeaderEntry& header) -> Http::HeaderMap::Iterate { + headers_vector.emplace_back(envoy_dynamic_module_type_envoy_http_header{ + const_cast(header.key().getStringView().data()), header.key().getStringView().size(), + const_cast(header.value().getStringView().data()), + header.value().getStringView().size()}); + return Http::HeaderMap::Iterate::Continue; + }); + + filter_->config_->on_http_filter_http_stream_headers_( + filter_->thisAsVoidPtr(), filter_->in_module_filter_, this_as_void_ptr_, + headers_vector.data(), headers_vector.size(), end_stream); +} + +void DynamicModuleHttpFilter::HttpStreamCalloutCallback::onData(Buffer::Instance& data, + bool end_stream) { + // Check if the filter is destroyed before the stream completes. + if (!filter_->in_module_filter_) { + return; + } + + const uint64_t length = data.length(); + if (length > 0 || end_stream) { + std::vector buffers; + const auto& slices = data.getRawSlices(); + buffers.reserve(slices.size()); + for (const auto& slice : slices) { + buffers.push_back({static_cast(slice.mem_), slice.len_}); + } + filter_->config_->on_http_filter_http_stream_data_( + filter_->thisAsVoidPtr(), filter_->in_module_filter_, this_as_void_ptr_, buffers.data(), + buffers.size(), end_stream); + } +} + +void DynamicModuleHttpFilter::HttpStreamCalloutCallback::onTrailers( + ResponseTrailerMapPtr&& trailers) { + // Check if the filter is destroyed before the stream completes. + if (!filter_->in_module_filter_) { + return; + } + + absl::InlinedVector trailers_vector; + trailers_vector.reserve(trailers->size()); + trailers->iterate([&trailers_vector]( + const Http::HeaderEntry& header) -> Http::HeaderMap::Iterate { + trailers_vector.emplace_back(envoy_dynamic_module_type_envoy_http_header{ + const_cast(header.key().getStringView().data()), header.key().getStringView().size(), + const_cast(header.value().getStringView().data()), + header.value().getStringView().size()}); + return Http::HeaderMap::Iterate::Continue; + }); + + filter_->config_->on_http_filter_http_stream_trailers_( + filter_->thisAsVoidPtr(), filter_->in_module_filter_, this_as_void_ptr_, + trailers_vector.data(), trailers_vector.size()); +} + +void DynamicModuleHttpFilter::HttpStreamCalloutCallback::onComplete() { + // Avoid double cleanup if this callback was already handled. + if (cleaned_up_) { + return; + } + cleaned_up_ = true; + + // Move the filter to the local scope since on_http_filter_http_stream_complete_ might + // result in a local reply which destroys the filter. That eventually ends up deallocating this + // callback itself. + DynamicModuleHttpFilterSharedPtr filter = std::move(filter_); + void* stream_ptr = this_as_void_ptr_; + + // Check if the filter is destroyed before we can invoke the callback. + if (!filter->in_module_filter_ || !filter->decoder_callbacks_) { + return; + } + + // Cache the dispatcher before we call the module callback, as the callback may destroy the filter + // which will clear decoder_callbacks_. + Event::Dispatcher& dispatcher = filter->decoder_callbacks_->dispatcher(); + + filter->config_->on_http_filter_http_stream_complete_(filter->thisAsVoidPtr(), + filter->in_module_filter_, stream_ptr); + + stream_ = nullptr; + request_message_.reset(); + request_trailers_.reset(); + + // Schedule deferred deletion of this callback to avoid deleting 'this' while we're still in it. + // The stream may call other callbacks like onReset() after onComplete(). + auto it = filter->http_stream_callouts_.find(stream_ptr); + if (it != filter->http_stream_callouts_.end()) { + // Cast unique_ptr to unique_ptr for deferred + // deletion. + std::unique_ptr deletable(it->second.release()); + dispatcher.deferredDelete(std::move(deletable)); + filter->http_stream_callouts_.erase(it); + } +} + +void DynamicModuleHttpFilter::HttpStreamCalloutCallback::onReset() { + // Avoid double cleanup if this callback was already handled. + if (cleaned_up_) { + return; + } + cleaned_up_ = true; + + // Move the filter to the local scope since on_http_filter_http_stream_reset_ might + // result in a local reply which destroys the filter. That eventually ends up deallocating this + // callback itself. + DynamicModuleHttpFilterSharedPtr filter = std::move(filter_); + void* stream_ptr = this_as_void_ptr_; + + // Check if the filter is destroyed before we can invoke the callback. + if (!filter->in_module_filter_ || !filter->decoder_callbacks_) { + return; + } + + // Cache the dispatcher before we call the module callback, as the callback may destroy the filter + // which will clear decoder_callbacks_. + Event::Dispatcher& dispatcher = filter->decoder_callbacks_->dispatcher(); + + // Only invoke the callback if the stream was actually started. + if (stream_) { + // Since we don't have detailed reset reason here, use a generic one. + filter->config_->on_http_filter_http_stream_reset_( + filter->thisAsVoidPtr(), filter->in_module_filter_, stream_ptr, + envoy_dynamic_module_type_http_stream_reset_reason_LocalReset); + } + + stream_ = nullptr; + request_message_.reset(); + request_trailers_.reset(); + + // Schedule deferred deletion of this callback to avoid deleting 'this' while we're still in it. + auto it = filter->http_stream_callouts_.find(stream_ptr); + if (it != filter->http_stream_callouts_.end()) { + // Cast unique_ptr to unique_ptr for deferred + // deletion. + std::unique_ptr deletable(it->second.release()); + dispatcher.deferredDelete(std::move(deletable)); + filter->http_stream_callouts_.erase(it); + } +} + } // namespace HttpFilters } // namespace DynamicModules } // namespace Extensions diff --git a/source/extensions/filters/http/dynamic_modules/filter.h b/source/extensions/filters/http/dynamic_modules/filter.h index 4e2cb533d6d0b..d1b340e5e3e48 100644 --- a/source/extensions/filters/http/dynamic_modules/filter.h +++ b/source/extensions/filters/http/dynamic_modules/filter.h @@ -59,6 +59,8 @@ class DynamicModuleHttpFilter : public Http::StreamFilter, } void encodeComplete() override; + bool isDestroyed() const { return destroyed_; } + // ---------- Http::DownstreamWatermarkCallbacks ---------- void onAboveWriteBufferHighWatermark() override; void onBelowWriteBufferLowWatermark() override; @@ -71,6 +73,7 @@ class DynamicModuleHttpFilter : public Http::StreamFilter, // The callbacks for the filter. They are only valid until onDestroy() is called. StreamDecoderFilterCallbacks* decoder_callbacks_ = nullptr; StreamEncoderFilterCallbacks* encoder_callbacks_ = nullptr; + bool destroyed_ = false; // These are used to hold the current chunk of the request/response body during the decodeData and // encodeData callbacks. It is only valid during the call and should not be used outside of the @@ -176,6 +179,32 @@ class DynamicModuleHttpFilter : public Http::StreamFilter, sendHttpCallout(uint32_t callout_id, absl::string_view cluster_name, Http::RequestMessagePtr&& message, uint64_t timeout_milliseconds); + /** + * Starts a streamable HTTP callout to the specified cluster with the given message. + * Returns a stream handle that can be used to reset the stream. + */ + envoy_dynamic_module_type_http_callout_init_result + startHttpStream(envoy_dynamic_module_type_http_stream_envoy_ptr* stream_ptr_out, + absl::string_view cluster_name, Http::RequestMessagePtr&& message, + bool end_stream, uint64_t timeout_milliseconds); + + /** + * Resets an ongoing streamable HTTP callout stream. + */ + void resetHttpStream(envoy_dynamic_module_type_http_stream_envoy_ptr stream_ptr); + + /** + * Sends data on an ongoing streamable HTTP callout stream. + */ + bool sendStreamData(envoy_dynamic_module_type_http_stream_envoy_ptr stream_ptr, + Buffer::Instance& data, bool end_stream); + + /** + * Sends trailers on an ongoing streamable HTTP callout stream. + */ + bool sendStreamTrailers(envoy_dynamic_module_type_http_stream_envoy_ptr stream_ptr, + Http::RequestTrailerMapPtr trailers); + const DynamicModuleHttpFilterConfig& getFilterConfig() const { return *config_; } Stats::StatNameDynamicPool& getStatNamePool() { return stat_name_pool_; } @@ -232,8 +261,52 @@ class DynamicModuleHttpFilter : public Http::StreamFilter, uint32_t callout_id_; }; + /** + * This implementation of the AsyncClient::StreamCallbacks is used to handle the streaming + * response from the HTTP streamable callout from the parent HTTP filter. + */ + class HttpStreamCalloutCallback : public Http::AsyncClient::StreamCallbacks, + public Event::DeferredDeletable { + public: + HttpStreamCalloutCallback(std::shared_ptr filter) + : this_as_void_ptr_(static_cast(this)), filter_(std::move(filter)) {} + ~HttpStreamCalloutCallback() override = default; + + // AsyncClient::StreamCallbacks + void onHeaders(ResponseHeaderMapPtr&& headers, bool end_stream) override; + void onData(Buffer::Instance& data, bool end_stream) override; + void onTrailers(ResponseTrailerMapPtr&& trailers) override; + void onComplete() override; + void onReset() override; + + // This is the stream object that is used to send the streaming HTTP callout. It is used to + // reset the callout if the filter is destroyed before the callout is completed or if the + // module requests it. + Http::AsyncClient::Stream* stream_ = nullptr; + + // Store the request message to keep headers alive, since AsyncStream stores a pointer to them. + Http::RequestMessagePtr request_message_ = nullptr; + + // Store the request trailers to keep them alive, since AsyncStream stores a pointer to them. + Http::RequestTrailerMapPtr request_trailers_ = nullptr; + + // Store this as void* so it can be passed directly to the module without casting. + void* this_as_void_ptr_; + + // Track if this callback has already been cleaned up to avoid double cleanup. + bool cleaned_up_ = false; + + private: + std::shared_ptr filter_; + }; + absl::flat_hash_map> http_callouts_; + + // Unlike http_callouts_, we don't use an id-based map because the stream pointer itself is the + // unique identifier. We store the callback objects here to manage their lifetime. + absl::flat_hash_map> + http_stream_callouts_; }; using DynamicModuleHttpFilterSharedPtr = std::shared_ptr; diff --git a/source/extensions/filters/http/dynamic_modules/filter_config.cc b/source/extensions/filters/http/dynamic_modules/filter_config.cc index 5df20e80390be..f7417e2c9e7e3 100644 --- a/source/extensions/filters/http/dynamic_modules/filter_config.cc +++ b/source/extensions/filters/http/dynamic_modules/filter_config.cc @@ -105,6 +105,29 @@ absl::StatusOr newDynamicModuleHttpFilte "envoy_dynamic_module_on_http_filter_http_callout_done"); RETURN_IF_NOT_OK_REF(on_http_callout_done.status()); + auto on_http_stream_headers = + dynamic_module->getFunctionPointer( + "envoy_dynamic_module_on_http_filter_http_stream_headers"); + RETURN_IF_NOT_OK_REF(on_http_stream_headers.status()); + + auto on_http_stream_data = dynamic_module->getFunctionPointer( + "envoy_dynamic_module_on_http_filter_http_stream_data"); + RETURN_IF_NOT_OK_REF(on_http_stream_data.status()); + + auto on_http_stream_trailers = + dynamic_module->getFunctionPointer( + "envoy_dynamic_module_on_http_filter_http_stream_trailers"); + RETURN_IF_NOT_OK_REF(on_http_stream_trailers.status()); + + auto on_http_stream_complete = + dynamic_module->getFunctionPointer( + "envoy_dynamic_module_on_http_filter_http_stream_complete"); + RETURN_IF_NOT_OK_REF(on_http_stream_complete.status()); + + auto on_http_stream_reset = dynamic_module->getFunctionPointer( + "envoy_dynamic_module_on_http_filter_http_stream_reset"); + RETURN_IF_NOT_OK_REF(on_http_stream_reset.status()); + auto on_scheduled = dynamic_module->getFunctionPointer( "envoy_dynamic_module_on_http_filter_scheduled"); RETURN_IF_NOT_OK_REF(on_scheduled.status()); @@ -144,6 +167,11 @@ absl::StatusOr newDynamicModuleHttpFilte config->on_http_filter_stream_complete_ = on_filter_stream_complete.value(); config->on_http_filter_destroy_ = on_filter_destroy.value(); config->on_http_filter_http_callout_done_ = on_http_callout_done.value(); + config->on_http_filter_http_stream_headers_ = on_http_stream_headers.value(); + config->on_http_filter_http_stream_data_ = on_http_stream_data.value(); + config->on_http_filter_http_stream_trailers_ = on_http_stream_trailers.value(); + config->on_http_filter_http_stream_complete_ = on_http_stream_complete.value(); + config->on_http_filter_http_stream_reset_ = on_http_stream_reset.value(); config->on_http_filter_scheduled_ = on_scheduled.value(); config->on_http_filter_downstream_above_write_buffer_high_watermark_ = on_downstream_above_write_buffer_high_watermark.value(); diff --git a/source/extensions/filters/http/dynamic_modules/filter_config.h b/source/extensions/filters/http/dynamic_modules/filter_config.h index 30b26789326ed..b6844b8e40f11 100644 --- a/source/extensions/filters/http/dynamic_modules/filter_config.h +++ b/source/extensions/filters/http/dynamic_modules/filter_config.h @@ -35,6 +35,16 @@ using OnHttpFilterStreamCompleteType = using OnHttpFilterDestroyType = decltype(&envoy_dynamic_module_on_http_filter_destroy); using OnHttpFilterHttpCalloutDoneType = decltype(&envoy_dynamic_module_on_http_filter_http_callout_done); +using OnHttpFilterHttpStreamHeadersType = + decltype(&envoy_dynamic_module_on_http_filter_http_stream_headers); +using OnHttpFilterHttpStreamDataType = + decltype(&envoy_dynamic_module_on_http_filter_http_stream_data); +using OnHttpFilterHttpStreamTrailersType = + decltype(&envoy_dynamic_module_on_http_filter_http_stream_trailers); +using OnHttpFilterHttpStreamCompleteType = + decltype(&envoy_dynamic_module_on_http_filter_http_stream_complete); +using OnHttpFilterHttpStreamResetType = + decltype(&envoy_dynamic_module_on_http_filter_http_stream_reset); using OnHttpFilterScheduled = decltype(&envoy_dynamic_module_on_http_filter_scheduled); using OnHttpFilterDownstreamAboveWriteBufferHighWatermark = decltype(&envoy_dynamic_module_on_http_filter_downstream_above_write_buffer_high_watermark); @@ -79,6 +89,11 @@ class DynamicModuleHttpFilterConfig { OnHttpFilterStreamCompleteType on_http_filter_stream_complete_ = nullptr; OnHttpFilterDestroyType on_http_filter_destroy_ = nullptr; OnHttpFilterHttpCalloutDoneType on_http_filter_http_callout_done_ = nullptr; + OnHttpFilterHttpStreamHeadersType on_http_filter_http_stream_headers_ = nullptr; + OnHttpFilterHttpStreamDataType on_http_filter_http_stream_data_ = nullptr; + OnHttpFilterHttpStreamTrailersType on_http_filter_http_stream_trailers_ = nullptr; + OnHttpFilterHttpStreamCompleteType on_http_filter_http_stream_complete_ = nullptr; + OnHttpFilterHttpStreamResetType on_http_filter_http_stream_reset_ = nullptr; OnHttpFilterScheduled on_http_filter_scheduled_ = nullptr; OnHttpFilterDownstreamAboveWriteBufferHighWatermark on_http_filter_downstream_above_write_buffer_high_watermark_ = nullptr; diff --git a/test/coverage.yaml b/test/coverage.yaml index 2513f308f364d..e6a9e39904cca 100644 --- a/test/coverage.yaml +++ b/test/coverage.yaml @@ -40,6 +40,7 @@ directories: source/extensions/filters/common/lua: 95.6 source/extensions/filters/http/cache: 95.9 source/extensions/filters/http/dynamic_forward_proxy: 94.8 + source/extensions/filters/http/dynamic_modules: 95.6 source/extensions/filters/http/decompressor: 95.9 source/extensions/filters/http/ext_proc: 96.4 # might be flaky: be careful adjusting source/extensions/filters/http/grpc_json_reverse_transcoder: 94.8 diff --git a/test/extensions/dynamic_modules/http/filter_test.cc b/test/extensions/dynamic_modules/http/filter_test.cc index cc81f020ff936..bd8b9ec35a94f 100644 --- a/test/extensions/dynamic_modules/http/filter_test.cc +++ b/test/extensions/dynamic_modules/http/filter_test.cc @@ -1,8 +1,12 @@ +#include + #include "source/common/http/message_impl.h" #include "source/common/router/string_accessor_impl.h" +#include "source/extensions/dynamic_modules/abi.h" #include "source/extensions/filters/http/dynamic_modules/filter.h" #include "test/extensions/dynamic_modules/util.h" +#include "test/mocks/event/mocks.h" #include "test/mocks/http/mocks.h" #include "test/mocks/server/server_factory_context.h" #include "test/mocks/stats/mocks.h" @@ -140,12 +144,12 @@ TEST(DynamicModulesTest, StatsCallbacks) { false), (std::vector{1})); - Http::MockStreamDecoderFilterCallbacks decoder_callbacks; + NiceMock decoder_callbacks; StreamInfo::MockStreamInfo stream_info; EXPECT_CALL(decoder_callbacks, streamInfo()).WillRepeatedly(testing::ReturnRef(stream_info)); Http::MockDownstreamStreamFilterCallbacks downstream_callbacks; filter->setDecoderFilterCallbacks(decoder_callbacks); - Http::MockStreamEncoderFilterCallbacks encoder_callbacks; + NiceMock encoder_callbacks; filter->setEncoderFilterCallbacks(encoder_callbacks); std::initializer_list> headers = {{"header", "header_value"}}; @@ -220,7 +224,7 @@ TEST(DynamicModulesTest, HeaderCallbacks) { stats_store.symbolTable()); filter->initializeInModuleFilter(); - Http::MockStreamDecoderFilterCallbacks decoder_callbacks; + NiceMock decoder_callbacks; StreamInfo::MockStreamInfo stream_info; EXPECT_CALL(decoder_callbacks, streamInfo()).WillRepeatedly(testing::ReturnRef(stream_info)); Http::MockDownstreamStreamFilterCallbacks downstream_callbacks; @@ -229,7 +233,7 @@ TEST(DynamicModulesTest, HeaderCallbacks) { .WillOnce(testing::Return(OptRef(downstream_callbacks))); filter->setDecoderFilterCallbacks(decoder_callbacks); - Http::MockStreamEncoderFilterCallbacks encoder_callbacks; + NiceMock encoder_callbacks; filter->setEncoderFilterCallbacks(encoder_callbacks); NiceMock info; @@ -284,7 +288,7 @@ TEST(DynamicModulesTest, DynamicMetadataCallbacks) { filter->initializeInModuleFilter(); auto route = std::make_shared>(); - Http::MockStreamDecoderFilterCallbacks callbacks; + NiceMock callbacks; StreamInfo::MockStreamInfo stream_info; EXPECT_CALL(callbacks, streamInfo()).WillRepeatedly(testing::ReturnRef(stream_info)); EXPECT_CALL(callbacks, streamInfo()).WillRepeatedly(testing::ReturnRef(stream_info)); @@ -367,7 +371,7 @@ TEST(DynamicModulesTest, FilterStateCallbacks) { stats_scope->symbolTable()); filter->initializeInModuleFilter(); - Http::MockStreamDecoderFilterCallbacks callbacks; + NiceMock callbacks; StreamInfo::MockStreamInfo stream_info; EXPECT_CALL(callbacks, streamInfo()).WillRepeatedly(testing::ReturnRef(stream_info)); EXPECT_CALL(stream_info, filterState()) @@ -445,8 +449,8 @@ TEST(DynamicModulesTest, BodyCallbacks) { stats_scope->symbolTable()); filter->initializeInModuleFilter(); - Http::MockStreamDecoderFilterCallbacks decoder_callbacks; - Http::MockStreamEncoderFilterCallbacks encoder_callbacks; + NiceMock decoder_callbacks; + NiceMock encoder_callbacks; filter->setDecoderFilterCallbacks(decoder_callbacks); filter->setEncoderFilterCallbacks(encoder_callbacks); Buffer::OwnedImpl request_body; @@ -519,7 +523,7 @@ TEST(DynamicModulesTest, HttpFilterHttpCallout_non_existing_cluster) { context); EXPECT_TRUE(filter_config_or_status.ok()); - Http::MockStreamDecoderFilterCallbacks callbacks; + NiceMock callbacks; auto filter = std::make_shared(filter_config_or_status.value(), stats_scope->symbolTable()); filter->initializeInModuleFilter(); @@ -573,7 +577,7 @@ TEST(DynamicModulesTest, HttpFilterHttpCallout_immediate_failing_cluster) { return nullptr; })); - Http::MockStreamDecoderFilterCallbacks callbacks; + NiceMock callbacks; auto filter = std::make_shared(filter_config_or_status.value(), stats_scope->symbolTable()); filter->initializeInModuleFilter(); @@ -631,7 +635,7 @@ TEST(DynamicModulesTest, HttpFilterHttpCallout_success) { return &request; })); - Http::MockStreamDecoderFilterCallbacks callbacks; + NiceMock callbacks; auto filter = std::make_shared(filter_config_or_status.value(), stats_scope->symbolTable()); filter->initializeInModuleFilter(); @@ -828,6 +832,331 @@ TEST(HttpFilter, HeaderMapGetter) { EXPECT_EQ(response_trailers, filter.responseTrailers().value()); } +// Test sendStreamData on invalid stream handle returns false. +TEST(HttpFilter, SendStreamDataOnInvalidStream) { + Stats::SymbolTableImpl symbol_table; + DynamicModuleHttpFilter filter(nullptr, symbol_table); + + NiceMock decoder_callbacks; + filter.setDecoderFilterCallbacks(decoder_callbacks); + + // Try to send data on a non-existent stream (invalid handle). + Buffer::OwnedImpl data("test"); + void* invalid_handle = reinterpret_cast(0x12345678); + EXPECT_FALSE(filter.sendStreamData(invalid_handle, data, false)); +} + +// Test resetHttpStream on invalid stream handle. +TEST(HttpFilter, ResetInvalidStream) { + Stats::SymbolTableImpl symbol_table; + DynamicModuleHttpFilter filter(nullptr, symbol_table); + + NiceMock decoder_callbacks; + filter.setDecoderFilterCallbacks(decoder_callbacks); + + // Try to reset a non-existent stream (invalid handle). + void* invalid_handle = reinterpret_cast(0x12345678); + filter.resetHttpStream(invalid_handle); + // Should not crash - just no-op for invalid handle. +} + +// Test sendStreamTrailers on invalid stream handle. +TEST(HttpFilter, SendStreamTrailersOnInvalidStream) { + Stats::SymbolTableImpl symbol_table; + DynamicModuleHttpFilter filter(nullptr, symbol_table); + + NiceMock decoder_callbacks; + filter.setDecoderFilterCallbacks(decoder_callbacks); + + // Try to send trailers on a non-existent stream (invalid handle). + auto trailers = Http::RequestTrailerMapImpl::create(); + void* invalid_handle = reinterpret_cast(0x12345678); + EXPECT_FALSE(filter.sendStreamTrailers(invalid_handle, std::move(trailers))); +} + +TEST(DynamicModulesTest, HttpFilterHttpStreamCalloutOnComplete) { + auto dynamic_module = newDynamicModule(testSharedObjectPath("no_op", "c"), false); + EXPECT_TRUE(dynamic_module.ok()); + NiceMock context; + Stats::IsolatedStoreImpl stats_store; + auto stats_scope = stats_store.createScope(""); + + Upstream::MockClusterManager cluster_manager; + auto cluster = std::make_shared>(); + + EXPECT_CALL(cluster_manager, getThreadLocalCluster(_)) + .WillRepeatedly(testing::Return(cluster.get())); + + EXPECT_CALL(context, clusterManager()).WillRepeatedly(testing::ReturnRef(cluster_manager)); + + auto filter_config_or_status = + Envoy::Extensions::DynamicModules::HttpFilters::newDynamicModuleHttpFilterConfig( + "filter", "", false, std::move(dynamic_module.value()), *stats_scope, context); + EXPECT_TRUE(filter_config_or_status.ok()); + + auto filter = std::make_shared(filter_config_or_status.value(), + stats_scope->symbolTable()); + filter->initializeInModuleFilter(); + NiceMock decoder_callbacks; + filter->setDecoderFilterCallbacks(decoder_callbacks); + + // Mock AsyncClient + NiceMock stream; + Http::AsyncClient::StreamCallbacks* captured_callbacks = nullptr; + EXPECT_CALL(cluster->async_client_, start(_, _)) + .WillOnce(Invoke([&](Http::AsyncClient::StreamCallbacks& callbacks, + const Http::AsyncClient::StreamOptions&) -> Http::AsyncClient::Stream* { + captured_callbacks = &callbacks; + return &stream; + })); + + // Start Stream + envoy_dynamic_module_type_http_stream_envoy_ptr stream_ptr; + auto headers = std::make_unique( + std::initializer_list>{ + {":method", "GET"}, {":path", "/"}, {":authority", "host"}}); + auto message = std::make_unique(std::move(headers)); + auto result = filter->startHttpStream(&stream_ptr, "cluster", std::move(message), false, 1000); + EXPECT_EQ(result, envoy_dynamic_module_type_http_callout_init_result_Success); + EXPECT_NE(captured_callbacks, nullptr); + + // Invoke onComplete + if (captured_callbacks) { + captured_callbacks->onComplete(); + } +} + +TEST(DynamicModulesTest, StartHttpStreamDoesNotSetContentLength) { + auto dynamic_module = newDynamicModule(testSharedObjectPath("no_op", "c"), false); + EXPECT_TRUE(dynamic_module.ok()); + NiceMock context; + Stats::IsolatedStoreImpl stats_store; + auto stats_scope = stats_store.createScope(""); + + Upstream::MockClusterManager cluster_manager; + auto cluster = std::make_shared>(); + + EXPECT_CALL(cluster_manager, getThreadLocalCluster(_)) + .WillRepeatedly(testing::Return(cluster.get())); + + EXPECT_CALL(context, clusterManager()).WillRepeatedly(testing::ReturnRef(cluster_manager)); + + auto filter_config_or_status = + Envoy::Extensions::DynamicModules::HttpFilters::newDynamicModuleHttpFilterConfig( + "filter", "", false, std::move(dynamic_module.value()), *stats_scope, context); + EXPECT_TRUE(filter_config_or_status.ok()); + + auto filter = std::make_shared(filter_config_or_status.value(), + stats_scope->symbolTable()); + filter->initializeInModuleFilter(); + NiceMock decoder_callbacks; + filter->setDecoderFilterCallbacks(decoder_callbacks); + + // Mock AsyncClient + NiceMock stream; + Http::AsyncClient::StreamCallbacks* captured_callbacks = nullptr; + EXPECT_CALL(cluster->async_client_, start(_, _)) + .WillOnce(Invoke([&](Http::AsyncClient::StreamCallbacks& callbacks, + const Http::AsyncClient::StreamOptions&) -> Http::AsyncClient::Stream* { + captured_callbacks = &callbacks; + return &stream; + })); + + EXPECT_CALL(stream, sendHeaders(_, false)) + .WillOnce(Invoke([](Http::RequestHeaderMap& headers, bool) { + EXPECT_EQ(nullptr, headers.ContentLength()); + })); + EXPECT_CALL(stream, sendData(_, true)); + + envoy_dynamic_module_type_http_stream_envoy_ptr stream_ptr = nullptr; + char cluster_name[] = "cluster"; + char method_key[] = ":method"; + char method_value[] = "POST"; + char path_key[] = ":path"; + char path_value[] = "/"; + char authority_key[] = ":authority"; + char authority_value[] = "host"; + envoy_dynamic_module_type_module_http_header headers[] = { + {method_key, strlen(method_key), method_value, strlen(method_value)}, + {path_key, strlen(path_key), path_value, strlen(path_value)}, + {authority_key, strlen(authority_key), authority_value, strlen(authority_value)}}; + char body[] = "hello"; + + auto result = envoy_dynamic_module_callback_http_filter_start_http_stream( + filter.get(), &stream_ptr, cluster_name, strlen(cluster_name), headers, 3, body, + sizeof(body) - 1, true, 1000); + EXPECT_EQ(result, envoy_dynamic_module_type_http_callout_init_result_Success); + EXPECT_NE(captured_callbacks, nullptr); + EXPECT_NE(stream_ptr, nullptr); + if (captured_callbacks != nullptr) { + captured_callbacks->onComplete(); + } +} + +TEST(DynamicModulesTest, StartHttpStreamHandlesInlineResetDuringHeaders) { + auto dynamic_module = newDynamicModule(testSharedObjectPath("no_op", "c"), false); + EXPECT_TRUE(dynamic_module.ok()); + NiceMock context; + Stats::IsolatedStoreImpl stats_store; + auto stats_scope = stats_store.createScope(""); + + Upstream::MockClusterManager cluster_manager; + auto cluster = std::make_shared>(); + + EXPECT_CALL(cluster_manager, getThreadLocalCluster(_)) + .WillRepeatedly(testing::Return(cluster.get())); + + EXPECT_CALL(context, clusterManager()).WillRepeatedly(testing::ReturnRef(cluster_manager)); + + auto filter_config_or_status = + Envoy::Extensions::DynamicModules::HttpFilters::newDynamicModuleHttpFilterConfig( + "filter", "", false, std::move(dynamic_module.value()), *stats_scope, context); + EXPECT_TRUE(filter_config_or_status.ok()); + + auto filter = std::make_shared(filter_config_or_status.value(), + stats_scope->symbolTable()); + filter->initializeInModuleFilter(); + NiceMock decoder_callbacks; + filter->setDecoderFilterCallbacks(decoder_callbacks); + + // Mock AsyncClient + NiceMock stream; + Http::AsyncClient::StreamCallbacks* captured_callbacks = nullptr; + EXPECT_CALL(cluster->async_client_, start(_, _)) + .WillOnce(Invoke([&](Http::AsyncClient::StreamCallbacks& callbacks, + const Http::AsyncClient::StreamOptions&) -> Http::AsyncClient::Stream* { + captured_callbacks = &callbacks; + return &stream; + })); + + EXPECT_CALL(stream, sendHeaders(_, false)).WillOnce(Invoke([&](Http::RequestHeaderMap&, bool) { + ASSERT_NE(captured_callbacks, nullptr); + captured_callbacks->onReset(); + })); + EXPECT_CALL(stream, sendData(_, _)).Times(0); + + envoy_dynamic_module_type_http_stream_envoy_ptr stream_ptr; + auto headers = std::make_unique( + std::initializer_list>{ + {":method", "POST"}, {":path", "/"}, {":authority", "host"}}); + auto message = std::make_unique(std::move(headers)); + message->body().add(absl::string_view("payload")); + + auto result = filter->startHttpStream(&stream_ptr, "cluster", std::move(message), + true /* end_stream */, 1000); + EXPECT_EQ(result, envoy_dynamic_module_type_http_callout_init_result_Success); + EXPECT_NE(captured_callbacks, nullptr); +} + +TEST(DynamicModulesTest, HttpStreamCalloutDeferredDeleteOnDestroy) { + auto dynamic_module = newDynamicModule(testSharedObjectPath("no_op", "c"), false); + EXPECT_TRUE(dynamic_module.ok()); + NiceMock context; + Stats::IsolatedStoreImpl stats_store; + auto stats_scope = stats_store.createScope(""); + + Upstream::MockClusterManager cluster_manager; + auto cluster = std::make_shared>(); + + EXPECT_CALL(cluster_manager, getThreadLocalCluster(_)) + .WillRepeatedly(testing::Return(cluster.get())); + + EXPECT_CALL(context, clusterManager()).WillRepeatedly(testing::ReturnRef(cluster_manager)); + + auto filter_config_or_status = + Envoy::Extensions::DynamicModules::HttpFilters::newDynamicModuleHttpFilterConfig( + "filter", "", false, std::move(dynamic_module.value()), *stats_scope, context); + EXPECT_TRUE(filter_config_or_status.ok()); + + auto filter = std::make_shared(filter_config_or_status.value(), + stats_scope->symbolTable()); + filter->initializeInModuleFilter(); + + NiceMock dispatcher; + NiceMock decoder_callbacks; + EXPECT_CALL(decoder_callbacks, dispatcher()).WillRepeatedly(testing::ReturnRef(dispatcher)); + filter->setDecoderFilterCallbacks(decoder_callbacks); + + // Mock AsyncClient + NiceMock stream; + Http::AsyncClient::StreamCallbacks* captured_callbacks = nullptr; + EXPECT_CALL(cluster->async_client_, start(_, _)) + .WillOnce(Invoke([&](Http::AsyncClient::StreamCallbacks& callbacks, + const Http::AsyncClient::StreamOptions&) -> Http::AsyncClient::Stream* { + captured_callbacks = &callbacks; + return &stream; + })); + + envoy_dynamic_module_type_http_stream_envoy_ptr stream_ptr; + auto headers = std::make_unique( + std::initializer_list>{ + {":method", "GET"}, {":path", "/"}, {":authority", "host"}}); + auto message = std::make_unique(std::move(headers)); + auto result = filter->startHttpStream(&stream_ptr, "cluster", std::move(message), false, 1000); + EXPECT_EQ(result, envoy_dynamic_module_type_http_callout_init_result_Success); + ASSERT_NE(captured_callbacks, nullptr); + + EXPECT_CALL(dispatcher, deferredDelete_(_)); + filter->onDestroy(); + + // Upstream callbacks may still run after destroy; ensure they safely return. + captured_callbacks->onComplete(); + + dispatcher.clearDeferredDeleteList(); +} + +TEST(DynamicModulesTest, HttpFilterHttpStreamCalloutOnReset) { + auto dynamic_module = newDynamicModule(testSharedObjectPath("no_op", "c"), false); + EXPECT_TRUE(dynamic_module.ok()); + NiceMock context; + Stats::IsolatedStoreImpl stats_store; + auto stats_scope = stats_store.createScope(""); + + Upstream::MockClusterManager cluster_manager; + auto cluster = std::make_shared>(); + + EXPECT_CALL(cluster_manager, getThreadLocalCluster(_)) + .WillRepeatedly(testing::Return(cluster.get())); + + EXPECT_CALL(context, clusterManager()).WillRepeatedly(testing::ReturnRef(cluster_manager)); + + auto filter_config_or_status = + Envoy::Extensions::DynamicModules::HttpFilters::newDynamicModuleHttpFilterConfig( + "filter", "", false, std::move(dynamic_module.value()), *stats_scope, context); + EXPECT_TRUE(filter_config_or_status.ok()); + + auto filter = std::make_shared(filter_config_or_status.value(), + stats_scope->symbolTable()); + filter->initializeInModuleFilter(); + NiceMock decoder_callbacks; + filter->setDecoderFilterCallbacks(decoder_callbacks); + + // Mock AsyncClient + NiceMock stream; + Http::AsyncClient::StreamCallbacks* captured_callbacks = nullptr; + EXPECT_CALL(cluster->async_client_, start(_, _)) + .WillOnce(Invoke([&](Http::AsyncClient::StreamCallbacks& callbacks, + const Http::AsyncClient::StreamOptions&) -> Http::AsyncClient::Stream* { + captured_callbacks = &callbacks; + return &stream; + })); + + // Start Stream + envoy_dynamic_module_type_http_stream_envoy_ptr stream_ptr; + auto headers = std::make_unique( + std::initializer_list>{ + {":method", "GET"}, {":path", "/"}, {":authority", "host"}}); + auto message = std::make_unique(std::move(headers)); + auto result = filter->startHttpStream(&stream_ptr, "cluster", std::move(message), false, 1000); + EXPECT_EQ(result, envoy_dynamic_module_type_http_callout_init_result_Success); + EXPECT_NE(captured_callbacks, nullptr); + + // Invoke onReset + if (captured_callbacks) { + captured_callbacks->onReset(); + } +} + } // namespace HttpFilters } // namespace DynamicModules } // namespace Extensions diff --git a/test/extensions/dynamic_modules/http/integration_test.cc b/test/extensions/dynamic_modules/http/integration_test.cc index 06385e5c9d51d..2d288cc4f9dea 100644 --- a/test/extensions/dynamic_modules/http/integration_test.cc +++ b/test/extensions/dynamic_modules/http/integration_test.cc @@ -8,7 +8,9 @@ namespace Envoy { class DynamicModulesIntegrationTest : public testing::TestWithParam, public HttpIntegrationTest { public: - DynamicModulesIntegrationTest() : HttpIntegrationTest(Http::CodecType::HTTP2, GetParam()) {}; + DynamicModulesIntegrationTest() : HttpIntegrationTest(Http::CodecType::HTTP2, GetParam()) { + setUpstreamProtocol(Http::CodecType::HTTP2); + }; void initializeFilter(const std::string& filter_name, const std::string& config = "", @@ -711,4 +713,96 @@ TEST_P(DynamicModulesTerminalIntegrationTest, StreamingTerminalFilter) { EXPECT_EQ(below_watermark_count, 8); } +// Test basic HTTP stream callout. A GET request with streaming response. +TEST_P(DynamicModulesIntegrationTest, HttpStreamBasic) { + initializeFilter("http_stream_basic", "cluster_0"); + codec_client_ = makeHttpConnection(makeClientConnection((lookupPort("http")))); + + auto encoder_decoder = codec_client_->startRequest(default_request_headers_); + auto response = std::move(encoder_decoder.second); + waitForNextUpstreamRequest(); + + // Send response headers. + Http::TestRequestHeaderMapImpl response_headers{{":status", "200"}}; + upstream_request_->encodeHeaders(response_headers, false); + + // Send response body. + upstream_request_->encodeData("response_from_upstream", true); + + ASSERT_TRUE(response->waitForEndStream()); + EXPECT_TRUE(response->complete()); + EXPECT_EQ("200", response->headers().Status()->value().getStringView()); + EXPECT_EQ("stream_callout_success", response->body()); + EXPECT_EQ( + "basic", + response->headers().get(Http::LowerCaseString("x-stream-test"))[0]->value().getStringView()); +} + +// Test bidirectional HTTP stream callout. A POST request with streaming request and response. +TEST_P(DynamicModulesIntegrationTest, HttpStreamBidirectional) { + initializeFilter("http_stream_bidirectional", "cluster_0"); + codec_client_ = makeHttpConnection(makeClientConnection((lookupPort("http")))); + + auto encoder_decoder = codec_client_->startRequest(default_request_headers_); + auto response = std::move(encoder_decoder.second); + waitForNextUpstreamRequest(); + + // Verify the filter sent request data in chunks. + EXPECT_TRUE(upstream_request_->complete()); + std::string received_body = upstream_request_->body().toString(); + EXPECT_EQ("chunk1chunk2", received_body); + + // Verify trailers were sent. + EXPECT_TRUE(upstream_request_->trailers().get() != nullptr); + + // Send response with headers, data, and trailers. + Http::TestRequestHeaderMapImpl response_headers{{":status", "200"}}; + upstream_request_->encodeHeaders(response_headers, false); + upstream_request_->encodeData("chunk_a", false); + upstream_request_->encodeData("chunk_b", false); + Http::TestResponseTrailerMapImpl response_trailers{{"x-response-trailer", "value"}}; + upstream_request_->encodeTrailers(response_trailers); + + ASSERT_TRUE(response->waitForEndStream()); + EXPECT_TRUE(response->complete()); + EXPECT_EQ("200", response->headers().Status()->value().getStringView()); + EXPECT_EQ("bidirectional_success", response->body()); + EXPECT_EQ( + "bidirectional", + response->headers().get(Http::LowerCaseString("x-stream-test"))[0]->value().getStringView()); + EXPECT_EQ( + "2", + response->headers().get(Http::LowerCaseString("x-chunks-sent"))[0]->value().getStringView()); + // Should have received at least 1 data chunk. Due to buffering, the two chunks sent by the + // upstream may be coalesced into a single chunk by the time they reach the dynamic module. + EXPECT_GE(std::stoi(std::string(response->headers() + .get(Http::LowerCaseString("x-chunks-received"))[0] + ->value() + .getStringView())), + 1); +} + +// Test upstream reset logic. +TEST_P(DynamicModulesIntegrationTest, HttpStreamUpstreamReset) { + initializeFilter("upstream_reset", "cluster_0"); + codec_client_ = makeHttpConnection(makeClientConnection((lookupPort("http")))); + + auto encoder_decoder = codec_client_->startRequest(default_request_headers_); + auto response = std::move(encoder_decoder.second); + waitForNextUpstreamRequest(); + + // Send partial response and then reset from upstream to simulate mid-stream failure. + Http::TestResponseHeaderMapImpl response_headers{{":status", "200"}}; + upstream_request_->encodeHeaders(response_headers, false); + upstream_request_->encodeData("partial", false); + upstream_request_->encodeResetStream(); + + ASSERT_TRUE(response->waitForEndStream()); + EXPECT_TRUE(response->complete()); + EXPECT_EQ("200", response->headers().Status()->value().getStringView()); + EXPECT_EQ("upstream_reset", response->body()); + EXPECT_EQ("true", + response->headers().get(Http::LowerCaseString("x-reset"))[0]->value().getStringView()); +} + } // namespace Envoy diff --git a/test/extensions/dynamic_modules/test_data/c/no_op.c b/test/extensions/dynamic_modules/test_data/c/no_op.c index 7fb38d30d006c..863062f131fa1 100644 --- a/test/extensions/dynamic_modules/test_data/c/no_op.c +++ b/test/extensions/dynamic_modules/test_data/c/no_op.c @@ -103,3 +103,33 @@ void envoy_dynamic_module_on_http_filter_downstream_above_write_buffer_high_wate void envoy_dynamic_module_on_http_filter_downstream_below_write_buffer_low_watermark( envoy_dynamic_module_type_http_filter_envoy_ptr filter_envoy_ptr, envoy_dynamic_module_type_http_filter_module_ptr filter_module_ptr) {} + +void envoy_dynamic_module_on_http_filter_http_stream_headers( + envoy_dynamic_module_type_http_filter_envoy_ptr filter_envoy_ptr, + envoy_dynamic_module_type_http_filter_module_ptr filter_module_ptr, + envoy_dynamic_module_type_http_stream_envoy_ptr stream_handle, + envoy_dynamic_module_type_envoy_http_header* headers, size_t headers_size, + bool end_stream) {} + +void envoy_dynamic_module_on_http_filter_http_stream_data( + envoy_dynamic_module_type_http_filter_envoy_ptr filter_envoy_ptr, + envoy_dynamic_module_type_http_filter_module_ptr filter_module_ptr, + envoy_dynamic_module_type_http_stream_envoy_ptr stream_handle, + const envoy_dynamic_module_type_envoy_buffer* data, size_t data_count, bool end_stream) {} + +void envoy_dynamic_module_on_http_filter_http_stream_trailers( + envoy_dynamic_module_type_http_filter_envoy_ptr filter_envoy_ptr, + envoy_dynamic_module_type_http_filter_module_ptr filter_module_ptr, + envoy_dynamic_module_type_http_stream_envoy_ptr stream_handle, + envoy_dynamic_module_type_envoy_http_header* trailers, size_t trailers_size) {} + +void envoy_dynamic_module_on_http_filter_http_stream_complete( + envoy_dynamic_module_type_http_filter_envoy_ptr filter_envoy_ptr, + envoy_dynamic_module_type_http_filter_module_ptr filter_module_ptr, + envoy_dynamic_module_type_http_stream_envoy_ptr stream_handle) {} + +void envoy_dynamic_module_on_http_filter_http_stream_reset( + envoy_dynamic_module_type_http_filter_envoy_ptr filter_envoy_ptr, + envoy_dynamic_module_type_http_filter_module_ptr filter_module_ptr, + envoy_dynamic_module_type_http_stream_envoy_ptr stream_handle, + envoy_dynamic_module_type_http_stream_reset_reason reset_reason) {} diff --git a/test/extensions/dynamic_modules/test_data/rust/BUILD b/test/extensions/dynamic_modules/test_data/rust/BUILD index 79e9d82a9d678..d23b393db2e4d 100644 --- a/test/extensions/dynamic_modules/test_data/rust/BUILD +++ b/test/extensions/dynamic_modules/test_data/rust/BUILD @@ -18,3 +18,5 @@ test_program(name = "abi_version_mismatch") test_program(name = "http") test_program(name = "http_integration_test") + +test_program(name = "http_stream_callouts_test") diff --git a/test/extensions/dynamic_modules/test_data/rust/http_integration_test.rs b/test/extensions/dynamic_modules/test_data/rust/http_integration_test.rs index 4aabe4cdcebaf..c76a3122eee8c 100644 --- a/test/extensions/dynamic_modules/test_data/rust/http_integration_test.rs +++ b/test/extensions/dynamic_modules/test_data/rust/http_integration_test.rs @@ -68,6 +68,15 @@ fn new_http_filter_config_fn( })) }, "streaming_terminal_filter" => Some(Box::new(StreamingTerminalFilterConfig {})), + "http_stream_basic" => Some(Box::new(HttpStreamBasicConfig { + cluster_name: String::from_utf8(config.to_owned()).unwrap(), + })), + "http_stream_bidirectional" => Some(Box::new(HttpStreamBidirectionalConfig { + cluster_name: String::from_utf8(config.to_owned()).unwrap(), + })), + "upstream_reset" => Some(Box::new(UpstreamResetConfig { + cluster_name: String::from_utf8(config.to_owned()).unwrap(), + })), _ => panic!("Unknown filter name: {}", name), } } @@ -1191,3 +1200,366 @@ impl StreamingTerminalHttpFilter { self.large_response_bytes_sent += chunk_size; } } + +// ============================================================================= +// HTTP Stream Callouts Tests +// ============================================================================= + +// Basic HTTP Stream Test. A simple GET request with streaming response. +struct HttpStreamBasicConfig { + cluster_name: String, +} + +impl HttpFilterConfig for HttpStreamBasicConfig { + fn new_http_filter(&mut self, _envoy: &mut EHF) -> Box> { + Box::new(HttpStreamBasicFilter { + cluster_name: self.cluster_name.clone(), + stream_handle: std::ptr::null_mut(), + received_headers: false, + received_data: false, + stream_completed: false, + }) + } +} + +struct HttpStreamBasicFilter { + cluster_name: String, + stream_handle: envoy_dynamic_module_type_http_stream_envoy_ptr, + received_headers: bool, + received_data: bool, + stream_completed: bool, +} + +impl HttpFilter for HttpStreamBasicFilter { + fn on_request_headers( + &mut self, + envoy_filter: &mut EHF, + _end_of_stream: bool, + ) -> envoy_dynamic_module_type_on_http_filter_request_headers_status { + let (result, handle) = envoy_filter.start_http_stream( + &self.cluster_name, + vec![ + (":path", b"/"), + (":method", b"GET"), + ("host", b"example.com"), + ], + None, + true, // end_stream = true. + 5000, + ); + + if result != envoy_dynamic_module_type_http_callout_init_result::Success { + envoy_filter.send_response(500, vec![("x-error", b"stream_init_failed")], None, None); + return envoy_dynamic_module_type_on_http_filter_request_headers_status::StopIteration; + } + + self.stream_handle = handle; + + // For a GET request with no body, we need to end the request stream by sending empty data with + // end_stream = true. + let success = unsafe { envoy_filter.send_http_stream_data(handle, b"", true) }; + assert!(success); + + envoy_dynamic_module_type_on_http_filter_request_headers_status::StopIteration + } + + fn on_http_stream_headers( + &mut self, + _envoy_filter: &mut EHF, + stream_handle: envoy_dynamic_module_type_http_stream_envoy_ptr, + response_headers: &[(EnvoyBuffer, EnvoyBuffer)], + _end_stream: bool, + ) { + assert_eq!(stream_handle, self.stream_handle); + self.received_headers = true; + + let mut found_status = false; + for (name, value) in response_headers { + if name.as_slice() == b":status" { + assert_eq!(value.as_slice(), b"200"); + found_status = true; + break; + } + } + assert!(found_status); + } + + fn on_http_stream_data( + &mut self, + _envoy_filter: &mut EHF, + stream_handle: envoy_dynamic_module_type_http_stream_envoy_ptr, + _response_data: &[EnvoyBuffer], + _end_stream: bool, + ) { + assert_eq!(stream_handle, self.stream_handle); + self.received_data = true; + } + + fn on_http_stream_complete( + &mut self, + envoy_filter: &mut EHF, + stream_handle: envoy_dynamic_module_type_http_stream_envoy_ptr, + ) { + assert_eq!(stream_handle, self.stream_handle); + self.stream_completed = true; + + envoy_filter.send_response( + 200, + vec![("x-stream-test", b"basic")], + Some(b"stream_callout_success"), + None, + ); + } +} + +impl Drop for HttpStreamBasicFilter { + fn drop(&mut self) { + assert!(self.received_headers); + assert!(self.received_data); + assert!(self.stream_completed); + } +} + +// Bidirectional HTTP Stream Test. A POST request with streaming request and response. +struct HttpStreamBidirectionalConfig { + cluster_name: String, +} + +impl HttpFilterConfig for HttpStreamBidirectionalConfig { + fn new_http_filter(&mut self, _envoy: &mut EHF) -> Box> { + Box::new(HttpStreamBidirectionalFilter { + cluster_name: self.cluster_name.clone(), + stream_handle: std::ptr::null_mut(), + data_chunks_sent: 0, + trailers_sent: false, + received_headers: false, + data_chunks_received: 0, + received_trailers: false, + stream_completed: false, + }) + } +} + +struct HttpStreamBidirectionalFilter { + cluster_name: String, + stream_handle: envoy_dynamic_module_type_http_stream_envoy_ptr, + data_chunks_sent: usize, + trailers_sent: bool, + received_headers: bool, + data_chunks_received: usize, + received_trailers: bool, + stream_completed: bool, +} + +impl HttpFilter for HttpStreamBidirectionalFilter { + fn on_request_headers( + &mut self, + envoy_filter: &mut EHF, + _end_of_stream: bool, + ) -> envoy_dynamic_module_type_on_http_filter_request_headers_status { + let (result, handle) = envoy_filter.start_http_stream( + &self.cluster_name, + vec![ + (":path", b"/stream"), + (":method", b"POST"), + ("host", b"example.com"), + ], + None, + false, // end_stream = false. + 10000, + ); + + if result != envoy_dynamic_module_type_http_callout_init_result::Success { + envoy_filter.send_response(500, vec![("x-error", b"stream_init_failed")], None, None); + return envoy_dynamic_module_type_on_http_filter_request_headers_status::StopIteration; + } + + self.stream_handle = handle; + + // Send data chunks. + let success = unsafe { envoy_filter.send_http_stream_data(handle, b"chunk1", false) }; + assert!(success); + self.data_chunks_sent += 1; + + let success = unsafe { envoy_filter.send_http_stream_data(handle, b"chunk2", false) }; + assert!(success); + self.data_chunks_sent += 1; + + // Send trailers. + let success = unsafe { + envoy_filter.send_http_stream_trailers(handle, vec![("x-request-trailer", b"value")]) + }; + assert!(success); + self.trailers_sent = true; + + envoy_dynamic_module_type_on_http_filter_request_headers_status::StopIteration + } + + fn on_http_stream_headers( + &mut self, + _envoy_filter: &mut EHF, + stream_handle: envoy_dynamic_module_type_http_stream_envoy_ptr, + _response_headers: &[(EnvoyBuffer, EnvoyBuffer)], + _end_stream: bool, + ) { + assert_eq!(stream_handle, self.stream_handle); + self.received_headers = true; + } + + fn on_http_stream_data( + &mut self, + _envoy_filter: &mut EHF, + stream_handle: envoy_dynamic_module_type_http_stream_envoy_ptr, + _response_data: &[EnvoyBuffer], + _end_stream: bool, + ) { + assert_eq!(stream_handle, self.stream_handle); + self.data_chunks_received += 1; + } + + fn on_http_stream_trailers( + &mut self, + _envoy_filter: &mut EHF, + stream_handle: envoy_dynamic_module_type_http_stream_envoy_ptr, + _response_trailers: &[(EnvoyBuffer, EnvoyBuffer)], + ) { + assert_eq!(stream_handle, self.stream_handle); + self.received_trailers = true; + } + + fn on_http_stream_complete( + &mut self, + envoy_filter: &mut EHF, + stream_handle: envoy_dynamic_module_type_http_stream_envoy_ptr, + ) { + assert_eq!(stream_handle, self.stream_handle); + self.stream_completed = true; + + envoy_filter.send_response( + 200, + vec![ + ("x-stream-test", b"bidirectional"), + ( + "x-chunks-sent", + self.data_chunks_sent.to_string().as_bytes(), + ), + ( + "x-chunks-received", + self.data_chunks_received.to_string().as_bytes(), + ), + ], + Some(b"bidirectional_success"), + None, + ); + } +} + +impl Drop for HttpStreamBidirectionalFilter { + fn drop(&mut self) { + assert_eq!(self.data_chunks_sent, 2); + assert!(self.trailers_sent); + assert!(self.received_headers); + assert!(self.data_chunks_received > 0); + assert!(self.received_trailers); + assert!(self.stream_completed); + } +} + +struct UpstreamResetConfig { + cluster_name: String, +} + +impl HttpFilterConfig for UpstreamResetConfig { + fn new_http_filter(&mut self, _envoy_filter_config: &mut EHF) -> Box> { + Box::new(UpstreamResetFilter { + cluster_name: self.cluster_name.clone(), + stream_handle: std::ptr::null_mut(), + }) + } +} + +struct UpstreamResetFilter { + cluster_name: String, + stream_handle: envoy_dynamic_module_type_http_stream_envoy_ptr, +} + +impl HttpFilter for UpstreamResetFilter { + fn on_request_headers( + &mut self, + envoy_filter: &mut EHF, + _end_of_stream: bool, + ) -> envoy_dynamic_module_type_on_http_filter_request_headers_status { + // Start a stream that we expect to be reset by the upstream. + let (result, handle) = envoy_filter.start_http_stream( + &self.cluster_name, + vec![ + (":path", b"/reset"), + (":method", b"GET"), + ("host", b"example.com"), + ], + None, + true, + 5000, + ); + + if result != envoy_dynamic_module_type_http_callout_init_result::Success { + envoy_filter.send_response(500, vec![("x-error", b"stream_init_failed")], None, None); + return envoy_dynamic_module_type_on_http_filter_request_headers_status::StopIteration; + } + + self.stream_handle = handle; + envoy_dynamic_module_type_on_http_filter_request_headers_status::StopIteration + } + + fn on_http_stream_headers( + &mut self, + _envoy_filter: &mut EHF, + _stream_handle: envoy_dynamic_module_type_http_stream_envoy_ptr, + _headers: &[(EnvoyBuffer, EnvoyBuffer)], + _end_stream: bool, + ) { + // Not expected in this test. + } + + fn on_http_stream_data( + &mut self, + _envoy_filter: &mut EHF, + _stream_handle: envoy_dynamic_module_type_http_stream_envoy_ptr, + _data: &[EnvoyBuffer], + _end_stream: bool, + ) { + // Not expected in this test. + } + + fn on_http_stream_trailers( + &mut self, + _envoy_filter: &mut EHF, + _stream_handle: envoy_dynamic_module_type_http_stream_envoy_ptr, + _trailers: &[(EnvoyBuffer, EnvoyBuffer)], + ) { + // Not expected in this test. + } + + fn on_http_stream_complete( + &mut self, + _envoy_filter: &mut EHF, + _stream_handle: envoy_dynamic_module_type_http_stream_envoy_ptr, + ) { + // Not expected in this test (should get reset instead). + } + + fn on_http_stream_reset( + &mut self, + envoy_filter: &mut EHF, + stream_handle: envoy_dynamic_module_type_http_stream_envoy_ptr, + _reason: envoy_dynamic_module_type_http_stream_reset_reason, + ) { + assert_eq!(stream_handle, self.stream_handle); + envoy_filter.send_response( + 200, + vec![("x-reset", b"true")], + Some(b"upstream_reset"), + None, + ); + } +} diff --git a/test/extensions/dynamic_modules/test_data/rust/http_stream_callouts_test.rs b/test/extensions/dynamic_modules/test_data/rust/http_stream_callouts_test.rs new file mode 100644 index 0000000000000..5e089bc4d8c4e --- /dev/null +++ b/test/extensions/dynamic_modules/test_data/rust/http_stream_callouts_test.rs @@ -0,0 +1,475 @@ +use abi::*; +use envoy_proxy_dynamic_modules_rust_sdk::*; + +declare_init_functions!( + init, + new_http_filter_config_fn, + new_http_filter_per_route_config_fn +); + +fn init() -> bool { + true +} + +fn new_http_filter_config_fn( + _envoy_filter_config: &mut EC, + name: &str, + config: &[u8], +) -> Option>> { + match name { + "basic_stream_lifecycle" => Some(Box::new(BasicStreamLifecycleConfig { + cluster_name: String::from_utf8(config.to_owned()).unwrap(), + })), + "bidirectional_streaming" => Some(Box::new(BidirectionalStreamingConfig { + cluster_name: String::from_utf8(config.to_owned()).unwrap(), + })), + "multiple_streams" => Some(Box::new(MultipleStreamsConfig { + cluster_name: String::from_utf8(config.to_owned()).unwrap(), + })), + "stream_reset" => Some(Box::new(StreamResetConfig { + cluster_name: String::from_utf8(config.to_owned()).unwrap(), + })), + "upstream_reset" => Some(Box::new(UpstreamResetConfig { + cluster_name: String::from_utf8(config.to_owned()).unwrap(), + })), + _ => panic!("Unknown filter name: {}", name), + } +} + +fn new_http_filter_per_route_config_fn( + _name: &str, + _config: &[u8], +) -> Option> { + None +} + +// ============================================================================= +// Test 1: Basic Stream Lifecycle +// Tests: Start stream, receive headers/data, stream completes successfully. +// ============================================================================= + +struct BasicStreamLifecycleConfig { + cluster_name: String, +} + +impl HttpFilterConfig for BasicStreamLifecycleConfig { + fn new_http_filter(&mut self, _envoy_filter_config: &mut EHF) -> Box> { + Box::new(BasicStreamLifecycleFilter { + cluster_name: self.cluster_name.clone(), + received_response: false, + }) + } +} + +struct BasicStreamLifecycleFilter { + cluster_name: String, + received_response: bool, +} + +impl HttpFilter for BasicStreamLifecycleFilter { + fn on_request_headers( + &mut self, + envoy_filter: &mut EHF, + _end_of_stream: bool, + ) -> envoy_dynamic_module_type_on_http_filter_request_headers_status { + // Start an HTTP stream. + let (result, handle) = envoy_filter.start_http_stream( + &self.cluster_name, + vec![ + (":path", b"/test"), + (":method", b"GET"), + ("host", b"example.com"), + ], + None, // No body. + true, // end_stream = true. + 5000, // 5 second timeout. + ); + + if result != envoy_dynamic_module_type_http_callout_init_result::Success { + envoy_filter.send_response(500, vec![("x-error", b"stream_init_failed")], None, None); + return envoy_dynamic_module_type_on_http_filter_request_headers_status::StopIteration; + } + + // Store handle if needed (not storing in this simple test). + let _ = handle; + + envoy_dynamic_module_type_on_http_filter_request_headers_status::StopIteration + } + + fn on_http_stream_headers( + &mut self, + _envoy_filter: &mut EHF, + _stream_handle: envoy_dynamic_module_type_http_stream_envoy_ptr, + _headers: &[(EnvoyBuffer, EnvoyBuffer)], + _end_stream: bool, + ) { + // Process headers. + } + + fn on_http_stream_data( + &mut self, + _envoy_filter: &mut EHF, + _stream_handle: envoy_dynamic_module_type_http_stream_envoy_ptr, + _data: &[EnvoyBuffer], + _end_stream: bool, + ) { + // Process data. + } + + fn on_http_stream_complete( + &mut self, + envoy_filter: &mut EHF, + _stream_handle: envoy_dynamic_module_type_http_stream_envoy_ptr, + ) { + self.received_response = true; + envoy_filter.send_response( + 200, + vec![("x-stream", b"success")], + Some(b"stream_callout_success"), + None, + ); + } +} + +impl Drop for BasicStreamLifecycleFilter { + fn drop(&mut self) { + // Ensure we received the response. + assert!( + self.received_response, + "Stream did not complete successfully" + ); + } +} + +// ============================================================================= +// Test 2: Bidirectional Streaming +// Tests: Start stream, send data chunks, receive data chunks, send trailers. +// ============================================================================= + +struct BidirectionalStreamingConfig { + cluster_name: String, +} + +impl HttpFilterConfig for BidirectionalStreamingConfig { + fn new_http_filter(&mut self, _envoy_filter_config: &mut EHF) -> Box> { + Box::new(BidirectionalStreamingFilter { + cluster_name: self.cluster_name.clone(), + stream_handle: std::ptr::null_mut(), + chunks_received: 0, + }) + } +} + +struct BidirectionalStreamingFilter { + cluster_name: String, + stream_handle: envoy_dynamic_module_type_http_stream_envoy_ptr, + chunks_received: usize, +} + +impl HttpFilter for BidirectionalStreamingFilter { + fn on_request_headers( + &mut self, + envoy_filter: &mut EHF, + _end_of_stream: bool, + ) -> envoy_dynamic_module_type_on_http_filter_request_headers_status { + // Start an HTTP stream with POST method. + let (result, handle) = envoy_filter.start_http_stream( + &self.cluster_name, + vec![ + (":path", b"/stream"), + (":method", b"POST"), + ("host", b"example.com"), + ("content-type", b"application/octet-stream"), + ], + None, // No initial body - we'll stream it. + false, // end_stream = false. + 10000, + ); + + if result != envoy_dynamic_module_type_http_callout_init_result::Success { + envoy_filter.send_response(500, vec![("x-error", b"stream_init_failed")], None, None); + return envoy_dynamic_module_type_on_http_filter_request_headers_status::StopIteration; + } + + self.stream_handle = handle; + + // Send chunk 1. + let chunk1 = b"chunk1"; + let success = unsafe { envoy_filter.send_http_stream_data(handle, chunk1, false) }; + assert!(success); + + // Send chunk 2. + let chunk2 = b"chunk2"; + let success = unsafe { envoy_filter.send_http_stream_data(handle, chunk2, false) }; + assert!(success); + + // Send trailers to end the stream. + let trailers = vec![("x-trailer", b"value" as &[u8])]; + let success = unsafe { envoy_filter.send_http_stream_trailers(handle, trailers) }; + assert!(success); + + envoy_dynamic_module_type_on_http_filter_request_headers_status::StopIteration + } + + fn on_http_stream_data( + &mut self, + _envoy_filter: &mut EHF, + stream_handle: envoy_dynamic_module_type_http_stream_envoy_ptr, + _data: &[EnvoyBuffer], + _end_stream: bool, + ) { + assert_eq!(stream_handle, self.stream_handle); + self.chunks_received += 1; + } + + fn on_http_stream_complete( + &mut self, + envoy_filter: &mut EHF, + stream_handle: envoy_dynamic_module_type_http_stream_envoy_ptr, + ) { + assert_eq!(stream_handle, self.stream_handle); + let chunks_str = self.chunks_received.to_string(); + envoy_filter.send_response( + 200, + vec![("x-chunks-received", chunks_str.as_bytes())], + Some(b"bidirectional_success"), + None, + ); + } +} + +// ============================================================================= +// Test 3: Multiple Streams +// Tests: Start multiple streams concurrently. +// ============================================================================= + +struct MultipleStreamsConfig { + cluster_name: String, +} + +impl HttpFilterConfig for MultipleStreamsConfig { + fn new_http_filter(&mut self, _envoy_filter_config: &mut EHF) -> Box> { + Box::new(MultipleStreamsFilter { + cluster_name: self.cluster_name.clone(), + stream_handles: Vec::new(), + completed_streams: 0, + }) + } +} + +struct MultipleStreamsFilter { + cluster_name: String, + stream_handles: Vec, + completed_streams: usize, +} + +impl HttpFilter for MultipleStreamsFilter { + fn on_request_headers( + &mut self, + envoy_filter: &mut EHF, + _end_of_stream: bool, + ) -> envoy_dynamic_module_type_on_http_filter_request_headers_status { + // Create 3 concurrent streams. + for i in 1 ..= 3 { + let path = format!("/stream{}", i); + let (result, handle) = envoy_filter.start_http_stream( + &self.cluster_name, + vec![ + (":path", path.as_bytes()), + (":method", b"GET"), + ("host", b"example.com"), + ], + None, + true, // end_stream = true. + 5000, + ); + + if result == envoy_dynamic_module_type_http_callout_init_result::Success { + self.stream_handles.push(handle); + } + } + + if self.stream_handles.len() != 3 { + envoy_filter.send_response(500, vec![("x-error", b"stream_init_failed")], None, None); + } + + envoy_dynamic_module_type_on_http_filter_request_headers_status::StopIteration + } + + fn on_http_stream_complete( + &mut self, + envoy_filter: &mut EHF, + stream_handle: envoy_dynamic_module_type_http_stream_envoy_ptr, + ) { + assert!(self.stream_handles.contains(&stream_handle)); + self.completed_streams += 1; + + if self.completed_streams == 3 { + envoy_filter.send_response(200, vec![("x-stream", b"all_success")], None, None); + } + } +} + +// ============================================================================= +// Test 4: Stream Reset +// Tests: Reset an ongoing stream explicitly. +// ============================================================================= + +struct StreamResetConfig { + cluster_name: String, +} + +impl HttpFilterConfig for StreamResetConfig { + fn new_http_filter(&mut self, _envoy_filter_config: &mut EHF) -> Box> { + Box::new(StreamResetFilter { + cluster_name: self.cluster_name.clone(), + stream_handle: std::ptr::null_mut(), + received_headers: false, + reset_called: false, + }) + } +} + +struct StreamResetFilter { + cluster_name: String, + stream_handle: envoy_dynamic_module_type_http_stream_envoy_ptr, + received_headers: bool, + reset_called: bool, +} + +impl HttpFilter for StreamResetFilter { + fn on_request_headers( + &mut self, + envoy_filter: &mut EHF, + _end_of_stream: bool, + ) -> envoy_dynamic_module_type_on_http_filter_request_headers_status { + // Start a stream to a cluster that will be reset. + let (result, handle) = envoy_filter.start_http_stream( + &self.cluster_name, + vec![ + (":path", b"/slow"), + (":method", b"GET"), + ("host", b"example.com"), + ], + None, + true, // end_stream = true. + 5000, + ); + + if result != envoy_dynamic_module_type_http_callout_init_result::Success { + envoy_filter.send_response(500, vec![("x-error", b"stream_init_failed")], None, None); + return envoy_dynamic_module_type_on_http_filter_request_headers_status::StopIteration; + } + + self.stream_handle = handle; + envoy_dynamic_module_type_on_http_filter_request_headers_status::StopIteration + } + + fn on_http_stream_headers( + &mut self, + envoy_filter: &mut EHF, + stream_handle: envoy_dynamic_module_type_http_stream_envoy_ptr, + _headers: &[(EnvoyBuffer, EnvoyBuffer)], + _end_stream: bool, + ) { + assert_eq!(stream_handle, self.stream_handle); + self.received_headers = true; + + // Immediately reset the stream after receiving headers. + unsafe { + envoy_filter.reset_http_stream(stream_handle); + } + } + + fn on_http_stream_reset( + &mut self, + envoy_filter: &mut EHF, + stream_handle: envoy_dynamic_module_type_http_stream_envoy_ptr, + _reset_reason: envoy_dynamic_module_type_http_stream_reset_reason, + ) { + assert_eq!(stream_handle, self.stream_handle); + self.reset_called = true; + + // Send response indicating reset occurred. + envoy_filter.send_response( + 200, + vec![("x-stream", b"reset_ok")], + Some(b"stream_was_reset"), + None, + ); + } +} + +impl Drop for StreamResetFilter { + fn drop(&mut self) { + assert!(self.received_headers, "Never received headers before reset"); + assert!(self.reset_called, "Reset callback was not called"); + } +} + +// ============================================================================= +// Test 5: Upstream Reset +// Tests: Start stream, upstream resets connection, receive reset callback. +// ============================================================================= + +struct UpstreamResetConfig { + cluster_name: String, +} + +impl HttpFilterConfig for UpstreamResetConfig { + fn new_http_filter(&mut self, _envoy_filter_config: &mut EHF) -> Box> { + Box::new(UpstreamResetFilter { + cluster_name: self.cluster_name.clone(), + stream_handle: std::ptr::null_mut(), + }) + } +} + +struct UpstreamResetFilter { + cluster_name: String, + stream_handle: envoy_dynamic_module_type_http_stream_envoy_ptr, +} + +impl HttpFilter for UpstreamResetFilter { + fn on_request_headers( + &mut self, + envoy_filter: &mut EHF, + _end_of_stream: bool, + ) -> envoy_dynamic_module_type_on_http_filter_request_headers_status { + // Start a stream that we expect to be reset by the upstream. + let (result, handle) = envoy_filter.start_http_stream( + &self.cluster_name, + vec![ + (":path", b"/reset"), + (":method", b"GET"), + ("host", b"example.com"), + ], + None, + true, + 5000, + ); + + if result != envoy_dynamic_module_type_http_callout_init_result::Success { + envoy_filter.send_response(500, vec![("x-error", b"stream_init_failed")], None, None); + return envoy_dynamic_module_type_on_http_filter_request_headers_status::StopIteration; + } + + self.stream_handle = handle; + envoy_dynamic_module_type_on_http_filter_request_headers_status::StopIteration + } + + fn on_http_stream_reset( + &mut self, + envoy_filter: &mut EHF, + stream_handle: envoy_dynamic_module_type_http_stream_envoy_ptr, + _reason: envoy_dynamic_module_type_http_stream_reset_reason, + ) { + assert_eq!(stream_handle, self.stream_handle); + envoy_filter.send_response( + 200, + vec![("x-reset", b"true")], + Some(b"upstream_reset"), + None, + ); + } +} diff --git a/tools/spelling/spelling_dictionary.txt b/tools/spelling/spelling_dictionary.txt index 83412905d97ba..8497708345983 100644 --- a/tools/spelling/spelling_dictionary.txt +++ b/tools/spelling/spelling_dictionary.txt @@ -1596,6 +1596,9 @@ vhosts infos ElastiCache pinterest +callouts +streamable +extern NSS SSLKEYLOGFILE DLB