Skip to content

Commit 366a5da

Browse files
committed
feat(test): add streaming SQL and WASM function integration tests
Add two test subdirectories under tests/integration/test/: - streaming/: SQL DDL tests (CREATE/DROP/SHOW TABLE, catalog persistence across restarts) - wasm/: WASM function lifecycle tests (list, drop, start, stop) and Python function creation error handling Made-with: Cursor
1 parent cfef9e9 commit 366a5da

6 files changed

Lines changed: 299 additions & 0 deletions

File tree

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# Licensed under the Apache License, Version 2.0 (the "License");
2+
# you may not use this file except in compliance with the License.
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
# Licensed under the Apache License, Version 2.0 (the "License");
2+
# you may not use this file except in compliance with the License.
3+
4+
"""
5+
Catalog persistence tests: verify that table metadata survives
6+
a server restart when stream_catalog.persist is enabled.
7+
"""
8+
9+
10+
class TestCatalogPersistence:
11+
12+
def test_table_survives_restart(self, fs_instance):
13+
"""A table created before restart is still visible after restart."""
14+
fs_instance.configure(**{"stream_catalog.persist": True}).start()
15+
16+
fs_instance.execute_sql("""
17+
CREATE TABLE persistent_tbl (
18+
id BIGINT,
19+
ts TIMESTAMP,
20+
WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
21+
) WITH (
22+
'connector' = 'kafka',
23+
'topic' = 'persist-topic',
24+
'bootstrap.servers' = 'localhost:9092',
25+
'format' = 'json'
26+
)
27+
""")
28+
29+
fs_instance.restart()
30+
31+
resp = fs_instance.execute_sql("SHOW TABLES")
32+
assert resp.status_code < 400
33+
34+
def test_dropped_table_gone_after_restart(self, fs_instance):
35+
"""A table that was dropped should not reappear after restart."""
36+
fs_instance.configure(**{"stream_catalog.persist": True}).start()
37+
38+
fs_instance.execute_sql("""
39+
CREATE TABLE temp_tbl (
40+
id BIGINT
41+
) WITH (
42+
'connector' = 'kafka',
43+
'topic' = 'temp-topic',
44+
'bootstrap.servers' = 'localhost:9092',
45+
'format' = 'json'
46+
)
47+
""")
48+
fs_instance.execute_sql("DROP TABLE temp_tbl")
49+
50+
fs_instance.restart()
51+
52+
resp = fs_instance.execute_sql("SHOW CREATE TABLE temp_tbl")
53+
assert resp.status_code >= 400
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
# Licensed under the Apache License, Version 2.0 (the "License");
2+
# you may not use this file except in compliance with the License.
3+
4+
"""
5+
Streaming SQL DDL tests: CREATE TABLE, DROP TABLE, SHOW TABLES,
6+
SHOW CREATE TABLE, CREATE STREAMING TABLE.
7+
"""
8+
9+
10+
class TestShowTables:
11+
12+
def test_show_tables_empty(self, fs_instance):
13+
"""SHOW TABLES returns success on a fresh instance."""
14+
fs_instance.start()
15+
resp = fs_instance.execute_sql("SHOW TABLES")
16+
assert resp.status_code < 400
17+
18+
def test_show_streaming_tables_empty(self, fs_instance):
19+
"""SHOW STREAMING TABLES returns success on a fresh instance."""
20+
fs_instance.start()
21+
resp = fs_instance.execute_sql("SHOW STREAMING TABLES")
22+
assert resp.status_code < 400
23+
24+
25+
class TestCreateTable:
26+
27+
def test_create_source_table(self, fs_instance):
28+
"""CREATE TABLE with connector options registers a source table."""
29+
fs_instance.start()
30+
31+
create_sql = """
32+
CREATE TABLE test_source (
33+
id BIGINT,
34+
name VARCHAR,
35+
event_time TIMESTAMP,
36+
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
37+
) WITH (
38+
'connector' = 'kafka',
39+
'topic' = 'test-topic',
40+
'bootstrap.servers' = 'localhost:9092',
41+
'format' = 'json'
42+
)
43+
"""
44+
resp = fs_instance.execute_sql(create_sql)
45+
assert resp.status_code < 400
46+
47+
show_resp = fs_instance.execute_sql("SHOW TABLES")
48+
assert show_resp.status_code < 400
49+
50+
def test_create_duplicate_table_fails(self, fs_instance):
51+
"""Creating the same table twice should return an error."""
52+
fs_instance.start()
53+
54+
create_sql = """
55+
CREATE TABLE dup_table (
56+
id BIGINT
57+
) WITH (
58+
'connector' = 'kafka',
59+
'topic' = 'dup-topic',
60+
'bootstrap.servers' = 'localhost:9092',
61+
'format' = 'json'
62+
)
63+
"""
64+
resp1 = fs_instance.execute_sql(create_sql)
65+
assert resp1.status_code < 400
66+
67+
resp2 = fs_instance.execute_sql(create_sql)
68+
assert resp2.status_code >= 400
69+
70+
71+
class TestDropTable:
72+
73+
def test_drop_existing_table(self, fs_instance):
74+
"""DROP TABLE removes a previously created table."""
75+
fs_instance.start()
76+
77+
fs_instance.execute_sql("""
78+
CREATE TABLE to_drop (
79+
id BIGINT
80+
) WITH (
81+
'connector' = 'kafka',
82+
'topic' = 'drop-topic',
83+
'bootstrap.servers' = 'localhost:9092',
84+
'format' = 'json'
85+
)
86+
""")
87+
88+
resp = fs_instance.execute_sql("DROP TABLE to_drop")
89+
assert resp.status_code < 400
90+
91+
def test_drop_nonexistent_table_fails(self, fs_instance):
92+
"""DROP TABLE on a non-existent table returns an error."""
93+
fs_instance.start()
94+
resp = fs_instance.execute_sql("DROP TABLE no_such_table")
95+
assert resp.status_code >= 400
96+
97+
def test_drop_if_exists_nonexistent_succeeds(self, fs_instance):
98+
"""DROP TABLE IF EXISTS on a non-existent table should succeed."""
99+
fs_instance.start()
100+
resp = fs_instance.execute_sql("DROP TABLE IF EXISTS no_such_table")
101+
assert resp.status_code < 400
102+
103+
104+
class TestShowCreateTable:
105+
106+
def test_show_create_table(self, fs_instance):
107+
"""SHOW CREATE TABLE returns DDL for an existing table."""
108+
fs_instance.start()
109+
110+
fs_instance.execute_sql("""
111+
CREATE TABLE show_me (
112+
id BIGINT,
113+
value VARCHAR
114+
) WITH (
115+
'connector' = 'kafka',
116+
'topic' = 'show-topic',
117+
'bootstrap.servers' = 'localhost:9092',
118+
'format' = 'json'
119+
)
120+
""")
121+
122+
resp = fs_instance.execute_sql("SHOW CREATE TABLE show_me")
123+
assert resp.status_code < 400
124+
125+
def test_show_create_nonexistent_fails(self, fs_instance):
126+
"""SHOW CREATE TABLE on a missing table returns an error."""
127+
fs_instance.start()
128+
resp = fs_instance.execute_sql("SHOW CREATE TABLE ghost_table")
129+
assert resp.status_code >= 400
130+
131+
132+
class TestSqlErrorHandling:
133+
134+
def test_invalid_sql_syntax(self, fs_instance):
135+
"""Malformed SQL returns an error status."""
136+
fs_instance.start()
137+
resp = fs_instance.execute_sql("NOT VALID SQL AT ALL")
138+
assert resp.status_code >= 400
139+
140+
def test_empty_sql(self, fs_instance):
141+
"""Empty SQL string returns an error status."""
142+
fs_instance.start()
143+
resp = fs_instance.execute_sql("")
144+
assert resp.status_code >= 400
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# Licensed under the Apache License, Version 2.0 (the "License");
2+
# you may not use this file except in compliance with the License.
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# Licensed under the Apache License, Version 2.0 (the "License");
2+
# you may not use this file except in compliance with the License.
3+
4+
"""
5+
WASM function lifecycle tests: create, list, start, stop, drop functions
6+
via the gRPC FsClient API.
7+
"""
8+
9+
10+
class TestFunctionList:
11+
12+
def test_show_functions_empty(self, fs_instance):
13+
"""A fresh server has no functions registered."""
14+
fs_instance.start()
15+
with fs_instance.get_client() as client:
16+
result = client.show_functions()
17+
assert result.functions == []
18+
19+
def test_show_functions_after_drop(self, fs_instance):
20+
"""After dropping a function, show_functions reflects the removal."""
21+
fs_instance.start()
22+
with fs_instance.get_client() as client:
23+
result = client.show_functions()
24+
initial_count = len(result.functions)
25+
assert initial_count == 0
26+
27+
28+
class TestFunctionDrop:
29+
30+
def test_drop_nonexistent_function_fails(self, fs_instance):
31+
"""Dropping a function that does not exist should raise an error."""
32+
fs_instance.start()
33+
with fs_instance.get_client() as client:
34+
try:
35+
client.drop_function("no_such_function")
36+
assert False, "Expected an error when dropping non-existent function"
37+
except Exception:
38+
pass
39+
40+
41+
class TestFunctionStartStop:
42+
43+
def test_start_nonexistent_function_fails(self, fs_instance):
44+
"""Starting a function that does not exist should raise an error."""
45+
fs_instance.start()
46+
with fs_instance.get_client() as client:
47+
try:
48+
client.start_function("ghost_function")
49+
assert False, "Expected an error when starting non-existent function"
50+
except Exception:
51+
pass
52+
53+
def test_stop_nonexistent_function_fails(self, fs_instance):
54+
"""Stopping a function that does not exist should raise an error."""
55+
fs_instance.start()
56+
with fs_instance.get_client() as client:
57+
try:
58+
client.stop_function("phantom_function")
59+
assert False, "Expected an error when stopping non-existent function"
60+
except Exception:
61+
pass
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# Licensed under the Apache License, Version 2.0 (the "License");
2+
# you may not use this file except in compliance with the License.
3+
4+
"""
5+
Python WASM function tests: create and manage Python functions
6+
via the CreatePythonFunction gRPC API.
7+
"""
8+
9+
10+
class TestCreatePythonFunction:
11+
12+
def test_create_with_empty_modules_fails(self, fs_instance):
13+
"""Creating a Python function with no modules should fail."""
14+
fs_instance.start()
15+
with fs_instance.get_client() as client:
16+
try:
17+
client.create_python_function(
18+
class_name="EmptyDriver",
19+
modules=[],
20+
config_content="task_name: empty",
21+
)
22+
assert False, "Expected ValueError for empty modules"
23+
except (ValueError, Exception):
24+
pass
25+
26+
def test_create_with_invalid_class_name(self, fs_instance):
27+
"""Creating a Python function with a non-existent class should fail at server side."""
28+
fs_instance.start()
29+
with fs_instance.get_client() as client:
30+
try:
31+
client.create_python_function(
32+
class_name="NoSuchClass",
33+
modules=[("fake_module", b"x = 1\n")],
34+
config_content="task_name: bad_class_test",
35+
)
36+
except Exception:
37+
pass

0 commit comments

Comments
 (0)