Skip to content

Commit a9ba1c1

Browse files
committed
style: apply rustfmt across workspace crates
Run rustfmt on CLI, coordinator, catalog_storage, streaming_planner, streaming_runtime, and wasm_runtime so cargo fmt --check passes for the main binary and CLI packages.
1 parent 3271589 commit a9ba1c1

72 files changed

Lines changed: 138 additions & 184 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

cli/cli/src/repl.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use arrow_ipc::reader::StreamReader;
1717
use arrow_schema::DataType;
1818
use comfy_table::presets::UTF8_FULL;
1919
use comfy_table::{Attribute, Cell, Color, ContentArrangement, Table, TableComponent};
20-
use protocol::cli::{function_stream_service_client::FunctionStreamServiceClient, SqlRequest};
20+
use protocol::cli::{SqlRequest, function_stream_service_client::FunctionStreamServiceClient};
2121
use rustyline::error::ReadlineError;
2222
use rustyline::{Config, DefaultEditor, EditMode};
2323
use std::fmt;
@@ -243,11 +243,7 @@ impl Repl {
243243
}
244244
}
245245

246-
if has_rows {
247-
Ok(Some(table))
248-
} else {
249-
Ok(None)
250-
}
246+
if has_rows { Ok(Some(table)) } else { Ok(None) }
251247
}
252248

