Skip to content

Commit 4eb4d53

Browse files
committed
update
1 parent 041e130 commit 4eb4d53

16 files changed

Lines changed: 113 additions & 37 deletions

File tree

conf/config.yaml

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,15 +51,17 @@ wasm:
5151

5252
# Streaming Runtime Configuration
5353
streaming:
54-
# Bytes in the global memory pool for streaming execution: pipeline buffers, batch collect,
55-
# backpressure. Omitted → 200 MiB.
56-
streaming_runtime_memory_bytes: 209715200
54+
# Global memory pool for streaming pipeline execution (buffers, batch collect, backpressure).
55+
# Default / example: 10 MiB (10485760 bytes).
56+
streaming_runtime_memory_bytes: 10485760
5757

58-
# Per stateful operator (join / agg / window): in-memory state store cap; spill when exceeded.
59-
# Omitted → 100 MiB.
60-
operator_state_store_memory_bytes: 104857600
58+
# Per stateful operator (join / agg / window): in-memory state store cap before spill.
59+
# Default / example: 5 MiB (5242880 bytes).
60+
operator_state_store_memory_bytes: 5242880
6161
checkpoint_interval_ms: 60000
6262
pipeline_parallelism: 1
63+
# KeyBy (key extraction) operator pipeline parallelism in planned streaming jobs.
64+
key_by_parallelism: 1
6365

6466
# State Storage Configuration
6567
# Used to store runtime state data for tasks

src/config/global_config.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,21 @@ use crate::config::service_config::ServiceConfig;
2020
use crate::config::streaming_job::{ResolvedStreamingJobConfig, StreamingJobConfig};
2121
use crate::config::wasm_config::WasmConfig;
2222

23-
/// Default for [`StreamingConfig::streaming_runtime_memory_bytes`] when unset. **200 MiB.**
24-
pub const DEFAULT_STREAMING_RUNTIME_MEMORY_BYTES: u64 = 200 * 1024 * 1024;
23+
/// Default for [`StreamingConfig::streaming_runtime_memory_bytes`] when unset. **10 MiB** (pipeline buffers, backpressure).
24+
pub const DEFAULT_STREAMING_RUNTIME_MEMORY_BYTES: u64 = 10 * 1024 * 1024;
2525

26-
/// Default for [`StreamingConfig::operator_state_store_memory_bytes`] when unset. **100 MiB.**
27-
pub const DEFAULT_OPERATOR_STATE_STORE_MEMORY_BYTES: u64 = 100 * 1024 * 1024;
26+
/// Default for [`StreamingConfig::operator_state_store_memory_bytes`] when unset. **5 MiB** per stateful operator cap.
27+
pub const DEFAULT_OPERATOR_STATE_STORE_MEMORY_BYTES: u64 = 5 * 1024 * 1024;
2828

2929
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
3030
pub struct StreamingConfig {
3131
#[serde(flatten)]
3232
pub job: StreamingJobConfig,
33-
/// Bytes reserved in the global memory pool for streaming execution (pipeline buffers,
34-
/// batch collect, backpressure).
33+
/// Bytes reserved in the global memory pool for streaming pipeline execution (buffers,
34+
/// batch collect, backpressure). Default 10 MiB.
3535
#[serde(default)]
3636
pub streaming_runtime_memory_bytes: Option<u64>,
37-
/// Per stateful operator: in-memory state store cap before spill.
37+
/// Per stateful operator: in-memory state store cap before spill. Default 5 MiB.
3838
#[serde(default)]
3939
pub operator_state_store_memory_bytes: Option<u64>,
4040
}

src/config/streaming_job.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,24 @@ use serde::{Deserialize, Serialize};
1414

1515
pub const DEFAULT_CHECKPOINT_INTERVAL_MS: u64 = 60 * 1000;
1616
pub const DEFAULT_PIPELINE_PARALLELISM: u32 = 1;
17+
pub const DEFAULT_KEY_BY_PARALLELISM: u32 = 1;
1718

1819
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1920
pub struct StreamingJobConfig {
2021
#[serde(default)]
2122
pub checkpoint_interval_ms: Option<u64>,
2223
#[serde(default)]
2324
pub pipeline_parallelism: Option<u32>,
25+
/// Physical parallelism for KeyBy / key-extraction operators in planned streaming graphs.
26+
#[serde(default)]
27+
pub key_by_parallelism: Option<u32>,
2428
}
2529

2630
#[derive(Debug, Clone, Copy)]
2731
pub struct ResolvedStreamingJobConfig {
2832
pub checkpoint_interval_ms: u64,
2933
pub pipeline_parallelism: u32,
34+
pub key_by_parallelism: u32,
3035
}
3136

3237
impl StreamingJobConfig {
@@ -40,6 +45,10 @@ impl StreamingJobConfig {
4045
.pipeline_parallelism
4146
.filter(|&p| p > 0)
4247
.unwrap_or(DEFAULT_PIPELINE_PARALLELISM),
48+
key_by_parallelism: self
49+
.key_by_parallelism
50+
.filter(|&p| p > 0)
51+
.unwrap_or(DEFAULT_KEY_BY_PARALLELISM),
4352
}
4453
}
4554
}

