Skip to content

Commit f920659

Browse files
committed
update
1 parent 88d57b9 commit f920659

127 files changed

Lines changed: 19190 additions & 0 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

src/runtime.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Licensed under the Apache License, Version 2.0 (the "License");
2+
// you may not use this file except in compliance with the License.
3+
// You may obtain a copy of the License at
4+
//
5+
// http://www.apache.org/licenses/LICENSE-2.0
6+
//
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
13+
// In-tree runtime: streaming engine, util helpers, and WASM task runtime.
14+
// Paths are relative to `src/` (this file lives at `src/runtime.rs`).
15+
16+
pub use function_stream_runtime_common::{common, memory};
17+
18+
#[path = "streaming_runtime/src/streaming/mod.rs"]
19+
pub mod streaming;
20+
21+
#[path = "streaming_runtime/src/util/mod.rs"]
22+
pub mod util;
23+
24+
#[path = "wasm_runtime/src/wasm/mod.rs"]
25+
pub mod wasm;
26+
27+
pub use wasm::{input, output, processor};
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
// Licensed under the Apache License, Version 2.0 (the "License");
2+
// you may not use this file except in compliance with the License.
3+
// You may obtain a copy of the License at
4+
//
5+
// http://www.apache.org/licenses/LICENSE-2.0
6+
//
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
13+
//! Streaming control-plane types shared by the SQL planner and the execution runtime.
14+
15+
use bincode::{Decode, Encode};
16+
use serde::{Deserialize, Serialize};
17+
use std::time::SystemTime;
18+
19+
#[derive(Debug, Copy, Clone, PartialEq, Eq, Encode, Decode, Serialize, Deserialize)]
20+
pub enum Watermark {
21+
EventTime(SystemTime),
22+
Idle,
23+
}
24+
25+
#[derive(Debug, Copy, Clone, PartialEq, Eq, Encode, Decode, Serialize, Deserialize)]
26+
pub struct CheckpointBarrier {
27+
pub epoch: u64,
28+
pub min_epoch: u64,
29+
pub timestamp: SystemTime,
30+
pub then_stop: bool,
31+
}
32+
33+
pub fn merge_watermarks(per_input: &[Option<Watermark>]) -> Option<Watermark> {
34+
if per_input.iter().any(|w| w.is_none()) {
35+
return None;
36+
}
37+
38+
let mut min_event: Option<SystemTime> = None;
39+
let mut all_idle = true;
40+
41+
for w in per_input.iter().flatten() {
42+
match w {
43+
Watermark::Idle => {}
44+
Watermark::EventTime(t) => {
45+
all_idle = false;
46+
min_event = Some(match min_event {
47+
None => *t,
48+
Some(m) => m.min(*t),
49+
});
50+
}
51+
}
52+
}
53+
54+
if all_idle {
55+
Some(Watermark::Idle)
56+
} else {
57+
Some(Watermark::EventTime(min_event.expect(
58+
"non-idle alignment must have at least one EventTime",
59+
)))
60+
}
61+
}
62+
63+
pub fn watermark_strictly_advances(new: Watermark, previous: Option<Watermark>) -> bool {
64+
match previous {
65+
None => true,
66+
Some(prev) => match (new, prev) {
67+
(Watermark::EventTime(tn), Watermark::EventTime(tp)) => tn > tp,
68+
(Watermark::Idle, Watermark::Idle) => false,
69+
(Watermark::Idle, Watermark::EventTime(_)) => true,
70+
(Watermark::EventTime(_), Watermark::Idle) => true,
71+
},
72+
}
73+
}
74+
75+
#[cfg(test)]
76+
mod watermark_tests {
77+
use super::*;
78+
use std::time::Duration;
79+
80+
#[test]
81+
fn merge_waits_for_all_channels() {
82+
let wms = vec![Some(Watermark::EventTime(SystemTime::UNIX_EPOCH)), None];
83+
assert!(merge_watermarks(&wms).is_none());
84+
}
85+
86+
#[test]
87+
fn merge_min_event_time_ignores_idle() {
88+
let t1 = SystemTime::UNIX_EPOCH + Duration::from_secs(10);
89+
let t2 = SystemTime::UNIX_EPOCH + Duration::from_secs(5);
90+
let wms = vec![Some(Watermark::EventTime(t1)), Some(Watermark::Idle)];
91+
assert_eq!(merge_watermarks(&wms), Some(Watermark::EventTime(t1)));
92+
93+
let wms = vec![
94+
Some(Watermark::EventTime(t1)),
95+
Some(Watermark::EventTime(t2)),
96+
];
97+
assert_eq!(merge_watermarks(&wms), Some(Watermark::EventTime(t2)));
98+
}
99+
100+
#[test]
101+
fn merge_all_idle() {
102+
let wms = vec![Some(Watermark::Idle), Some(Watermark::Idle)];
103+
assert_eq!(merge_watermarks(&wms), Some(Watermark::Idle));
104+
}
105+
}

