
Commit 2e2c8c0

feat(sink): add mutation to add sink schema
1 parent 817a704 commit 2e2c8c0

File tree

33 files changed: +366 −68 lines changed


proto/connector_service.proto

Lines changed: 2 additions & 0 deletions
@@ -5,6 +5,7 @@ package connector_service;
 import "catalog.proto";
 import "common.proto";
 import "plan_common.proto";
+import "stream_plan.proto";

 option java_outer_classname = "ConnectorServiceProto";
 option java_package = "com.risingwave.proto";
@@ -227,6 +228,7 @@ message CoordinateRequest {
 message CommitRequest {
   uint64 epoch = 1;
   SinkMetadata metadata = 2;
+  stream_plan.SinkAddColumns add_columns = 3;
 }

 message UpdateVnodeBitmapRequest {
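
For context, here is a minimal sketch of how a coordinator client could populate the new field when issuing a commit, using the prost-generated types. The helper `build_commit_request` is hypothetical and not part of this commit; the field types are assumed from the usual prost codegen conventions.

    // Hypothetical helper (not part of this commit): build a CommitRequest that
    // carries the new optional schema-change payload.
    use risingwave_pb::connector_service::SinkMetadata;
    use risingwave_pb::connector_service::coordinate_request::{self, CommitRequest};
    use risingwave_pb::stream_plan::SinkAddColumns;

    fn build_commit_request(
        epoch: u64,
        metadata: SinkMetadata,
        add_columns: Option<SinkAddColumns>,
    ) -> coordinate_request::Msg {
        coordinate_request::Msg::CommitRequest(CommitRequest {
            epoch,
            metadata: Some(metadata),
            // `None` preserves the old behaviour; `Some(..)` signals a schema change.
            add_columns,
        })
    }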

proto/stream_plan.proto

Lines changed: 5 additions & 0 deletions
@@ -38,6 +38,10 @@ message StopMutation {
   repeated uint32 actors = 1;
 }

+message SinkAddColumns {
+  repeated plan_common.Field fields = 1;
+}
+
 message UpdateMutation {
   message DispatcherUpdate {
     // Dispatcher can be uniquely identified by a combination of actor id and dispatcher id.
@@ -79,6 +83,7 @@ message UpdateMutation {
   // When modifying the Materialized View, we need to recreate the Dispatcher from the old upstream to the new TableFragment.
   // Consistent with the semantics in AddMutation.
   map<uint32, Dispatchers> actor_new_dispatchers = 6;
+  map<uint32, SinkAddColumns> sink_add_columns = 7;
 }

 message SourceChangeSplitMutation {
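
As a rough sketch (not from this commit), the new message wraps the prost-generated `plan_common::Field` values describing the columns being added; the helper name below is illustrative only.

    // Hypothetical sketch: wrap the proto fields describing the added columns.
    use risingwave_pb::plan_common::Field as PbField;
    use risingwave_pb::stream_plan::SinkAddColumns;

    fn to_sink_add_columns(new_columns: Vec<PbField>) -> SinkAddColumns {
        // `fields` is the only field of SinkAddColumns, mirroring the proto above.
        SinkAddColumns { fields: new_columns }
    }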

src/bench/sink_bench/main.rs

Lines changed: 1 addition & 0 deletions
@@ -111,6 +111,7 @@ impl LogReader for MockRangeLogReader {
                     is_checkpoint: true,
                     new_vnode_bitmap: None,
                     is_stop: false,
+                    add_columns: None,
                 },
             ))
         }

src/common/src/catalog/schema.rs

Lines changed: 7 additions & 0 deletions
@@ -50,6 +50,13 @@ impl Field {
             name: self.name.clone(),
         }
     }
+
+    pub fn from_prost(pb: &PbField) -> Self {
+        Field {
+            data_type: DataType::from(pb.data_type.as_ref().unwrap()),
+            name: pb.name.clone(),
+        }
+    }
 }

 impl From<&ColumnDesc> for Field {
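
A minimal usage sketch (assuming the payload arrives as the `SinkAddColumns` proto defined earlier in this commit): the new constructor lets the connector layer turn the proto fields back into catalog `Field`s. The function name is illustrative only.

    // Sketch only: decode the proto payload into catalog fields via the new constructor.
    use risingwave_common::catalog::Field;
    use risingwave_pb::stream_plan::SinkAddColumns;

    fn decode_add_columns(add_columns: &SinkAddColumns) -> Vec<Field> {
        add_columns.fields.iter().map(Field::from_prost).collect()
    }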

src/connector/src/sink/boxed.rs

Lines changed: 8 additions & 2 deletions
@@ -18,6 +18,7 @@ use std::ops::DerefMut;
 use async_trait::async_trait;
 use futures::FutureExt;
 use futures::future::BoxFuture;
+use risingwave_common::catalog::Field;
 use risingwave_pb::connector_service::SinkMetadata;

 use super::SinkCommittedEpochSubscriber;
@@ -107,7 +108,12 @@ impl SinkCommitCoordinator for BoxCoordinator {
         self.deref_mut().init(subscriber).await
     }

-    async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> crate::sink::Result<()> {
-        self.deref_mut().commit(epoch, metadata).await
+    async fn commit(
+        &mut self,
+        epoch: u64,
+        metadata: Vec<SinkMetadata>,
+        add_columns: Option<Vec<Field>>,
+    ) -> crate::sink::Result<()> {
+        self.deref_mut().commit(epoch, metadata, add_columns).await
     }
 }
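
The `SinkCommitCoordinator` trait itself is defined outside this diff. Inferred from the implementations touched here (a sketch under that assumption, not the authoritative trait definition), a coordinator that does not support schema changes would now look roughly like this:

    // Sketch under assumptions: the trait shape (including init's return type) is
    // inferred from BoxCoordinator and the DeltaLake/Iceberg committers in this commit.
    use anyhow::anyhow;
    use async_trait::async_trait;
    use risingwave_common::catalog::Field;
    use risingwave_pb::connector_service::SinkMetadata;

    use crate::sink::{Result, SinkCommitCoordinator, SinkCommittedEpochSubscriber};

    struct NoSchemaChangeCoordinator;

    #[async_trait]
    impl SinkCommitCoordinator for NoSchemaChangeCoordinator {
        async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
            Ok(None)
        }

        async fn commit(
            &mut self,
            _epoch: u64,
            _metadata: Vec<SinkMetadata>,
            add_columns: Option<Vec<Field>>,
        ) -> Result<()> {
            if let Some(add_columns) = add_columns {
                // Mirror the DeltaLake/Iceberg behaviour in this commit: reject schema changes.
                return Err(anyhow!("this sink does not support add columns: {:?}", add_columns).into());
            }
            Ok(())
        }
    }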

src/connector/src/sink/coordinate.rs

Lines changed: 12 additions & 1 deletion
@@ -194,6 +194,7 @@ impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> LogSinker for Coordin
                 is_checkpoint,
                 new_vnode_bitmap,
                 is_stop,
+                add_columns,
             } => {
                 let prev_epoch = match state {
                     LogConsumerState::EpochBegun { curr_epoch } => curr_epoch,
@@ -204,6 +205,7 @@ impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> LogSinker for Coordin
                 if current_checkpoint >= commit_checkpoint_interval.get()
                     || new_vnode_bitmap.is_some()
                     || is_stop
+                    || add_columns.is_some()
                 {
                     let start_time = Instant::now();
                     let metadata = sink_writer.barrier(true).await?;
@@ -212,7 +214,16 @@ impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> LogSinker for Coordin
                             "should get metadata on checkpoint barrier"
                         ))
                     })?;
-                    coordinator_stream_handle.commit(epoch, metadata).await?;
+                    if add_columns.is_some() {
+                        assert!(
+                            is_stop,
+                            "add columns should stop current sink for sink {}",
+                            self.param.sink_id
+                        );
+                    }
+                    coordinator_stream_handle
+                        .commit(epoch, metadata, add_columns)
+                        .await?;
                     sink_writer_metrics
                         .sink_commit_duration
                         .observe(start_time.elapsed().as_millis() as f64);

src/connector/src/sink/deltalake/imp.rs

Lines changed: 15 additions & 3 deletions
@@ -31,7 +31,7 @@ use phf::{Set, phf_set};
 use risingwave_common::array::StreamChunk;
 use risingwave_common::array::arrow::DeltaLakeConvert;
 use risingwave_common::bail;
-use risingwave_common::catalog::Schema;
+use risingwave_common::catalog::{Field, Schema};
 use risingwave_common::types::DataType;
 use risingwave_common::util::iter_util::ZipEqDebug;
 use risingwave_pb::connector_service::SinkMetadata;
@@ -535,8 +535,20 @@ impl SinkCommitCoordinator for DeltaLakeSinkCommitter {
         Ok(None)
     }

-    async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
+    async fn commit(
+        &mut self,
+        epoch: u64,
+        metadata: Vec<SinkMetadata>,
+        add_columns: Option<Vec<Field>>,
+    ) -> Result<()> {
         tracing::info!("Starting DeltaLake commit in epoch {epoch}.");
+        if let Some(add_columns) = add_columns {
+            return Err(anyhow!(
+                "Delta lake sink not support add columns, but got: {:?}",
+                add_columns
+            )
+            .into());
+        }

         let deltalake_write_result = metadata
             .iter()
@@ -682,7 +694,7 @@ mod test {
             table: deltalake_table,
         };
         let metadata = deltalake_writer.barrier(true).await.unwrap().unwrap();
-        committer.commit(1, vec![metadata]).await.unwrap();
+        committer.commit(1, vec![metadata], None).await.unwrap();

         // The following code is to test reading the deltalake data table written with test data.
         // To enable the following code, add `deltalake = { workspace = true, features = ["datafusion"] }`

src/connector/src/sink/iceberg/mod.rs

Lines changed: 15 additions & 3 deletions
@@ -61,7 +61,7 @@ use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArr
 use risingwave_common::array::{Op, StreamChunk};
 use risingwave_common::bail;
 use risingwave_common::bitmap::Bitmap;
-use risingwave_common::catalog::Schema;
+use risingwave_common::catalog::{Field, Schema};
 use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
 use risingwave_common_estimate_size::EstimateSize;
 use risingwave_pb::connector_service::SinkMetadata;
@@ -1639,8 +1639,21 @@ impl SinkCommitCoordinator for IcebergSinkCommitter {
         return Ok(None);
     }

-    async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
+    async fn commit(
+        &mut self,
+        epoch: u64,
+        metadata: Vec<SinkMetadata>,
+        add_columns: Option<Vec<Field>>,
+    ) -> Result<()> {
         tracing::info!("Starting iceberg commit in epoch {epoch}.");
+        if let Some(add_columns) = add_columns {
+            return Err(anyhow!(
+                "Iceberg sink not support add columns, but got: {:?}",
+                add_columns
+            )
+            .into());
+        }
+
         let write_results: Vec<IcebergCommitResult> = metadata
             .iter()
             .map(IcebergCommitResult::try_from)
@@ -2066,7 +2079,6 @@ mod test {
     use std::collections::BTreeMap;

     use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
-    use risingwave_common::catalog::Field;
     use risingwave_common::types::{DataType, MapType, StructType};

     use crate::connector_common::IcebergCommon;

src/connector/src/sink/log_store.rs

Lines changed: 3 additions & 0 deletions
@@ -28,6 +28,7 @@ use futures::{TryFuture, TryFutureExt};
 use risingwave_common::array::StreamChunk;
 use risingwave_common::bail;
 use risingwave_common::bitmap::Bitmap;
+use risingwave_common::catalog::Field;
 use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};
 use risingwave_common::util::epoch::{EpochPair, INVALID_EPOCH};
 use risingwave_common_estimate_size::EstimateSize;
@@ -122,6 +123,7 @@ pub enum LogStoreReadItem {
         is_checkpoint: bool,
         new_vnode_bitmap: Option<Arc<Bitmap>>,
         is_stop: bool,
+        add_columns: Option<Vec<Field>>,
     },
 }

@@ -146,6 +148,7 @@ pub struct FlushCurrentEpochOptions {
     pub is_checkpoint: bool,
     pub new_vnode_bitmap: Option<Arc<Bitmap>>,
     pub is_stop: bool,
+    pub add_columns: Option<Vec<Field>>,
 }

 pub trait LogWriter: Send {
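
A consumer-side sketch (illustrative, not part of this commit; the crate path is assumed): log sinkers that match on `LogStoreReadItem::Barrier` now also see the optional column additions.

    // Illustrative only: summarise a barrier item, including the new payload.
    use risingwave_connector::sink::log_store::LogStoreReadItem;

    fn barrier_summary(item: &LogStoreReadItem) -> Option<String> {
        match item {
            LogStoreReadItem::Barrier {
                is_checkpoint,
                is_stop,
                add_columns,
                ..
            } => Some(format!(
                "checkpoint={is_checkpoint}, stop={is_stop}, added_columns={}",
                add_columns.as_ref().map_or(0, |cols| cols.len())
            )),
            _ => None,
        }
    }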

src/connector/src/sink/mock_coordination_client.rs

Lines changed: 29 additions & 14 deletions
@@ -119,7 +119,7 @@ impl MockSinkCoordinationRpcClient {
         match receiver_stream.try_recv() {
             Ok(CoordinateRequest {
                 msg:
-                    Some(risingwave_pb::connector_service::coordinate_request::Msg::StartRequest(
+                    Some(coordinate_request::Msg::StartRequest(
                         coordinate_request::StartCoordinationRequest {
                             param: Some(_param),
                             vnode_bitmap: Some(_vnode_bitmap),
@@ -155,21 +155,36 @@ impl MockSinkCoordinationRpcClient {
                 match receiver_stream.recv().await {
                     Some(CoordinateRequest {
                         msg:
-                            Some(risingwave_pb::connector_service::coordinate_request::Msg::CommitRequest(coordinate_request::CommitRequest {
-                                epoch,
-                                metadata,
-                            })),
+                            Some(coordinate_request::Msg::CommitRequest(
+                                coordinate_request::CommitRequest {
+                                    epoch, metadata, ..
+                                },
+                            )),
                     }) => {
-                        mock_coordinator_committer.clone().lock().await.commit(epoch, vec![metadata.unwrap()]).await.map_err(|e| Status::from_error(Box::new(e)))?;
-                        response_tx_clone.clone().send(Ok(CoordinateResponse {
-                            msg: Some(coordinate_response::Msg::CommitResponse(CommitResponse{epoch})),
-                        })).await.map_err(|e| Status::from_error(Box::new(e)))?;
-                    },
+                        mock_coordinator_committer
+                            .clone()
+                            .lock()
+                            .await
+                            .commit(epoch, vec![metadata.unwrap()], None)
+                            .await
+                            .map_err(|e| Status::from_error(Box::new(e)))?;
+                        response_tx_clone
+                            .clone()
+                            .send(Ok(CoordinateResponse {
+                                msg: Some(coordinate_response::Msg::CommitResponse(
+                                    CommitResponse { epoch },
+                                )),
+                            }))
+                            .await
+                            .map_err(|e| Status::from_error(Box::new(e)))?;
+                    }
                     msg => {
-                        return Err::<ReceiverStream<CoordinateResponse>, tonic::Status>(Status::invalid_argument(format!(
-                            "expected CoordinateRequest::CommitRequest , get {:?}",
-                            msg
-                        )));
+                        return Err::<ReceiverStream<CoordinateResponse>, tonic::Status>(
+                            Status::invalid_argument(format!(
+                                "expected CoordinateRequest::CommitRequest , get {:?}",
+                                msg
+                            )),
+                        );
                     }
                 }
             }