src/coordinator/execution/executor.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -323,11 +323,16 @@ impl PlanVisitor for Executor {
323323
let execute = || -> Result<ExecuteResult, ExecuteError> {
324324
let mut fs_program: FsProgram = plan.program.clone().into();
325325
let job_manager: Arc<JobManager> = Arc::clone(&self.job_manager);
326-
let pipeline_parallelism = parse_pipeline_parallelism(plan.with_options.as_ref())
327-
.unwrap_or_else(|| job_manager.default_pipeline_parallelism())
328-
.max(1);
329-
for node in &mut fs_program.nodes {
330-
node.parallelism = pipeline_parallelism;
326+
// Only override per-node parallelism when CREATE STREAMING TABLE specifies
327+
// `WITH (parallelism = N)`. Otherwise keep planner-assigned values (e.g. keyed
328+
// aggregates defaulting to a higher parallelism than the job-wide default).
329+
if let Some(pipeline_parallelism) =
330+
parse_pipeline_parallelism(plan.with_options.as_ref())
331+
{
332+
let p = pipeline_parallelism.max(1);
333+
for node in &mut fs_program.nodes {
334+
node.parallelism = p;
335+
}
331336
}
332337

333338
let job_id = plan.name.clone();

src/server/initializer.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ use crate::config::GlobalConfig;
1919

2020
pub type InitializerFn = fn(&GlobalConfig) -> Result<()>;
2121

22+
fn initialize_streaming_sql_planning(config: &GlobalConfig) -> Result<()> {
23+
let job = config.streaming.resolved_job();
24+
crate::sql::planning_runtime::install_sql_planning_from_streaming_job(&job);
25+
Ok(())
26+
}
27+
2228
#[derive(Clone)]
2329
pub struct Component {
2430
pub name: &'static str,
@@ -94,6 +100,7 @@ impl ComponentRegistry {
94100
pub fn build_core_registry() -> ComponentRegistry {
95101
let builder = {
96102
let b = ComponentRegistryBuilder::new()
103+
.register("StreamingSqlPlanning", initialize_streaming_sql_planning)
97104
.register("WasmCache", initialize_wasm_cache)
98105
.register("TaskManager", initialize_task_manager)
99106
.register("MemoryService", initialize_memory_service)

src/sql/common/constants.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ pub mod sql_field {
108108

109109
pub mod sql_planning_default {
110110
pub const DEFAULT_PARALLELISM: usize = 1;
111+
/// Default physical parallelism for `KeyBy` / key-extraction pipelines (configurable via YAML).
112+
pub const DEFAULT_KEY_BY_PARALLELISM: usize = 1;
111113
/// Parallelism for aggregations that run after `KeyBy` / shuffle on non-empty routing keys.
112114
pub const KEYED_AGGREGATE_DEFAULT_PARALLELISM: usize = 8;
113115
pub const PLANNING_TTL_SECS: u64 = 24 * 60 * 60;

src/sql/logical_node/aggregate.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,11 @@ multifield_partial_ord!(
6464
);
6565

6666
impl StreamWindowAggregateNode {
67+
/// This node is only emitted after `KeyExtractionNode` in streaming rewrites; `partition_keys`
68+
/// may be empty when GROUP BY is only a window call (window column stripped from key list),
69+
/// but the pipeline still consumes a shuffle — use keyed aggregate parallelism.
6770
fn parallelism_after_keyed_shuffle(&self, planner: &Planner) -> usize {
68-
if self.partition_keys.is_empty() {
69-
planner.default_parallelism()
70-
} else {
71-
planner.keyed_aggregate_parallelism()
72-
}
71+
planner.keyed_aggregate_parallelism()
7372
}
7473

7574
/// Safely constructs a new node, computing the final projection without panicking.

src/sql/logical_node/key_calculation.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ impl StreamingOperatorBlueprint for KeyExtractionNode {
238238
engine_operator_name,
239239
protobuf_payload,
240240
format!("Key<{}>", self.operator_label.as_deref().unwrap_or("_")),
241-
planner.default_parallelism(),
241+
planner.key_by_parallelism(),
242242
);
243243

244244
let data_edge =

src/sql/logical_node/logical/operator_chain.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,4 +128,17 @@ impl OperatorChain {
128128
pub fn is_sink(&self) -> bool {
129129
self.operators[0].operator_name == OperatorName::ConnectorSink
130130
}
131+
132+
/// Operators safe to run at a higher upstream `TaskContext::parallelism` when fused after a
133+
/// stateful node (e.g. window aggregate @ 8 → projection @ 1).
134+
pub fn is_parallelism_upstream_expandable(&self) -> bool {
135+
self.operators.iter().all(|op| {
136+
matches!(
137+
op.operator_name,
138+
OperatorName::Projection
139+
| OperatorName::Value
140+
| OperatorName::ExpressionWatermark
141+
)
142+
})
143+
}
131144
}

src/sql/logical_node/updating_aggregate.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -218,11 +218,7 @@ impl StreamingOperatorBlueprint for ContinuousAggregateNode {
218218

219219
let operator_config = self.compile_operator_config(planner, &upstream_schema)?;
220220

221-
let parallelism = if self.partition_key_indices.is_empty() {
222-
planner.default_parallelism()
223-
} else {
224-
planner.keyed_aggregate_parallelism()
225-
};
221+
let parallelism = planner.keyed_aggregate_parallelism();
226222

227223
let logical_node = LogicalNode::single(
228224
node_index as u32,

0 commit comments

Comments
 (0)