src/storage.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Licensed under the Apache License, Version 2.0 (the "License");
2+
// you may not use this file except in compliance with the License.
3+
// You may obtain a copy of the License at
4+
//
5+
// http://www.apache.org/licenses/LICENSE-2.0
6+
//
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
13+
// Persistent / catalog-related modules compiled into the root crate.
14+
// Paths are relative to `src/` (this file lives at `src/storage.rs`).
15+
16+
use std::sync::Arc;
17+
18+
use anyhow::Context;
19+
20+
#[path = "wasm_runtime/src/state_backend/mod.rs"]
21+
pub mod state_backend;
22+
23+
#[path = "catalog_storage/src/stream_catalog/mod.rs"]
24+
pub mod stream_catalog;
25+
26+
#[path = "catalog_storage/src/task/mod.rs"]
27+
pub mod task;
28+
29+
/// Install the process-global [`stream_catalog::CatalogManager`] from configuration.
30+
/// In-memory when `config.stream_catalog.persist` is `false`, otherwise a durable
31+
/// [`stream_catalog::RocksDbMetaStore`] (default path: `{data_dir}/catalog.db`).
32+
pub fn initialize_stream_catalog(config: &crate::config::GlobalConfig) -> anyhow::Result<()> {
33+
use stream_catalog::{CatalogManager, InMemoryMetaStore, MetaStore, RocksDbMetaStore};
34+
35+
let store: Arc<dyn MetaStore> = if !config.stream_catalog.persist {
36+
Arc::new(InMemoryMetaStore::new())
37+
} else {
38+
let path = config
39+
.stream_catalog
40+
.db_path
41+
.as_ref()
42+
.map(|p| crate::config::resolve_path(p))
43+
.unwrap_or_else(|| crate::config::get_data_dir().join("catalog.db"));
44+
45+
std::fs::create_dir_all(&path).with_context(|| {
46+
format!(
47+
"Failed to create stream catalog RocksDB directory {}",
48+
path.display()
49+
)
50+
})?;
51+
52+
Arc::new(RocksDbMetaStore::open(&path).with_context(|| {
53+
format!(
54+
"Failed to open stream catalog RocksDB at {}",
55+
path.display()
56+
)
57+
})?)
58+
};
59+
60+
CatalogManager::init_global(store).context("Stream catalog (CatalogManager) global init failed")
61+
}

src/streaming_planner/Cargo.toml

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
[package]
2+
name = "function-stream-streaming-planner"
3+
version = "0.6.0"
4+
edition = "2024"
5+
6+
[lib]
7+
name = "function_stream_streaming_planner"
8+
path = "src/lib.rs"
9+
10+
[dependencies]
11+
protocol = { path = "../../protocol" }
12+
prost = "0.13"
13+
function-stream-config = { path = "../config" }
14+
function-stream-runtime-common = { path = "../runtime_common" }
15+
tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "sync", "time", "net"] }
16+
tokio-stream = "0.1.18"
17+
anyhow = "1.0"
18+
xxhash-rust = { version = "0.8", features = ["xxh3"] }
19+
serde = { version = "1.0", features = ["derive"] }
20+
serde_json = "1.0"
21+
thiserror = "2"
22+
tracing = "0.1"
23+
async-trait = "0.1"
24+
futures = "0.3"
25+
itertools = "0.14"
26+
petgraph = "0.7"
27+
unicase = "2.7"
28+
rand = { version = "0.8", features = ["small_rng"] }
29+
bincode = { version = "2", features = ["serde"] }
30+
chrono = "0.4"
31+
bytes = "1"
32+
ahash = "0.8"
33+
strum = { version = "0.26", features = ["derive"] }
34+
serde_json_path = "0.7"
35+
36+
arrow = { version = "55", default-features = false }
37+
arrow-array = "55"
38+
arrow-schema = { version = "55", features = ["serde"] }
39+
arrow-json = { version = "55.2.0" }
40+
apache-avro = "0.21"
41+
42+
datafusion = { git = "https://github.com/FunctionStream/datafusion", branch = "48.0.1/fs" }
43+
datafusion-common = { git = "https://github.com/FunctionStream/datafusion", branch = "48.0.1/fs" }
44+
datafusion-execution = { git = "https://github.com/FunctionStream/datafusion", branch = "48.0.1/fs" }
45+
datafusion-expr = { git = "https://github.com/FunctionStream/datafusion", branch = "48.0.1/fs" }
46+
datafusion-physical-expr = { git = "https://github.com/FunctionStream/datafusion", branch = "48.0.1/fs" }
47+
datafusion-proto = { git = "https://github.com/FunctionStream/datafusion", branch = "48.0.1/fs" }
48+
49+
sqlparser = { git = "https://github.com/FunctionStream/sqlparser-rs", branch = "0.58.0/fs" }

0 commit comments

Comments
 (0)