Skip to content

Commit 24bf8ee

Browse files
committed
feat(sql): add pluggable source/sink builders and sink passthrough connectors
Refactor connector config construction into role-specific providers and builders, and add sink passthrough implementations for filesystem, delta, iceberg, lancedb plus dedicated S3/Kafka provider wiring for stronger option validation. Made-with: Cursor
1 parent 488d3b6 commit 24bf8ee

75 files changed

Lines changed: 8427 additions & 783 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.lock

Lines changed: 3827 additions & 461 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ arrow-array = "55"
5252
arrow-ipc = "55"
5353
arrow-schema = { version = "55", features = ["serde"] }
5454
parquet = "55"
55+
object_store = { version = "0.12.5", features = ["aws"] }
56+
bytes = "1"
5557
futures = "0.3"
5658
serde_json_path = "0.7"
5759
xxhash-rust = { version = "0.8", features = ["xxh3"] }
@@ -63,6 +65,7 @@ itertools = "0.14"
6365
strum = { version = "0.26", features = ["derive"] }
6466

6567
arrow-json = {version = '55.2.0'}
68+
apache-avro = "0.21"
6669
datafusion = {git = 'https://github.com/FunctionStream/datafusion', branch = '48.0.1/fs'}
6770
datafusion-common = {git = 'https://github.com/FunctionStream/datafusion', branch = '48.0.1/fs'}
6871
datafusion-execution = {git = 'https://github.com/FunctionStream/datafusion', branch = '48.0.1/fs'}
@@ -74,6 +77,9 @@ sqlparser = { git = "https://github.com/FunctionStream/sqlparser-rs", branch = "
7477

7578
ahash = "0.8"
7679
governor = "0.8.0"
80+
lance = { version = "4.0.0", default-features = false, features = ["aws"] }
81+
arrow-array-lance = { package = "arrow-array", version = "57.3.0" }
82+
arrow-ipc-lance = { package = "arrow-ipc", version = "57.3.0" }
7783

7884
[features]
7985
default = ["incremental-cache", "python"]

Makefile

Lines changed: 15 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1010
# See the License for the specific language governing permissions and
1111
# limitations under the License.
12-
13-
1412
APP_NAME := function-stream
1513
VERSION := $(shell grep '^version' Cargo.toml | head -1 | awk -F '"' '{print $$2}')
1614
DATE := $(shell date -u +"%Y-%m-%dT%H:%M:%SZ")
@@ -29,30 +27,22 @@ endif
2927
OS := $(shell uname -s | tr '[:upper:]' '[:lower:]')
3028
OS_NAME := $(shell uname -s)
3129

32-
# 2. Configure RUSTFLAGS and target triple per platform
30+
# 2. Configure target triple per platform
3331
DIST_ROOT := dist
3432
ifeq ($(OS_NAME), Linux)
3533
TRIPLE := $(ARCH)-unknown-linux-gnu
3634
STATIC_FLAGS :=
3735
else ifeq ($(OS_NAME), Darwin)
38-
# macOS: strip symbols but keep dynamic linking (Apple system restriction)
3936
TRIPLE := $(ARCH)-apple-darwin
4037
STATIC_FLAGS :=
4138
else ifneq (,$(findstring MINGW,$(OS_NAME))$(findstring MSYS,$(OS_NAME)))
42-
# Windows (Git Bash / MSYS2): static-link MSVC runtime
4339
TRIPLE := $(ARCH)-pc-windows-msvc
4440
STATIC_FLAGS := -C target-feature=+crt-static
4541
else
46-
# Fallback
4742
TRIPLE := $(ARCH)-unknown-linux-gnu
4843
STATIC_FLAGS :=
4944
endif
5045

51-
# 3. Aggressive optimization flags
52-
# opt-level=z : size-oriented, minimize binary footprint
53-
# strip=symbols: remove debug symbol table at link time
54-
# Note: panic=abort is intentionally omitted to preserve stack unwinding
55-
# for better fault tolerance in the streaming runtime
5646
OPTIMIZE_FLAGS := -C opt-level=z -C strip=symbols $(STATIC_FLAGS)
5747

5848
TARGET_DIR := target/$(TRIPLE)/release
@@ -110,7 +100,7 @@ help:
110100
(printf "$(C_Y)[!] Auto-installing target toolchain for $(OS_NAME): $(TRIPLE)$(C_0)\n" && \
111101
rustup target add $(TRIPLE))
112102

113-
# 5. Build targets (depend on .ensure-target for automatic toolchain setup)
103+
# 5. Build targets
114104
build: .check-env .ensure-target .build-wasm
115105
$(call log,BUILD,Rust Full [$(OS_NAME) / $(TRIPLE)])
116106
@RUSTFLAGS="$(OPTIMIZE_FLAGS)" \
@@ -128,24 +118,24 @@ build: .check-env .ensure-target .build-wasm
128118

129119
build-lite: .check-env .ensure-target
130120
$(call log,BUILD,Rust Lite [$(OS_NAME) / $(TRIPLE)])
131-
@RUSTFLAGS="$(OPTIMIZE_FLAGS)" \
121+
@RUSTFLAGS="$(INDUSTRIAL_RUSTFLAGS)" \
132122
cargo build --release \
133-
--target $(TRIPLE) \
134-
--no-default-features \
135-
--features incremental-cache \
136-
--quiet
123+
--target $(TRIPLE) \
124+
--no-default-features \
125+
--features incremental-cache \
126+
--quiet
137127
$(call log,BUILD,CLI for dist)
138-
@RUSTFLAGS="$(OPTIMIZE_FLAGS)" \
128+
@RUSTFLAGS="$(INDUSTRIAL_RUSTFLAGS)" \
139129
cargo build --release \
140-
--target $(TRIPLE) \
141-
-p function-stream-cli \
142-
--quiet
130+
--target $(TRIPLE) \
131+
-p function-stream-cli \
132+
--quiet
143133
$(call success,Target: $(TARGET_DIR)/$(APP_NAME) $(TARGET_DIR)/cli)
144134

145135
.build-wasm:
146136
$(call log,WASM,Building Python Runtime using $(PYTHON_EXEC))
147137
@cd $(PYTHON_ROOT)/functionstream-runtime && \
148-
PYTHONPATH=../functionstream-api:../functionstream-api-advanced ../../$(PYTHON_EXEC) build.py > /dev/null
138+
PYTHONPATH=../functionstream-api:../functionstream-api-advanced ../../$(PYTHON_EXEC) build.py > /dev/null
149139
@[ -f "$(WASM_SOURCE)" ] || (printf "$(C_R)[X] WASM Build Failed$(C_0)\n" && exit 1)
150140

151141
dist: build
@@ -223,9 +213,9 @@ docker:
223213
docker-run:
224214
$(call log,DOCKER,Starting Container)
225215
@docker run --rm -it \
226-
-p 8080:8080 \
227-
-v $(CURDIR)/logs:/app/logs \
228-
$(IMAGE_NAME)
216+
-p 8080:8080 \
217+
-v $(CURDIR)/logs:/app/logs \
218+
$(IMAGE_NAME)
229219

230220
docker-push:
231221
$(call log,DOCKER,Pushing $(IMAGE_NAME))

README-zh.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ function-stream-<version>/
204204
| 文档 | 描述 |
205205
|------------------------------------------------------------------------|--------------------------|
206206
| [Streaming SQL 使用指南](docs/streaming-sql-guide-zh.md) | 声明式 SQL 实时流处理指南 |
207+
| [Streaming SQL Source/Sink 文档](docs/streaming-sql/README-zh.md) | Source/Sink 能力与参数速查 |
207208
| [连接器、格式与类型参考](docs/connectors-and-formats-zh.md) | 支持的 Source/Sink、格式与数据类型 |
208209
| [服务端配置与运维指南](docs/server-configuration-zh.md) | 服务端配置与运维操作 |
209210
| [Function 任务配置规范](docs/function-configuration-zh.md) | 任务定义规范 |

docs/streaming-sql-guide-zh.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ flowchart LR
9090
| **水位线 (Watermark)** | `AS <column> - INTERVAL ...` |**迟到、乱序**数据的容忍度;时间推进由水位线驱动,**过度迟到**的事件会被安全丢弃。 |
9191

9292
> **完整参考**:支持的连接器、数据格式和 SQL 数据类型,请参阅 [连接器、格式与类型参考](connectors-and-formats-zh.md)
93+
>
94+
> **Source / Sink 专项说明**:请参阅 [Streaming SQL Connector 文档](streaming-sql/README-zh.md)(含 Kafka Source 与 filesystem/s3/delta/iceberg/lanceDB Sink)。
9395
9496
---
9597

docs/streaming-sql/README-zh.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Streaming SQL Connector 文档
2+
3+
本目录提供 Streaming SQL 的 Source / Sink 专项文档,推荐配合 `CREATE STREAMING TABLE ... AS SELECT ...` 使用。
4+
5+
## 目录
6+
7+
- [Source 文档](Source/README-zh.md)
8+
- [Sink 文档](Sink/README-zh.md)
9+
10+
## 使用建议
11+
12+
1. 先用 `CREATE TABLE ... WITH (...)` 注册 Source(当前仅 Kafka)。
13+
2. 再用 `CREATE STREAMING TABLE ... WITH (...) AS SELECT ...` 创建持续运行的 Pipeline 并写入 Sink。

docs/streaming-sql/README.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# Streaming SQL Connector Docs
2+
3+
This directory contains Source/Sink focused docs for Streaming SQL, intended to be used with `CREATE STREAMING TABLE ... AS SELECT ...`.
4+
5+
## Index
6+
7+
- [Source Docs](Source/README.md)
8+
- [Sink Docs](Sink/README.md)
9+
10+
## Recommended workflow
11+
12+
1. Register sources using `CREATE TABLE ... WITH (...)` (currently Kafka source).
13+
2. Build a continuous pipeline using `CREATE STREAMING TABLE ... WITH (...) AS SELECT ...` and write to sinks.
14+
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Streaming SQL Sink 文档
2+
3+
本目录聚焦 Streaming SQL 的下游写出能力(Sink)。
4+
5+
## 支持矩阵
6+
7+
| Connector | 支持格式 |
8+
9+
|---|---|
10+
| `kafka` | `json` / `raw_string` / `raw_bytes`(沿用 Kafka Sink 编码能力) |
11+
| `filesystem` | `csv` / `parquet` / `json`(JSONL) / `avro` / `orc` |
12+
| `s3` | `csv` / `parquet` |
13+
| `delta` | `csv` / `parquet` / `json`(JSONL) / `avro` / `orc` |
14+
| `iceberg` | `csv` / `parquet` |
15+
| `lanceDB` | `lance` |
16+
17+
## 文档列表
18+
19+
- [Kafka Sink](kafka-sink-zh.md)
20+
- [Filesystem Sink](filesystem-sink-zh.md)
21+
- [S3 Sink](s3-sink-zh.md)
22+
- [Delta Sink](delta-sink-zh.md)
23+
- [Iceberg Sink](iceberg-sink-zh.md)
24+
- [LanceDB Sink](lancedb-sink-zh.md)
25+
26+
## 通用约定
27+
28+
-`CREATE STREAMING TABLE ... WITH (...) AS SELECT ...` 中通过 `WITH` 指定 `connector``format`
29+
- Sink 场景建议显式指定 `type='sink'`
30+
-`lanceDB` connector 允许 `format='lance'`;其余 Sink connector 不支持 `lance`
31+
- `format='json'` 的文件类 Sink 输出为 JSON Lines(NDJSON,`.jsonl`)。

docs/streaming-sql/Sink/README.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Streaming SQL Sink Docs
2+
3+
This directory documents sink connectors for Streaming SQL.
4+
5+
## Support matrix
6+
7+
| Connector | Supported formats |
8+
|---|---|
9+
| `kafka` | `json` / `raw_string` / `raw_bytes` |
10+
| `filesystem` | `csv` / `parquet` / `json`(JSONL) / `avro` / `orc` |
11+
| `s3` | `csv` / `parquet` |
12+
| `delta` | `csv` / `parquet` / `json`(JSONL) / `avro` / `orc` |
13+
| `iceberg` | `csv` / `parquet` |
14+
| `lanceDB` | `lance` |
15+
16+
## Documents
17+
18+
- [Kafka Sink](kafka-sink.md)
19+
- [Filesystem Sink](filesystem-sink.md)
20+
- [S3 Sink](s3-sink.md)
21+
- [Delta Sink](delta-sink.md)
22+
- [Iceberg Sink](iceberg-sink.md)
23+
- [LanceDB Sink](lancedb-sink.md)
24+
25+
## Notes
26+
27+
- Configure sink connectors via `WITH (...)` in `CREATE STREAMING TABLE ... AS SELECT ...`.
28+
- Use `type='sink'` explicitly for sink tables.
29+
- Only `lanceDB` accepts `format='lance'`.
30+
- For file-like sinks, `format='json'` is written as JSON Lines (NDJSON, `.jsonl`).
31+
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# Delta Sink
2+
3+
`delta` connector 对应 Delta 数据湖写出通道。
4+
5+
## 支持格式
6+
7+
- `csv`
8+
- `parquet`
9+
- `json`(写出为 JSON Lines / NDJSON,文件后缀 `.jsonl`
10+
- `avro`
11+
- `orc`
12+
13+
## 常用 WITH 参数
14+
15+
- `connector='delta'`
16+
- `type='sink'`
17+
- `format='csv'|'parquet'|'json'|'avro'|'orc'`
18+
- `path='/data/delta/orders'`(本地)或对象存储前缀
19+
- 可选 S3 参数:`s3.bucket` / `s3.region` / `s3.endpoint` / AKSK
20+
- `parquet.compression`(仅 `parquet`
21+
22+
## 示例(CREATE STREAMING TABLE)
23+
24+
```sql
25+
CREATE STREAMING TABLE st_delta_parquet
26+
WITH (
27+
connector='delta',
28+
type='sink',
29+
format='parquet',
30+
path='/tmp/delta_orders'
31+
) AS
32+
SELECT * FROM src_kafka_orders;
33+
```

0 commit comments

Comments
 (0)