253249
fn extract_value(&self, column: &dyn Array, row: usize) -> String {
@@ -317,7 +313,7 @@ impl Repl {
317313

318314
#[cfg(unix)]
319315
let mut sigterm = {
320-
use tokio::signal::unix::{signal, SignalKind};
316+
use tokio::signal::unix::{SignalKind, signal};
321317
signal(SignalKind::terminate()).expect("failed to register SIGTERM handler")
322318
};
323319

@@ -403,9 +399,7 @@ impl Repl {
403399
println!();
404400
}
405401

406-
if !skip_save_history
407-
&& let Some(ref mut ed) = repl.lock().await.editor
408-
{
402+
if !skip_save_history && let Some(ref mut ed) = repl.lock().await.editor {
409403
let _ = ed.save_history(".function-stream-cli-history");
410404
}
411405
Ok(())

src/catalog_storage/src/task/rocksdb_storage.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ use super::proto_codec::{
1919
encode_task_module_bytes,
2020
};
2121
use super::storage::{StoredTaskInfo, TaskStorage};
22-
use crate::config::storage::RocksDBStorageConfig;
2322
use crate::common::ComponentState;
23+
use crate::config::storage::RocksDBStorageConfig;
2424
use anyhow::{Context, Result, anyhow};
2525
use rocksdb::{ColumnFamilyDescriptor, DB, IteratorMode, Options, WriteBatch};
2626
use std::path::Path;

src/coordinator/src/coordinator_body.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ mod execution;
1818
mod execution_context;
1919
mod plan;
2020
mod runtime_context;
21-
mod statement;
2221
mod sql_classify;
22+
mod statement;
2323
mod streaming_table_options;
2424
mod tool;
2525

src/coordinator/src/execution/executor.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,13 @@ use crate::coordinator::statement::{ConfigSource, FunctionSource};
3131
use crate::coordinator::streaming_table_options::{
3232
parse_checkpoint_interval_ms, parse_pipeline_parallelism,
3333
};
34-
use crate::streaming::job::JobManager;
35-
use crate::streaming::protocol::control::StopMode;
36-
use crate::wasm::taskexecutor::TaskManager;
3734
use crate::sql::schema::catalog::ExternalTable;
3835
use crate::sql::schema::show_create_catalog_table;
3936
use crate::sql::schema::table::CatalogEntity;
4037
use crate::stream_catalog::CatalogManager;
38+
use crate::streaming::job::JobManager;
39+
use crate::streaming::protocol::control::StopMode;
40+
use crate::wasm::taskexecutor::TaskManager;
4141

4242
#[derive(Error, Debug)]
4343
pub enum ExecuteError {

src/coordinator/src/runtime_context.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ use std::sync::Arc;
1616

1717
use anyhow::Result;
1818

19-
use crate::streaming::job::JobManager;
20-
use crate::wasm::taskexecutor::TaskManager;
2119
use crate::sql::schema::StreamSchemaProvider;
2220
use crate::stream_catalog::CatalogManager;
21+
use crate::streaming::job::JobManager;
22+
use crate::wasm::taskexecutor::TaskManager;
2323

2424
/// Dependencies shared by analyze / plan / execute, analogous to installing globals in
2525
/// [`TaskManager`], [`CatalogManager`], and [`JobManager`].

src/coordinator/src/sql_classify.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
1616
use std::collections::HashMap;
1717

18-
use datafusion::common::{plan_err, Result};
18+
use datafusion::common::{Result, plan_err};
1919
use datafusion::error::DataFusionError;
2020
use datafusion::sql::sqlparser::ast::{
2121
ObjectType, ShowCreateObject, SqlOption, Statement as DFStatement,
@@ -50,9 +50,9 @@ pub fn classify_statement(stmt: DFStatement) -> Result<Box<dyn Statement>> {
5050
DFStatement::ShowStreamingTable => Ok(Box::new(ShowStreamingTables::new())),
5151
DFStatement::ShowCreate { obj_type, obj_name } => match obj_type {
5252
ShowCreateObject::Table => Ok(Box::new(ShowCreateTable::new(obj_name.to_string()))),
53-
ShowCreateObject::StreamingTable => {
54-
Ok(Box::new(ShowCreateStreamingTable::new(obj_name.to_string())))
55-
}
53+
ShowCreateObject::StreamingTable => Ok(Box::new(ShowCreateStreamingTable::new(
54+
obj_name.to_string(),
55+
))),
5656
_ => plan_err!(
5757
"SHOW CREATE {obj_type} is not supported; use SHOW CREATE TABLE or SHOW CREATE STREAMING TABLE <name>"
5858
),
@@ -88,11 +88,12 @@ pub fn classify_statement(stmt: DFStatement) -> Result<Box<dyn Statement>> {
8888
}
8989
let table_name = names[0].to_string();
9090
Ok(Box::new(DropStreamingTableStatement::new(
91-
table_name,
92-
*if_exists,
91+
table_name, *if_exists,
9392
)))
9493
}
95-
_ => plan_err!("Only DROP TABLE and DROP STREAMING TABLE are supported in this SQL frontend"),
94+
_ => plan_err!(
95+
"Only DROP TABLE and DROP STREAMING TABLE are supported in this SQL frontend"
96+
),
9697
}
9798
}
9899
DFStatement::Insert { .. } => plan_err!(

src/streaming_planner/src/common/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ pub mod topology;
3333
pub mod with_option_keys;
3434

3535
// ── Re-exports from existing modules ──
36-
pub use function_stream_runtime_common::streaming_protocol::{CheckpointBarrier, Watermark};
3736
pub use arrow_ext::FsExtensionType;
37+
pub use function_stream_runtime_common::streaming_protocol::{CheckpointBarrier, Watermark};
3838
pub use time_utils::{from_nanos, to_micros, to_millis, to_nanos};
3939

4040
// ── Re-exports from new modules ──

src/streaming_planner/src/connector/sink/runtime_config.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@ use std::collections::HashMap;
1414

1515
use datafusion::common::{DataFusionError, Result, plan_err};
1616

17+
use crate::common::connector_options::ConnectorOptions;
18+
use crate::common::with_option_keys as opt;
1719
use function_stream_config::global_config::{
1820
DEFAULT_OPERATOR_STATE_STORE_MEMORY_BYTES, DEFAULT_SINK_BUFFER_MEMORY_BYTES,
1921
};
2022
use function_stream_config::streaming_job::DEFAULT_CHECKPOINT_INTERVAL_MS;
21-
use crate::common::connector_options::ConnectorOptions;
22-
use crate::common::with_option_keys as opt;
2323

2424
#[derive(Debug, Clone, PartialEq, Eq, Default)]
2525
pub struct SinkRuntimeConfig {

src/streaming_planner/src/logical_node/aggregate.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,14 @@ use protocol::function_stream_graph::{
3030
SessionWindowAggregateOperator, SlidingWindowAggregateOperator, TumblingWindowAggregateOperator,
3131
};
3232

33-
use crate::multifield_partial_ord;
3433
use crate::common::constants::{extension_node, proto_operator_name};
3534
use crate::common::{FsSchema, FsSchemaRef};
3635
use crate::logical_node::logical::{LogicalEdge, LogicalEdgeType, LogicalNode, OperatorName};
3736
use crate::logical_node::{
3837
CompiledTopologyNode, StreamingOperatorBlueprint, SystemTimestampInjectorNode,
3938
};
4039
use crate::logical_planner::planner::{NamedNode, Planner, SplitPlanOutput};
40+
use crate::multifield_partial_ord;
4141
use crate::physical::{StreamingExtensionCodec, window};
4242
use crate::types::{
4343
QualifiedField, TIMESTAMP_FIELD, WindowBehavior, WindowType, build_df_schema,

src/streaming_planner/src/logical_node/async_udf.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use datafusion_proto::physical_plan::to_proto::serialize_physical_expr;
2424
use prost::Message;
2525
use protocol::function_stream_graph::{AsyncUdfOperator, AsyncUdfOrdering};
2626

27-
use crate::multifield_partial_ord;
2827
use crate::common::constants::extension_node;
2928
use crate::common::constants::sql_field;
3029
use crate::common::{FsSchema, FsSchemaRef};
@@ -35,6 +34,7 @@ use crate::logical_node::streaming_operator_blueprint::{
3534
CompiledTopologyNode, StreamingOperatorBlueprint,
3635
};
3736
use crate::logical_planner::planner::{NamedNode, Planner};
37+
use crate::multifield_partial_ord;
3838
use crate::types::{QualifiedField, build_df_schema, extract_qualified_fields};
3939

4040
pub const NODE_TYPE_NAME: &str = extension_node::ASYNC_FUNCTION_EXECUTION;

0 commit comments

Comments
 (0)