Skip to content

Commit 0602512

Browse files
committed
update
1 parent 55bdff8 commit 0602512

61 files changed

Lines changed: 721 additions & 230 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

conf/config.yaml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,19 @@ wasm:
4949
# When cache exceeds this size, least recently used items will be evicted
5050
max_cache_size: 104857600
5151

52+
# Streaming Runtime Configuration
53+
streaming:
54+
# Global memory pool size for the streaming runtime (network buffering, backpressure).
55+
# When not set, auto-detected as 70% of physical memory.
56+
# Fallback: 256 MiB if detection fails.
57+
# max_memory_bytes: 268435456
58+
59+
# Memory budget per stateful operator (aggregation, join, window).
60+
# Each operator gets its own independent memory controller with this limit.
61+
# When exceeded, the operator spills state to disk automatically.
62+
# Default: 67108864 (64 MiB)
63+
per_operator_state_memory_bytes: 67108864
64+
5265
# State Storage Configuration
5366
# Used to store runtime state data for tasks
5467
state_storage:

src/config/global_config.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ use crate::config::wasm_config::WasmConfig;
2121

2222
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
2323
pub struct StreamingConfig {
24-
/// Maximum heap memory (in bytes) available to the streaming runtime's memory pool.
25-
/// Defaults to 256 MiB when absent.
26-
pub max_memory_bytes: Option<usize>,
24+
pub max_memory_bytes: Option<u64>,
25+
/// Total bytes for the global operator-state [`MemoryPool`](crate::runtime::memory::MemoryPool) (all stores share this quota).
26+
pub per_operator_state_memory_bytes: Option<u64>,
2727
}
2828

2929
#[derive(Debug, Clone, Serialize, Deserialize, Default)]

src/config/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub mod paths;
1717
pub mod python_config;
1818
pub mod service_config;
1919
pub mod storage;
20+
pub mod system;
2021
pub mod wasm_config;
2122

2223
pub use global_config::GlobalConfig;

