Skip to content

Commit 87f7722

Browse files
committed
update
1 parent 174ebaa commit 87f7722

37 files changed

Lines changed: 983 additions & 998 deletions

protocol/proto/fs_api.proto

Lines changed: 125 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,131 @@ package fs_api;
88

99
message ConnectorOp {
1010
string connector = 1;
11-
string config = 2;
12-
string description = 3;
11+
reserved 2; // removed: map<string, string> config_map
12+
optional FsSchema fs_schema = 3;
13+
string name = 4;
14+
string description = 5;
15+
16+
oneof config {
17+
KafkaSourceConfig kafka_source = 6;
18+
KafkaSinkConfig kafka_sink = 7;
19+
GenericConnectorConfig generic = 8;
20+
}
21+
}
22+
23+
// ─────────────────────── Kafka Connector Configs ───────────────────────
24+
25+
message KafkaSourceConfig {
26+
string topic = 1;
27+
string bootstrap_servers = 2;
28+
optional string group_id = 3;
29+
optional string group_id_prefix = 4;
30+
KafkaOffsetMode offset_mode = 5;
31+
KafkaReadMode read_mode = 6;
32+
KafkaAuthConfig auth = 7;
33+
map<string, string> client_configs = 8;
34+
FormatConfig format = 9;
35+
BadDataPolicy bad_data_policy = 10;
36+
uint32 rate_limit_msgs_per_sec = 11;
37+
optional string value_subject = 12;
38+
}
39+
40+
message KafkaSinkConfig {
41+
string topic = 1;
42+
string bootstrap_servers = 2;
43+
KafkaSinkCommitMode commit_mode = 3;
44+
optional string key_field = 4;
45+
optional string timestamp_field = 5;
46+
KafkaAuthConfig auth = 6;
47+
map<string, string> client_configs = 7;
48+
FormatConfig format = 8;
49+
optional string value_subject = 9;
50+
}
51+
52+
// Fallback for non-Kafka connectors that are not yet strongly typed.
53+
message GenericConnectorConfig {
54+
map<string, string> properties = 1;
55+
}
56+
57+
// ─────────────────────── Kafka Auth ───────────────────────
58+
59+
message KafkaAuthConfig {
60+
oneof auth {
61+
KafkaAuthNone none = 1;
62+
KafkaAuthSasl sasl = 2;
63+
KafkaAuthAwsMskIam aws_msk_iam = 3;
64+
}
65+
}
66+
67+
message KafkaAuthNone {}
68+
69+
message KafkaAuthSasl {
70+
string protocol = 1;
71+
string mechanism = 2;
72+
string username = 3;
73+
string password = 4;
74+
}
75+
76+
message KafkaAuthAwsMskIam {
77+
string region = 1;
78+
}
79+
80+
// ─────────────────────── Format & Data-Quality ───────────────────────
81+
82+
message FormatConfig {
83+
oneof format {
84+
JsonFormatConfig json = 1;
85+
RawStringFormatConfig raw_string = 2;
86+
RawBytesFormatConfig raw_bytes = 3;
87+
}
88+
}
89+
90+
message JsonFormatConfig {
91+
TimestampFormatProto timestamp_format = 1;
92+
DecimalEncodingProto decimal_encoding = 2;
93+
bool include_schema = 3;
94+
bool confluent_schema_registry = 4;
95+
optional uint32 schema_id = 5;
96+
bool debezium = 6;
97+
bool unstructured = 7;
98+
}
99+
100+
message RawStringFormatConfig {}
101+
message RawBytesFormatConfig {}
102+
103+
// ─────────────────────── Kafka Enums ───────────────────────
104+
105+
enum TimestampFormatProto {
106+
TIMESTAMP_RFC3339 = 0;
107+
TIMESTAMP_UNIX_MILLIS = 1;
108+
}
109+
110+
enum DecimalEncodingProto {
111+
DECIMAL_NUMBER = 0;
112+
DECIMAL_STRING = 1;
113+
DECIMAL_BYTES = 2;
114+
}
115+
116+
enum BadDataPolicy {
117+
BAD_DATA_FAIL = 0;
118+
BAD_DATA_DROP = 1;
119+
}
120+
121+
enum KafkaOffsetMode {
122+
KAFKA_OFFSET_EARLIEST = 0;
123+
KAFKA_OFFSET_LATEST = 1;
124+
KAFKA_OFFSET_GROUP = 2;
125+
}
126+
127+
enum KafkaReadMode {
128+
KAFKA_READ_DEFAULT = 0;
129+
KAFKA_READ_COMMITTED = 1;
130+
KAFKA_READ_UNCOMMITTED = 2;
131+
}
132+
133+
enum KafkaSinkCommitMode {
134+
KAFKA_SINK_AT_LEAST_ONCE = 0;
135+
KAFKA_SINK_EXACTLY_ONCE = 1;
13136
}
14137

15138
message ValuePlanOperator {

protocol/proto/storage.proto

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,13 @@ message CatalogSourceTable {
3030
bytes arrow_schema_ipc = 1;
3131
optional string event_time_field = 2;
3232
optional string watermark_field = 3;
33-
// Original CREATE TABLE ... WITH ('k'='v', ...) pairs (best-effort; keys sorted in DDL).
33+
// Original CREATE TABLE ... WITH ('k'='v', ...) pairs — single source of truth.
3434
map<string, string> with_options = 4;
3535
// Canonical connector identifier (e.g. kafka, postgres-cdc).
3636
string connector = 5;
37+
reserved 6; // removed: string opaque_config (JSON blob no longer needed)
38+
// Human-readable note from DDL (ConnectorOp.description).
39+
string description = 7;
3740
}
3841

3942
// =============================================================================

src/common/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ pub use control::{
4848
pub use fs_schema::{FsSchema, FsSchemaRef};
4949
pub use errors::DataflowError;
5050
pub use formats::{BadData, Format, Framing, JsonFormat};
51-
pub use operator_config::{MetadataField, OperatorConfig, RateLimit};
51+
pub use operator_config::MetadataField;
5252

5353
// ── Well-known column names ──
5454
pub const TIMESTAMP_FIELD: &str = "_timestamp";

src/runtime/streaming/api/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ pub mod operator;
1616
pub mod source;
1717

1818
pub use context::TaskContext;
19-
pub use operator::{ConstructedOperator, MessageOperator};
19+
pub use operator::{ConstructedOperator, Operator};
2020
pub use source::{SourceEvent, SourceOffset, SourceOperator};

src/runtime/streaming/api/operator.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ use crate::sql::common::{CheckpointBarrier, Watermark};
2424

2525
pub enum ConstructedOperator {
2626
Source(Box<dyn SourceOperator>),
27-
Operator(Box<dyn MessageOperator>),
27+
Operator(Box<dyn Operator>),
2828
}
2929

3030
#[async_trait]
31-
pub trait MessageOperator: Send + 'static {
31+
pub trait Operator: Send + 'static {
3232
fn name(&self) -> &str;
3333

3434
async fn on_start(&mut self, _ctx: &mut TaskContext) -> anyhow::Result<()> {

0 commit comments

Comments
 (0)