Skip to content

Commit 394ac11

Browse files
committed
fix connection handling for non-TypeDB HTTP2 requests
1 parent aa591d3 commit 394ac11

3 files changed

Lines changed: 58 additions & 15 deletions

File tree

typedb/Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ VENV_BIN = python3 -m venv
22
VENV_DIR ?= .venv
33
VENV_ACTIVATE = $(VENV_DIR)/bin/activate
44
VENV_RUN = . $(VENV_ACTIVATE)
5+
TEST_PATH ?= tests
56

67
usage: ## Shows usage for this Makefile
78
@cat Makefile | grep -E '^[a-zA-Z_-]+:.*?## .*$$' | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-15s\033[0m %s\n", $$1, $$2}'
@@ -39,7 +40,7 @@ lint: ## Run ruff to lint the codebase
3940
$(VENV_RUN); python -m ruff check --output-format=full .
4041

4142
test: ## Run integration tests (requires LocalStack running with the Extension installed)
42-
$(VENV_RUN); pytest tests $(PYTEST_ARGS)
43+
$(VENV_RUN); pytest $(PYTEST_ARGS) $(TEST_PATH)
4344

4445
clean-dist: clean
4546
rm -rf dist/

typedb/localstack_typedb/utils/h2_proxy.py

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -70,14 +70,18 @@ class ForwardingBuffer:
7070
to the backend, or leave it to the default handler.
7171
"""
7272

73+
backend: TcpForwarder
74+
buffer: list
75+
proxying: bool | None
76+
7377
def __init__(self, http_response_stream):
7478
self.http_response_stream = http_response_stream
7579
LOG.debug(
7680
f"Starting TCP forwarder to port {target_port} for new HTTP2 connection"
7781
)
7882
self.backend = TcpForwarder(target_port, host=target_host)
7983
self.buffer = []
80-
self.proxying = False
84+
self.proxying = None
8185
reactor.getThreadPool().callInThread(
8286
self.backend.receive_loop, self.received_from_backend
8387
)
@@ -86,23 +90,36 @@ def received_from_backend(self, data):
8690
LOG.debug(f"Received {len(data)} bytes from backend")
8791
self.http_response_stream.write(data)
8892

89-
def received_from_http2_client(self, data, default_handler):
93+
def received_from_http2_client(self, data, default_handler: Callable):
94+
if self.proxying is False:
95+
# Note: Return here only if `proxying` is `False` (a value of `None` indicates
96+
# that the headers have not fully been received yet)
97+
return default_handler(data)
98+
9099
if self.proxying:
91100
assert not self.buffer
92101
# Keep sending data to the backend for the lifetime of this connection
93102
self.backend.send(data)
94-
else:
95-
self.buffer.append(data)
96-
if headers := get_headers_from_data_stream(self.buffer):
97-
self.proxying = should_proxy_request(headers)
98-
# Now we know what to do with the buffer
99-
buffered_data = b"".join(self.buffer)
100-
self.buffer = []
101-
if self.proxying:
102-
LOG.debug(f"Forwarding {len(buffered_data)} bytes to backend")
103-
self.backend.send(buffered_data)
104-
else:
105-
return default_handler(buffered_data)
103+
return
104+
105+
self.buffer.append(data)
106+
107+
if not (headers := get_headers_from_data_stream(self.buffer)):
108+
# If no headers received yet, then return (method will be called again for next chunk of data)
109+
return
110+
111+
self.proxying = should_proxy_request(headers)
112+
113+
buffered_data = b"".join(self.buffer)
114+
self.buffer = []
115+
116+
if not self.proxying:
117+
# if this is not a target request, then call the default handler
118+
default_handler(buffered_data)
119+
return
120+
121+
LOG.debug(f"Forwarding {len(buffered_data)} bytes to backend")
122+
self.backend.send(buffered_data)
106123

107124
def close(self):
108125
self.backend.close()

typedb/tests/test_extension.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import requests
2+
import httpx
23
from localstack.utils.strings import short_uid
34
from localstack_typedb.utils.h2_proxy import (
45
get_frames_from_http2_stream,
@@ -75,6 +76,30 @@ def test_connect_to_db_via_grpc_endpoint():
7576
assert len(results) == 2
7677

7778

79+
def test_connect_to_h2_endpoint_non_typedb():
80+
url = "https://s3.localhost.localstack.cloud:4566/"
81+
82+
# make an HTTP/2 request to the LocalStack health endpoint
83+
with httpx.Client(http2=True, verify=False, trust_env=False) as client:
84+
health_url = f"{url}/_localstack/health"
85+
response = client.get(health_url)
86+
87+
assert response.status_code == 200
88+
assert response.http_version == "HTTP/2"
89+
assert '"services":' in response.text
90+
91+
# make an HTTP/2 request to a LocalStack endpoint outside the extension (S3 list buckets)
92+
headers = {
93+
"Authorization": "AWS4-HMAC-SHA256 Credential=000000000000/20250101/us-east-1/s3/aws4_request, ..."
94+
}
95+
with httpx.Client(http2=True, verify=False, trust_env=False) as client:
96+
response = client.get(url, headers=headers)
97+
98+
assert response.status_code == 200
99+
assert response.http_version == "HTTP/2"
100+
assert "<ListAllMyBucketsResult" in response.text
101+
102+
78103
def test_get_frames_from_http2_stream():
79104
# note: the data below is a dump taken from a browser request made against the emulator
80105
data = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n\x00\x00\x18\x04\x00\x00\x00\x00\x00\x00\x01\x00\x01\x00\x00\x00\x02\x00\x00\x00\x00\x00\x04\x00\x02\x00\x00\x00\x05\x00\x00@\x00\x00\x00\x04\x08\x00\x00\x00\x00\x00\x00\xbf\x00\x01"

0 commit comments

Comments
 (0)