src/config/system.rs

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
// Licensed under the Apache License, Version 2.0 (the "License");
2+
// you may not use this file except in compliance with the License.
3+
// You may obtain a copy of the License at
4+
//
5+
// http://www.apache.org/licenses/LICENSE-2.0
6+
//
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
13+
use std::io;
14+
15+
pub struct SystemMemoryInfo {
16+
pub total_physical: u64,
17+
pub available_physical: u64,
18+
pub total_virtual: u64,
19+
pub available_virtual: u64,
20+
}
21+
22+
pub fn system_memory_info() -> io::Result<SystemMemoryInfo> {
23+
sys::system_memory_info()
24+
}
25+
26+
#[cfg(target_os = "linux")]
27+
mod sys {
28+
use super::SystemMemoryInfo;
29+
use std::io;
30+
31+
pub fn system_memory_info() -> io::Result<SystemMemoryInfo> {
32+
let content = std::fs::read_to_string("/proc/meminfo")?;
33+
34+
let mut total_physical: Option<u64> = None;
35+
let mut available_physical: Option<u64> = None;
36+
let mut swap_total: u64 = 0;
37+
let mut swap_free: u64 = 0;
38+
39+
for line in content.lines() {
40+
if let Some(v) = parse_meminfo_kb(line, "MemTotal:") {
41+
total_physical = Some(v);
42+
} else if let Some(v) = parse_meminfo_kb(line, "MemAvailable:") {
43+
available_physical = Some(v);
44+
} else if let Some(v) = parse_meminfo_kb(line, "SwapTotal:") {
45+
swap_total = v;
46+
} else if let Some(v) = parse_meminfo_kb(line, "SwapFree:") {
47+
swap_free = v;
48+
}
49+
}
50+
51+
let total_phys = total_physical.ok_or_else(|| {
52+
io::Error::new(
53+
io::ErrorKind::NotFound,
54+
"MemTotal not found in /proc/meminfo",
55+
)
56+
})?;
57+
let avail_phys = available_physical.unwrap_or(0);
58+
59+
Ok(SystemMemoryInfo {
60+
total_physical: total_phys,
61+
available_physical: avail_phys,
62+
total_virtual: total_phys + swap_total,
63+
available_virtual: avail_phys + swap_free,
64+
})
65+
}
66+
67+
fn parse_meminfo_kb(line: &str, prefix: &str) -> Option<u64> {
68+
let rest = line.strip_prefix(prefix)?;
69+
let kb: u64 = rest.trim().trim_end_matches("kB").trim().parse().ok()?;
70+
Some(kb * 1024)
71+
}
72+
}
73+
74+
#[cfg(target_os = "macos")]
75+
mod sys {
76+
use super::SystemMemoryInfo;
77+
use std::io;
78+
79+
pub fn system_memory_info() -> io::Result<SystemMemoryInfo> {
80+
let total_physical = sysctl_u64("hw.memsize")?;
81+
82+
let page_size = sysctl_u64("hw.pagesize").unwrap_or(4096);
83+
let vm_stats = read_vm_stat()?;
84+
85+
let free_pages = vm_stats.free + vm_stats.inactive + vm_stats.purgeable;
86+
let available_physical = free_pages * page_size;
87+
88+
let swap = read_swap_usage();
89+
let swap_total = swap.0;
90+
let swap_free = swap_total.saturating_sub(swap.1);
91+
92+
Ok(SystemMemoryInfo {
93+
total_physical,
94+
available_physical,
95+
total_virtual: total_physical + swap_total,
96+
available_virtual: available_physical + swap_free,
97+
})
98+
}
99+
100+
fn sysctl_u64(name: &str) -> io::Result<u64> {
101+
let output = std::process::Command::new("sysctl")
102+
.arg("-n")
103+
.arg(name)
104+
.output()?;
105+
if !output.status.success() {
106+
return Err(io::Error::new(
107+
io::ErrorKind::Other,
108+
format!("sysctl {name} failed"),
109+
));
110+
}
111+
String::from_utf8_lossy(&output.stdout)
112+
.trim()
113+
.parse()
114+
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
115+
}
116+
117+
struct VmPages {
118+
free: u64,
119+
inactive: u64,
120+
purgeable: u64,
121+
}
122+
123+
fn read_vm_stat() -> io::Result<VmPages> {
124+
let output = std::process::Command::new("vm_stat").output()?;
125+
let text = String::from_utf8_lossy(&output.stdout);
126+
127+
let mut free = 0u64;
128+
let mut inactive = 0u64;
129+
let mut purgeable = 0u64;
130+
131+
for line in text.lines() {
132+
if let Some(v) = parse_vm_stat_line(line, "Pages free") {
133+
free = v;
134+
} else if let Some(v) = parse_vm_stat_line(line, "Pages inactive") {
135+
inactive = v;
136+
} else if let Some(v) = parse_vm_stat_line(line, "Pages purgeable") {
137+
purgeable = v;
138+
}
139+
}
140+
141+
Ok(VmPages {
142+
free,
143+
inactive,
144+
purgeable,
145+
})
146+
}
147+
148+
fn parse_vm_stat_line(line: &str, key: &str) -> Option<u64> {
149+
if !line.contains(key) {
150+
return None;
151+
}
152+
let val_str = line.rsplit(':').next()?.trim().trim_end_matches('.');
153+
val_str.parse().ok()
154+
}
155+
156+
fn read_swap_usage() -> (u64, u64) {
157+
let output = match std::process::Command::new("sysctl")
158+
.arg("-n")
159+
.arg("vm.swapusage")
160+
.output()
161+
{
162+
Ok(o) => o,
163+
Err(_) => return (0, 0),
164+
};
165+
let text = String::from_utf8_lossy(&output.stdout);
166+
let mut total = 0u64;
167+
let mut used = 0u64;
168+
for part in text.split_whitespace() {
169+
if let Some(mb_str) = part.strip_suffix("M") {
170+
if let Ok(mb) = mb_str.parse::<f64>() {
171+
if total == 0 {
172+
total = (mb * 1024.0 * 1024.0) as u64;
173+
} else if used == 0 {
174+
used = (mb * 1024.0 * 1024.0) as u64;
175+
}
176+
}
177+
}
178+
}
179+
(total, used)
180+
}
181+
}
182+
183+
#[cfg(target_os = "windows")]
184+
mod sys {
185+
use super::SystemMemoryInfo;
186+
use std::io;
187+
188+
#[repr(C)]
189+
struct MemoryStatusEx {
190+
dw_length: u32,
191+
dw_memory_load: u32,
192+
ull_total_phys: u64,
193+
ull_avail_phys: u64,
194+
ull_total_page_file: u64,
195+
ull_avail_page_file: u64,
196+
ull_total_virtual: u64,
197+
ull_avail_virtual: u64,
198+
ull_avail_extended_virtual: u64,
199+
}
200+
201+
extern "system" {
202+
fn GlobalMemoryStatusEx(lpBuffer: *mut MemoryStatusEx) -> i32;
203+
}
204+
205+
pub fn system_memory_info() -> io::Result<SystemMemoryInfo> {
206+
unsafe {
207+
let mut status = std::mem::zeroed::<MemoryStatusEx>();
208+
status.dw_length = std::mem::size_of::<MemoryStatusEx>() as u32;
209+
if GlobalMemoryStatusEx(&mut status) == 0 {
210+
return Err(io::Error::last_os_error());
211+
}
212+
Ok(SystemMemoryInfo {
213+
total_physical: status.ull_total_phys,
214+
available_physical: status.ull_avail_phys,
215+
total_virtual: status.ull_total_virtual,
216+
available_virtual: status.ull_avail_virtual,
217+
})
218+
}
219+
}
220+
}
221+
222+
#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
223+
mod sys {
224+
use super::SystemMemoryInfo;
225+
use std::io;
226+
227+
pub fn system_memory_info() -> io::Result<SystemMemoryInfo> {
228+
Err(io::Error::new(
229+
io::ErrorKind::Unsupported,
230+
"memory detection not supported on this platform",
231+
))
232+
}
233+
}

