Skip to content

Commit 4677caa

Browse files
committed
Merge remote-tracking branch 'fs/main' into streaming-storage
# Conflicts: # Makefile # src/runtime/streaming/operators/grouping/incremental_aggregate.rs
2 parents de248a7 + 8c338d8 commit 4677caa

25 files changed

Lines changed: 2148 additions & 7 deletions
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
# Licensed under the Apache License, Version 2.0 (the "License");
2+
# you may not use this file except in compliance with the License.
3+
# You may obtain a copy of the License at
4+
#
5+
# http://www.apache.org/licenses/LICENSE-2.0
6+
#
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
name: Integration Tests
14+
15+
on:
16+
push:
17+
branches: ["main", "master"]
18+
pull_request:
19+
branches: ["main", "master"]
20+
workflow_dispatch:
21+
22+
env:
23+
CARGO_TERM_COLOR: always
24+
25+
jobs:
26+
integration-test:
27+
name: Integration Tests
28+
runs-on: ubuntu-latest
29+
timeout-minutes: 180
30+
31+
steps:
32+
- name: Checkout Source
33+
uses: actions/checkout@v4
34+
35+
- name: Verify Docker
36+
run: docker version
37+
38+
- name: Setup Python
39+
uses: actions/setup-python@v5
40+
with:
41+
python-version: "3.11"
42+
cache: "pip"
43+
44+
- name: Setup Rust
45+
uses: dtolnay/rust-toolchain@stable
46+
47+
- name: Install System Dependencies
48+
run: |
49+
sudo apt-get update
50+
sudo apt-get install -y --no-install-recommends \
51+
cmake \
52+
libssl-dev \
53+
libcurl4-openssl-dev \
54+
pkg-config \
55+
libsasl2-dev \
56+
protobuf-compiler
57+
58+
- name: Cache Cargo
59+
uses: Swatinem/rust-cache@v2
60+
61+
- name: Build Release Binary
62+
run: make env && make dist
63+
64+
- name: Pre-pull Kafka Image
65+
run: docker pull apache/kafka:3.7.0
66+
67+
- name: Run Integration Tests
68+
run: make integration-test
69+
70+
- name: Upload Test Logs
71+
if: failure()
72+
uses: actions/upload-artifact@v4
73+
with:
74+
name: integration-test-logs
75+
path: tests/integration/target/**/logs/
76+
retention-days: 30

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,8 @@ python/**/target/
3333
**/**.egg-info
3434
.cache/
3535
/.cursor/worktrees.json
36+
37+
# Integration test output
38+
tests/integration/target/
39+
tests/integration/.venv/
40+
tests/integration/install

Makefile

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ C_0 := \033[0m
7979
log = @printf "$(C_B)[-]$(C_0) %-15s %s\n" "$(1)" "$(2)"
8080
success = @printf "$(C_G)[✔]$(C_0) %s\n" "$(1)"
8181

82-
.PHONY: all help build build-lite dist dist-lite clean test env env-clean go-sdk-env go-sdk-build go-sdk-clean docker docker-run docker-push .check-env .ensure-target .build-wasm
82+
.PHONY: all help build build-lite dist dist-lite clean test env env-clean go-sdk-env go-sdk-build go-sdk-clean docker docker-run docker-push .check-env .build-wasm integration-test
8383

8484
all: build
8585

@@ -100,6 +100,8 @@ help:
100100
@echo " docker-run Run container (port 8080, mount logs)"
101101
@echo " docker-push Push image to registry"
102102
@echo ""
103+
@echo " integration-test Run integration tests (delegates to tests/integration)"
104+
@echo ""
103105
@echo " Version: $(VERSION) | Arch: $(ARCH) | OS: $(OS)"
104106

105107
# 4. Auto-install missing Rust target toolchain
@@ -206,6 +208,7 @@ clean:
206208
@cargo clean
207209
@rm -rf $(DIST_ROOT) data logs
208210
@./scripts/clean.sh 2>/dev/null || true
211+
@$(MAKE) -C tests/integration clean 2>/dev/null || true
209212
$(call success,Done)
210213

211214
.check-env:
@@ -228,3 +231,6 @@ docker-push:
228231
$(call log,DOCKER,Pushing $(IMAGE_NAME))
229232
@docker push $(IMAGE_NAME)
230233
$(call success,Push Complete)
234+
235+
integration-test:
236+
@$(MAKE) -C tests/integration test PYTEST_ARGS="$(PYTEST_ARGS)"

src/runtime/wasm/processor/wasm/wasm_task.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -392,10 +392,27 @@ impl WasmTask {
392392
) -> ControlAction {
393393
match signal {
394394
TaskControlSignal::Start { completion_flag } => {
395-
for input in inputs.iter_mut() {
396-
let _ = input.start();
395+
for (idx, input) in inputs.iter_mut().enumerate() {
396+
if let Err(e) = input.start() {
397+
let msg = format!("Failed to start input {}: {}", idx, e);
398+
log::error!("{}", msg);
399+
*failure_cause.lock().unwrap() = Some(msg.clone());
400+
*shared_state.lock().unwrap() =
401+
ComponentState::Error { error: msg.clone() };
402+
*execution_state.lock().unwrap() = ExecutionState::Failed;
403+
completion_flag.mark_error(msg);
404+
return ControlAction::Pause;
405+
}
406+
}
407+
if let Err(e) = processor.start_outputs() {
408+
let msg = format!("Failed to start outputs: {}", e);
409+
log::error!("{}", msg);
410+
*failure_cause.lock().unwrap() = Some(msg.clone());
411+
*shared_state.lock().unwrap() = ComponentState::Error { error: msg.clone() };
412+
*execution_state.lock().unwrap() = ExecutionState::Failed;
413+
completion_flag.mark_error(msg);
414+
return ControlAction::Pause;
397415
}
398-
let _ = processor.start_outputs();
399416
*state = TaskState::Running;
400417
*shared_state.lock().unwrap() = ComponentState::Running;
401418
completion_flag.mark_completed();

src/server/handler.rs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,25 @@ impl FunctionStreamServiceImpl {
9191
}
9292
}
9393

94+
fn classify_error(message: &str) -> StatusCode {
95+
let lower = message.to_lowercase();
96+
if lower.contains("not found") || lower.contains("not exist") {
97+
StatusCode::NotFound
98+
} else if lower.contains("uniqueness violation")
99+
|| lower.contains("already exists")
100+
|| lower.contains("duplicate")
101+
{
102+
StatusCode::Conflict
103+
} else if lower.contains("invalid")
104+
|| lower.contains("unsupported")
105+
|| lower.contains("missing")
106+
{
107+
StatusCode::BadRequest
108+
} else {
109+
StatusCode::InternalServerError
110+
}
111+
}
112+
94113
async fn execute_statement(
95114
&self,
96115
stmt: &dyn Statement,
@@ -101,7 +120,8 @@ impl FunctionStreamServiceImpl {
101120
if result.success {
102121
Self::build_success_response(success_status, result.message, result.data)
103122
} else {
104-
Self::build_error_response(StatusCode::InternalServerError, result.message)
123+
let status = Self::classify_error(&result.message);
124+
Self::build_error_response(status, result.message)
105125
}
106126
}
107127
}
@@ -139,8 +159,9 @@ impl FunctionStreamService for FunctionStreamServiceImpl {
139159

140160
if !result.success {
141161
error!("SQL execution aborted: {}", result.message);
162+
let status = Self::classify_error(&result.message);
142163
return Ok(TonicResponse::new(Self::build_error_response(
143-
StatusCode::InternalServerError,
164+
status,
144165
result.message,
145166
)));
146167
}
@@ -235,8 +256,9 @@ impl FunctionStreamService for FunctionStreamServiceImpl {
235256

236257
if !result.success {
237258
error!("show_functions execution failed: {}", result.message);
259+
let status = Self::classify_error(&result.message);
238260
return Ok(TonicResponse::new(ShowFunctionsResponse {
239-
status_code: StatusCode::InternalServerError as i32,
261+
status_code: status as i32,
240262
message: result.message,
241263
functions: vec![],
242264
}));

tests/integration/Makefile

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# Licensed under the Apache License, Version 2.0 (the "License");
2+
# you may not use this file except in compliance with the License.
3+
# You may obtain a copy of the License at
4+
#
5+
# http://www.apache.org/licenses/LICENSE-2.0
6+
#
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
# -----------------------------------------------------------------------
14+
# Integration Test Makefile
15+
# -----------------------------------------------------------------------
16+
# Usage:
17+
# make test — Setup env + run pytest (PYTEST_ARGS="-k xxx")
18+
# make clean — Remove .venv and test output
19+
#
20+
# Prerequisites:
21+
# The FunctionStream binary must already be built (make build / make build-lite
22+
# from the project root).
23+
# -----------------------------------------------------------------------
24+
25+
PROJECT_ROOT := $(shell git -C $(CURDIR) rev-parse --show-toplevel)
26+
PYTHON_ROOT := $(PROJECT_ROOT)/python
27+
VENV := $(CURDIR)/.venv
28+
PIP := $(VENV)/bin/pip
29+
PY := $(VENV)/bin/python
30+
31+
C_G := \033[0;32m
32+
C_B := \033[0;34m
33+
C_0 := \033[0m
34+
35+
log = @printf "$(C_B)[-]$(C_0) %-12s %s\n" "$(1)" "$(2)"
36+
success = @printf "$(C_G)[✔]$(C_0) %s\n" "$(1)"
37+
38+
.PHONY: test clean help
39+
40+
help:
41+
@echo "Integration Test Targets:"
42+
@echo ""
43+
@echo " test Setup Python env + run pytest (PYTEST_ARGS=...)"
44+
@echo " clean Remove .venv and target/tests output"
45+
46+
install: requirements.txt $(PYTHON_ROOT)/functionstream-api/pyproject.toml $(PYTHON_ROOT)/functionstream-client/pyproject.toml
47+
$(call log,ENV,Setting up Python virtual environment)
48+
@test -d $(VENV) || python3 -m venv $(VENV)
49+
@$(PIP) install --quiet --upgrade pip
50+
@$(PIP) install --quiet -r requirements.txt
51+
@touch $@
52+
$(call success,Python environment ready)
53+
54+
test: install
55+
$(call log,TEST,Running integration tests)
56+
@$(PY) -m pytest -v $(PYTEST_ARGS)
57+
$(call success,All integration tests passed)
58+
59+
clean:
60+
$(call log,CLEAN,Removing test artifacts)
61+
@rm -rf $(VENV)
62+
@rm -rf $(CURDIR)/target
63+
@rm -rf $(CURDIR)/install
64+
$(call success,Clean complete)

tests/integration/README.md

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
# Integration Tests
2+
3+
## Prerequisites
4+
5+
| Dependency | Version | Purpose |
6+
|------------|----------|-------------------------------------------------|
7+
| Python | >= 3.9 | Test framework runtime |
8+
| Rust | stable | Build the FunctionStream binary |
9+
| Docker | >= 20.10 | Run a Kafka broker for streaming integration tests |
10+
11+
> **Docker is required.** The test framework automatically pulls and manages
12+
> an `apache/kafka:3.7.0` container in KRaft mode to provide a real Kafka
13+
> broker for tests that involve Kafka input/output. Tests will fail if the
14+
> Docker daemon is not running.
15+
16+
## Quick Start
17+
18+
```bash
19+
# From the project root
20+
make build # Build the release binary (with --features python)
21+
make integration-test
22+
23+
# Or run directly from this directory
24+
cd tests/integration
25+
make test
26+
```
27+
28+
## Directory Layout
29+
30+
```
31+
tests/integration/
32+
├── Makefile # test / clean targets
33+
├── requirements.txt # Python dependencies (pytest, grpcio, docker, ...)
34+
├── pytest.ini # Pytest configuration
35+
├── framework/ # Reusable test infrastructure
36+
│ ├── instance.py # FunctionStreamInstance facade
37+
│ ├── workspace.py # Per-test directory management
38+
│ ├── config.py # Server config generation
39+
│ ├── process.py # OS process lifecycle (start/stop/kill)
40+
│ ├── utils.py # Port allocation, readiness probes
41+
│ └── kafka_manager.py # Docker-managed Kafka broker (KRaft mode)
42+
├── test/ # Test suites
43+
│ ├── wasm/ # WASM function tests
44+
│ │ └── python_sdk/ # Python SDK integration tests
45+
│ └── streaming/ # Streaming engine tests (future)
46+
└── target/ # Test output (git-ignored)
47+
├── .shared_cache/ # Shared WASM compilation cache across tests
48+
└── <suite>/<class>/<method>/<timestamp>/logs/
49+
```
50+
51+
## Test Output
52+
53+
Each test gets an isolated server instance with its own log directory:
54+
55+
```
56+
target/wasm/python_sdk/TestFunctionLifecycle/test_full_lifecycle_transitions/20260416_221655/
57+
logs/
58+
app.log # FunctionStream application log
59+
stdout.log # Server stdout
60+
stderr.log # Server stderr
61+
```
62+
63+
Only `logs/` is retained after tests complete; `conf/` and `data/` are
64+
automatically cleaned up.
65+
66+
## Python Dependencies
67+
68+
All Python packages are listed in `requirements.txt` and installed
69+
automatically by `make test`. Key dependencies:
70+
71+
- `pytest` — test runner
72+
- `grpcio` / `protobuf` — gRPC client communication
73+
- `docker` — Docker SDK for managing the Kafka container
74+
- `confluent-kafka` — Kafka admin client for topic management
75+
- `functionstream-api` / `functionstream-client` — local editable installs
76+
77+
## Running Specific Tests
78+
79+
```bash
80+
# Single test
81+
make test PYTEST_ARGS="-k test_full_lifecycle_transitions"
82+
83+
# Single file
84+
make test PYTEST_ARGS="test/wasm/python_sdk/test_lifecycle.py"
85+
86+
# Verbose with live log
87+
make test PYTEST_ARGS="-v --log-cli-level=DEBUG"
88+
```
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# Licensed under the Apache License, Version 2.0 (the "License");
2+
# you may not use this file except in compliance with the License.
3+
# You may obtain a copy of the License at
4+
#
5+
# http://www.apache.org/licenses/LICENSE-2.0
6+
#
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
from .instance import FunctionStreamInstance
14+
15+
__all__ = ["FunctionStreamInstance", "KafkaDockerManager"]
16+
17+
18+
def __getattr__(name: str):
19+
if name == "KafkaDockerManager":
20+
from .kafka_manager import KafkaDockerManager
21+
return KafkaDockerManager
22+
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

0 commit comments

Comments
 (0)