Skip to content

Commit 5b596f2

Browse files
committed
update
1 parent 9b41175 commit 5b596f2

33 files changed

Lines changed: 300 additions & 231 deletions

Cargo.lock

Lines changed: 0 additions & 53 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,6 @@ protocol = { path = "./protocol" }
3838
prost = "0.13"
3939
rdkafka = { version = "0.38", features = ["cmake-build", "ssl", "gssapi"] }
4040
crossbeam-channel = "0.5"
41-
pest = "2.7"
42-
pest_derive = "2.7"
43-
clap = { version = "4.5", features = ["derive"] }
4441
wasmtime = { version = "41.0.3", features = ["component-model", "async"] }
4542
base64 = "0.22"
4643
wasmtime-wasi = "41.0.3"
@@ -63,7 +60,6 @@ petgraph = "0.7"
6360
rand = { version = "0.8", features = ["small_rng"] }
6461
itertools = "0.14"
6562
strum = { version = "0.26", features = ["derive"] }
66-
datafusion-functions-aggregate = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}
6763

6864
typify = { git = 'https://github.com/ArroyoSystems/typify.git', branch = 'arroyo' }
6965
parquet = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '55.2.0/parquet'}

src/coordinator/execution/executor.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,8 +284,9 @@ impl PlanVisitor for Executor {
284284
let fs_program: FsProgram = plan.program.clone().into();
285285
let job_manager: Arc<JobManager> = Arc::clone(&self.job_manager);
286286

287+
let job_id = plan.name.clone();
287288
let job_id = tokio::task::block_in_place(|| {
288-
tokio::runtime::Handle::current().block_on(job_manager.submit_job(fs_program))
289+
tokio::runtime::Handle::current().block_on(job_manager.submit_job(job_id, fs_program))
289290
})
290291
.map_err(|e| ExecuteError::Internal(format!("Failed to submit streaming job: {e}")))?;
291292

src/coordinator/plan/logical_plan_visitor.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,6 @@ impl LogicalPlanVisitor {
155155
Ok(StreamingTable {
156156
name: sink_table_name,
157157
comment: comment.clone(),
158-
source_table: sink_definition,
159158
program: validated_program,
160159
})
161160
}

src/coordinator/plan/streaming_table_plan.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,12 @@
1212

1313
use super::{PlanNode, PlanVisitor, PlanVisitorContext, PlanVisitorResult};
1414
use crate::sql::logical_node::logical::LogicalProgram;
15-
use crate::sql::schema::source_table::SourceTable;
1615

1716
/// Plan node representing a fully resolved streaming table (DDL).
1817
#[derive(Debug)]
1918
pub struct StreamingTable {
2019
pub name: String,
2120
pub comment: Option<String>,
22-
pub source_table: SourceTable,
2321
pub program: LogicalProgram,
2422
}
2523

src/runtime/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
pub mod buffer_and_event;
1616
pub mod common;
1717
pub mod streaming;
18+
pub mod util;
1819
pub mod task;
1920
pub mod taskexecutor;
2021
pub mod wasm;

src/runtime/streaming/api/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,5 @@ pub mod operator;
1717
pub mod source;
1818

1919
pub use context::TaskContext;
20-
pub use operator::{ConstructedOperator, MessageOperator, Registry};
20+
pub use operator::{ConstructedOperator, MessageOperator};
2121
pub use source::{SourceEvent, SourceOffset, SourceOperator};

src/runtime/streaming/api/operator.rs

Lines changed: 0 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -15,68 +15,9 @@ use crate::runtime::streaming::api::source::SourceOperator;
1515
use crate::runtime::streaming::protocol::stream_out::StreamOutput;
1616
use arrow_array::RecordBatch;
1717
use async_trait::async_trait;
18-
use datafusion::common::Result as DfResult;
19-
use datafusion::execution::context::SessionContext;
20-
use datafusion::execution::FunctionRegistry;
21-
use datafusion::logical_expr::{AggregateUDF, ScalarUDF, WindowUDF};
22-
use datafusion::logical_expr::planner::ExprPlanner;
23-
use std::collections::HashSet;
24-
use std::sync::Arc;
2518
use std::time::Duration;
2619
use crate::sql::common::{CheckpointBarrier, Watermark};
2720

28-
// ---------------------------------------------------------------------------
29-
// Registry — 算子 / UDF 注册表(取代 tracing_subscriber::Registry)
30-
// ---------------------------------------------------------------------------
31-
32-
/// 运行时函数与状态注册表。
33-
///
34-
/// 包装 DataFusion [`SessionContext`],为物理计划反序列化提供 UDF / UDAF / UDWF 查询能力。
35-
/// `Arc<Registry>` 在工厂中创建后,由各构造器共享。
36-
pub struct Registry {
37-
ctx: SessionContext,
38-
}
39-
40-
impl Registry {
41-
pub fn new() -> Self {
42-
Self {
43-
ctx: SessionContext::new(),
44-
}
45-
}
46-
47-
pub fn session_context(&self) -> &SessionContext {
48-
&self.ctx
49-
}
50-
}
51-
52-
impl Default for Registry {
53-
fn default() -> Self {
54-
Self::new()
55-
}
56-
}
57-
58-
impl FunctionRegistry for Registry {
59-
fn udfs(&self) -> HashSet<String> {
60-
self.ctx.udfs()
61-
}
62-
63-
fn udf(&self, name: &str) -> DfResult<Arc<ScalarUDF>> {
64-
self.ctx.udf(name)
65-
}
66-
67-
fn udaf(&self, name: &str) -> DfResult<Arc<AggregateUDF>> {
68-
self.ctx.udaf(name)
69-
}
70-
71-
fn udwf(&self, name: &str) -> DfResult<Arc<WindowUDF>> {
72-
self.ctx.udwf(name)
73-
}
74-
75-
fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>> {
76-
self.ctx.expr_planners()
77-
}
78-
}
79-
8021
// ---------------------------------------------------------------------------
8122
// ConstructedOperator
8223
// ---------------------------------------------------------------------------
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Licensed under the Apache License, Version 2.0 (the "License");
2+
// you may not use this file except in compliance with the License.
3+
// You may obtain a copy of the License at
4+
//
5+
// http://www.apache.org/licenses/LICENSE-2.0
6+
//
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
13+
//! Source / Sink 连接器协议:按 [`ConnectorOp::connector`] 分发到具体实现。
14+
15+
use anyhow::{anyhow, Result};
16+
use prost::Message;
17+
use std::sync::Arc;
18+
19+
use protocol::grpc::api::ConnectorOp;
20+
21+
use crate::runtime::streaming::api::operator::ConstructedOperator;
22+
use crate::runtime::streaming::factory::global::Registry;
23+
use crate::runtime::streaming::factory::operator_constructor::OperatorConstructor;
24+
use crate::sql::common::constants::connector_type;
25+
26+
use super::kafka::{KafkaSinkDispatcher, KafkaSourceDispatcher};
27+
28+
pub struct ConnectorSourceDispatcher;
29+
30+
impl OperatorConstructor for ConnectorSourceDispatcher {
31+
fn with_config(&self, config: &[u8], registry: Arc<Registry>) -> Result<ConstructedOperator> {
32+
let op = ConnectorOp::decode(config)
33+
.map_err(|e| anyhow!("decode ConnectorOp (source): {e}"))?;
34+
35+
match op.connector.as_str() {
36+
ct if ct == connector_type::KAFKA => KafkaSourceDispatcher.with_config(config, registry),
37+
ct if ct == connector_type::REDIS => Err(anyhow!(
38+
"ConnectorSource '{}' factory wiring not yet implemented",
39+
op.connector
40+
)),
41+
other => Err(anyhow!("Unsupported source connector type: {}", other)),
42+
}
43+
}
44+
}
45+
46+
pub struct ConnectorSinkDispatcher;
47+
48+
impl OperatorConstructor for ConnectorSinkDispatcher {
49+
fn with_config(&self, config: &[u8], registry: Arc<Registry>) -> Result<ConstructedOperator> {
50+
let op = ConnectorOp::decode(config)
51+
.map_err(|e| anyhow!("decode ConnectorOp (sink): {e}"))?;
52+
53+
match op.connector.as_str() {
54+
ct if ct == connector_type::KAFKA => KafkaSinkDispatcher.with_config(config, registry),
55+
other => Err(anyhow!("Unsupported sink connector type: {}", other)),
56+
}
57+
}
58+
}

src/runtime/streaming/factory/registry/kafka_factory.rs renamed to src/runtime/streaming/factory/connector/kafka.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@ use std::sync::Arc;
2121
use protocol::grpc::api::ConnectorOp;
2222
use tracing::{info, warn};
2323

24-
use super::OperatorConstructor;
25-
use crate::runtime::streaming::api::operator::{ConstructedOperator, Registry};
24+
use crate::runtime::streaming::api::operator::ConstructedOperator;
2625
use crate::runtime::streaming::api::source::SourceOffset;
26+
use crate::runtime::streaming::factory::global::Registry;
27+
use crate::runtime::streaming::factory::operator_constructor::OperatorConstructor;
2728
use crate::runtime::streaming::format::{
2829
BadDataPolicy, DataSerializer, DecimalEncoding as RtDecimalEncoding, Format as RuntimeFormat,
2930
JsonFormat as RuntimeJsonFormat, TimestampFormat as RtTimestampFormat,
@@ -332,10 +333,3 @@ impl OperatorConstructor for KafkaSinkDispatcher {
332333
Ok(ConstructedOperator::Operator(Box::new(sink_op)))
333334
}
334335
}
335-
336-
/// 注册 `KafkaSource` / `KafkaSink` 构造器(由 [`super::OperatorFactory::register_builtins`] 调用)。
337-
pub fn register_kafka_plugins(factory: &mut super::OperatorFactory) {
338-
factory.register("KafkaSource", Box::new(KafkaSourceDispatcher));
339-
factory.register("KafkaSink", Box::new(KafkaSinkDispatcher));
340-
info!("Registered Kafka connector plugins (KafkaSource, KafkaSink)");
341-
}

0 commit comments

Comments
 (0)