src/coordinator/execution/executor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use crate::coordinator::plan::{
3030
use crate::coordinator::statement::{ConfigSource, FunctionSource};
3131
use crate::runtime::streaming::job::JobManager;
3232
use crate::runtime::streaming::protocol::control::StopMode;
33-
use crate::runtime::taskexecutor::TaskManager;
33+
use crate::runtime::wasm::taskexecutor::TaskManager;
3434
use crate::sql::schema::show_create_catalog_table;
3535
use crate::sql::schema::table::Table as CatalogTable;
3636
use crate::storage::stream_catalog::CatalogManager;

src/coordinator/runtime_context.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use std::sync::Arc;
1717
use anyhow::Result;
1818

1919
use crate::runtime::streaming::job::JobManager;
20-
use crate::runtime::taskexecutor::TaskManager;
20+
use crate::runtime::wasm::taskexecutor::TaskManager;
2121
use crate::sql::schema::StreamSchemaProvider;
2222
use crate::storage::stream_catalog::CatalogManager;
2323

src/runtime/memory/block.rs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Licensed under the Apache License, Version 2.0 (the "License");
2+
// you may not use this file except in compliance with the License.
3+
//
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
use std::sync::Arc;
15+
use std::sync::atomic::{AtomicU64, Ordering};
16+
17+
use super::pool::MemoryPool;
18+
use super::ticket::MemoryTicket;
19+
20+
#[derive(Debug)]
21+
pub struct MemoryBlock {
22+
capacity: u64,
23+
available_bytes: AtomicU64,
24+
pool: Arc<MemoryPool>,
25+
}
26+
27+
impl MemoryBlock {
28+
pub(crate) fn new(capacity: u64, pool: Arc<MemoryPool>) -> Arc<Self> {
29+
Arc::new(Self {
30+
capacity,
31+
available_bytes: AtomicU64::new(capacity),
32+
pool,
33+
})
34+
}
35+
36+
pub fn try_allocate(self: &Arc<Self>, bytes: u64) -> Option<MemoryTicket> {
37+
if bytes == 0 {
38+
return Some(MemoryTicket::new(0, self.clone()));
39+
}
40+
41+
let mut current_available = self.available_bytes.load(Ordering::Acquire);
42+
loop {
43+
if current_available < bytes {
44+
return None;
45+
}
46+
47+
match self.available_bytes.compare_exchange_weak(
48+
current_available,
49+
current_available - bytes,
50+
Ordering::AcqRel,
51+
Ordering::Acquire,
52+
) {
53+
Ok(_) => return Some(MemoryTicket::new(bytes, self.clone())),
54+
Err(actual) => current_available = actual,
55+
}
56+
}
57+
}
58+
59+
#[inline]
60+
pub fn available_bytes(&self) -> u64 {
61+
self.available_bytes.load(Ordering::Relaxed)
62+
}
63+
64+
pub(crate) fn release_ticket(&self, bytes: u64) {
65+
if bytes > 0 {
66+
self.available_bytes.fetch_add(bytes, Ordering::Release);
67+
}
68+
}
69+
}
70+
71+
impl Drop for MemoryBlock {
72+
fn drop(&mut self) {
73+
self.pool.release_block(self.capacity);
74+
}
75+
}

0 commit comments

Comments
 (0)