2 changes: 2 additions & 0 deletions proto/connector_service.proto
@@ -5,6 +5,7 @@ package connector_service;
import "catalog.proto";
import "common.proto";
import "plan_common.proto";
import "stream_plan.proto";

option java_outer_classname = "ConnectorServiceProto";
option java_package = "com.risingwave.proto";
@@ -227,6 +228,7 @@ message CoordinateRequest {
message CommitRequest {
uint64 epoch = 1;
SinkMetadata metadata = 2;
stream_plan.SinkAddColumns add_columns = 3;
}

message UpdateVnodeBitmapRequest {
5 changes: 5 additions & 0 deletions proto/stream_plan.proto
@@ -54,6 +54,10 @@ message StopMutation {
repeated uint32 dropped_sink_fragments = 2;
}

message SinkAddColumns {
repeated plan_common.Field fields = 1;
}

message UpdateMutation {
message DispatcherUpdate {
// Dispatcher can be uniquely identified by a combination of actor id and dispatcher id.
@@ -97,6 +101,7 @@ message UpdateMutation {
map<uint32, Dispatchers> actor_new_dispatchers = 6;
// CDC table snapshot splits
map<uint32, source.CdcTableSnapshotSplits> actor_cdc_table_snapshot_splits = 7;
map<uint32, SinkAddColumns> sink_add_columns = 8;
}

message SourceChangeSplitMutation {
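Together, these proto changes let a checkpoint's commit RPC (and a barrier's update mutation) carry a schema change to the sink coordinator. Below is a minimal sketch of building the extended `CommitRequest` from Rust, assuming the prost-generated types in `risingwave_pb`; the builder function itself is hypothetical, not part of this PR.

```rust
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_pb::connector_service::coordinate_request::CommitRequest;
use risingwave_pb::plan_common::Field as PbField;
use risingwave_pb::stream_plan::SinkAddColumns;

// Hypothetical builder: a commit request that also announces columns
// newly added to the sink's schema.
fn commit_request_with_columns(
    epoch: u64,
    metadata: SinkMetadata,
    fields: Vec<PbField>,
) -> CommitRequest {
    CommitRequest {
        epoch,
        metadata: Some(metadata),
        add_columns: Some(SinkAddColumns { fields }),
    }
}
```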
1 change: 1 addition & 0 deletions src/bench/sink_bench/main.rs
@@ -111,6 +111,7 @@ impl LogReader for MockRangeLogReader {
is_checkpoint: true,
new_vnode_bitmap: None,
is_stop: false,
add_columns: None,
},
))
}
7 changes: 7 additions & 0 deletions src/common/src/catalog/schema.rs
@@ -50,6 +50,13 @@ impl Field {
name: self.name.clone(),
}
}

pub fn from_prost(pb: &PbField) -> Self {
Field {
data_type: DataType::from(pb.data_type.as_ref().unwrap()),
name: pb.name.clone(),
}
}
}

impl From<&ColumnDesc> for Field {
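The new `Field::from_prost` helper is the bridge from the proto changes above to the Rust catalog types: a `SinkAddColumns` message carries `plan_common.Field`s, which decode one-to-one into catalog `Field`s. A sketch of that decoding (the function name is hypothetical):

```rust
use risingwave_common::catalog::Field;
use risingwave_pb::stream_plan::SinkAddColumns;

// Hypothetical helper: map the wire-level SinkAddColumns message into
// catalog Fields that a SinkCommitCoordinator can act on.
fn decode_add_columns(pb: Option<&SinkAddColumns>) -> Option<Vec<Field>> {
    pb.map(|msg| msg.fields.iter().map(Field::from_prost).collect())
}
```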
10 changes: 8 additions & 2 deletions src/connector/src/sink/boxed.rs
@@ -18,6 +18,7 @@ use std::ops::DerefMut;
use async_trait::async_trait;
use futures::FutureExt;
use futures::future::BoxFuture;
use risingwave_common::catalog::Field;
use risingwave_pb::connector_service::SinkMetadata;

use super::SinkCommittedEpochSubscriber;
@@ -107,7 +108,12 @@ impl SinkCommitCoordinator for BoxCoordinator {
self.deref_mut().init(subscriber).await
}

async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> crate::sink::Result<()> {
self.deref_mut().commit(epoch, metadata).await
async fn commit(
&mut self,
epoch: u64,
metadata: Vec<SinkMetadata>,
add_columns: Option<Vec<Field>>,
) -> crate::sink::Result<()> {
self.deref_mut().commit(epoch, metadata, add_columns).await
}
}
13 changes: 12 additions & 1 deletion src/connector/src/sink/coordinate.rs
@@ -194,6 +194,7 @@ impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> LogSinker for Coordin
is_checkpoint,
new_vnode_bitmap,
is_stop,
add_columns,
} => {
let prev_epoch = match state {
LogConsumerState::EpochBegun { curr_epoch } => curr_epoch,
@@ -204,6 +205,7 @@
if current_checkpoint >= commit_checkpoint_interval.get()
|| new_vnode_bitmap.is_some()
|| is_stop
|| add_columns.is_some()
{
let start_time = Instant::now();
let metadata = sink_writer.barrier(true).await?;
@@ -212,7 +214,16 @@
"should get metadata on checkpoint barrier"
))
})?;
coordinator_stream_handle.commit(epoch, metadata).await?;
if add_columns.is_some() {
assert!(
is_stop,
"add columns should stop current sink for sink {}",
self.param.sink_id
);
}
coordinator_stream_handle
.commit(epoch, metadata, add_columns)
.await?;
sink_writer_metrics
.sink_commit_duration
.observe(start_time.elapsed().as_millis() as f64);
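The net effect in `coordinate.rs` is a fourth commit trigger on checkpoint barriers. Condensed into a standalone predicate (a sketch with hypothetical names; the real logic lives inline above):

```rust
// Sketch: the coordinated sinker commits on a checkpoint barrier when the
// configured interval is reached, the vnode bitmap changes, the sink is
// stopping, or a schema change (add_columns) arrives.
fn should_commit(
    current_checkpoint: u64,
    commit_checkpoint_interval: u64,
    has_new_vnode_bitmap: bool,
    is_stop: bool,
    has_add_columns: bool,
) -> bool {
    current_checkpoint >= commit_checkpoint_interval
        || has_new_vnode_bitmap
        || is_stop
        || has_add_columns
}
```

The added assertion also encodes an invariant: `add_columns` may only appear together with `is_stop`, i.e. a schema change always terminates the current sink writer before the coordinator applies the new columns.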
18 changes: 15 additions & 3 deletions src/connector/src/sink/deltalake/imp.rs
@@ -31,7 +31,7 @@ use phf::{Set, phf_set};
use risingwave_common::array::StreamChunk;
use risingwave_common::array::arrow::DeltaLakeConvert;
use risingwave_common::bail;
use risingwave_common::catalog::Schema;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_pb::connector_service::SinkMetadata;
@@ -535,8 +535,20 @@ impl SinkCommitCoordinator for DeltaLakeSinkCommitter {
Ok(None)
}

async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
async fn commit(
&mut self,
epoch: u64,
metadata: Vec<SinkMetadata>,
add_columns: Option<Vec<Field>>,
) -> Result<()> {
tracing::info!("Starting DeltaLake commit in epoch {epoch}.");
if let Some(add_columns) = add_columns {
return Err(anyhow!(
"Delta lake sink not support add columns, but got: {:?}",
add_columns
)
.into());
}

let deltalake_write_result = metadata
.iter()
@@ -682,7 +694,7 @@ mod test {
table: deltalake_table,
};
let metadata = deltalake_writer.barrier(true).await.unwrap().unwrap();
committer.commit(1, vec![metadata]).await.unwrap();
committer.commit(1, vec![metadata], None).await.unwrap();

// The following code is to test reading the deltalake data table written with test data.
// To enable the following code, add `deltalake = { workspace = true, features = ["datafusion"] }`
18 changes: 15 additions & 3 deletions src/connector/src/sink/iceberg/mod.rs
@@ -62,7 +62,7 @@ use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArr
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::Schema;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::connector_service::SinkMetadata;
@@ -1705,8 +1705,21 @@ impl SinkCommitCoordinator for IcebergSinkCommitter {
return Ok(None);
}

async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
async fn commit(
&mut self,
epoch: u64,
metadata: Vec<SinkMetadata>,
add_columns: Option<Vec<Field>>,
) -> Result<()> {
tracing::info!("Starting iceberg commit in epoch {epoch}.");
if let Some(add_columns) = add_columns {
return Err(anyhow!(
"Iceberg sink not support add columns, but got: {:?}",
add_columns
)
.into());
}

let write_results: Vec<IcebergCommitResult> = metadata
.iter()
.map(IcebergCommitResult::try_from)
@@ -2157,7 +2170,6 @@ mod test {
use std::collections::BTreeMap;

use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
use risingwave_common::catalog::Field;
use risingwave_common::types::{DataType, MapType, StructType};

use crate::connector_common::IcebergCommon;
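Delta Lake and Iceberg reject the new argument with near-identical code. A hypothetical way to factor that guard out (not part of this PR):

```rust
use anyhow::anyhow;
use risingwave_common::catalog::Field;

// Hypothetical shared guard for coordinators that cannot apply schema
// changes: error out whenever add_columns is present.
fn reject_add_columns(sink_name: &str, add_columns: Option<Vec<Field>>) -> anyhow::Result<()> {
    if let Some(cols) = add_columns {
        return Err(anyhow!(
            "{} sink does not support adding columns, but got: {:?}",
            sink_name,
            cols
        ));
    }
    Ok(())
}
```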
3 changes: 3 additions & 0 deletions src/connector/src/sink/log_store.rs
@@ -28,6 +28,7 @@ use futures::{TryFuture, TryFutureExt};
use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::Field;
use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};
use risingwave_common::util::epoch::{EpochPair, INVALID_EPOCH};
use risingwave_common_estimate_size::EstimateSize;
@@ -122,6 +123,7 @@ pub enum LogStoreReadItem {
is_checkpoint: bool,
new_vnode_bitmap: Option<Arc<Bitmap>>,
is_stop: bool,
add_columns: Option<Vec<Field>>,
},
}

@@ -146,6 +148,7 @@ pub struct FlushCurrentEpochOptions {
pub is_checkpoint: bool,
pub new_vnode_bitmap: Option<Arc<Bitmap>>,
pub is_stop: bool,
pub add_columns: Option<Vec<Field>>,
}

pub trait LogWriter: Send {
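Both the barrier read item and the flush options gain the same optional payload. An illustrative construction, using this module's `FlushCurrentEpochOptions` (field values are made up; per `coordinate.rs`, `add_columns` is expected to be paired with `is_stop`):

```rust
use risingwave_common::catalog::Field;
use risingwave_common::types::DataType;

// Illustrative only: flush options for a checkpoint that also delivers a
// newly added column to the sink.
fn flush_options_with_new_column() -> FlushCurrentEpochOptions {
    FlushCurrentEpochOptions {
        is_checkpoint: true,
        new_vnode_bitmap: None,
        // add_columns implies the current sink stops (see coordinate.rs).
        is_stop: true,
        add_columns: Some(vec![Field {
            data_type: DataType::Int64,
            name: "v2".to_owned(),
        }]),
    }
}
```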
43 changes: 29 additions & 14 deletions src/connector/src/sink/mock_coordination_client.rs
@@ -119,7 +119,7 @@ impl MockSinkCoordinationRpcClient {
match receiver_stream.try_recv() {
Ok(CoordinateRequest {
msg:
Some(risingwave_pb::connector_service::coordinate_request::Msg::StartRequest(
Some(coordinate_request::Msg::StartRequest(
coordinate_request::StartCoordinationRequest {
param: Some(_param),
vnode_bitmap: Some(_vnode_bitmap),
@@ -155,21 +155,36 @@
match receiver_stream.recv().await {
Some(CoordinateRequest {
msg:
Some(risingwave_pb::connector_service::coordinate_request::Msg::CommitRequest(coordinate_request::CommitRequest {
epoch,
metadata,
})),
Some(coordinate_request::Msg::CommitRequest(
coordinate_request::CommitRequest {
epoch, metadata, ..
},
)),
}) => {
mock_coordinator_committer.clone().lock().await.commit(epoch, vec![metadata.unwrap()]).await.map_err(|e| Status::from_error(Box::new(e)))?;
response_tx_clone.clone().send(Ok(CoordinateResponse {
msg: Some(coordinate_response::Msg::CommitResponse(CommitResponse{epoch})),
})).await.map_err(|e| Status::from_error(Box::new(e)))?;
},
mock_coordinator_committer
.clone()
.lock()
.await
.commit(epoch, vec![metadata.unwrap()], None)
.await
.map_err(|e| Status::from_error(Box::new(e)))?;
response_tx_clone
.clone()
.send(Ok(CoordinateResponse {
msg: Some(coordinate_response::Msg::CommitResponse(
CommitResponse { epoch },
)),
}))
.await
.map_err(|e| Status::from_error(Box::new(e)))?;
}
msg => {
return Err::<ReceiverStream<CoordinateResponse>, tonic::Status>(Status::invalid_argument(format!(
"expected CoordinateRequest::CommitRequest , get {:?}",
msg
)));
return Err::<ReceiverStream<CoordinateResponse>, tonic::Status>(
Status::invalid_argument(format!(
"expected CoordinateRequest::CommitRequest , get {:?}",
msg
)),
);
}
}
}
14 changes: 12 additions & 2 deletions src/connector/src/sink/mod.rs
@@ -791,7 +791,12 @@ pub trait SinkCommitCoordinator {
/// the set of metadata. The metadata is serialized into bytes, because the metadata is expected
/// to be passed between different gRPC nodes, so in this general trait, the metadata is
/// serialized bytes.
async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()>;
async fn commit(
&mut self,
epoch: u64,
metadata: Vec<SinkMetadata>,
add_columns: Option<Vec<Field>>,
) -> Result<()>;
}

#[deprecated]
@@ -817,7 +822,12 @@ impl SinkCommitCoordinator for NoSinkCommitCoordinator {
unreachable!()
}

async fn commit(&mut self, _epoch: u64, _metadata: Vec<SinkMetadata>) -> Result<()> {
async fn commit(
&mut self,
_epoch: u64,
_metadata: Vec<SinkMetadata>,
_add_columns: Option<Vec<Field>>,
) -> Result<()> {
unreachable!()
}
}
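For implementors, the widened trait looks like this end to end. A minimal, hypothetical coordinator that merely logs schema changes, using this module's `Result` alias; the `init` signature is inferred from the diffs above:

```rust
use async_trait::async_trait;
use risingwave_common::catalog::Field;
use risingwave_pb::connector_service::SinkMetadata;

struct LoggingCoordinator;

#[async_trait]
impl SinkCommitCoordinator for LoggingCoordinator {
    async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
        Ok(None)
    }

    async fn commit(
        &mut self,
        epoch: u64,
        _metadata: Vec<SinkMetadata>,
        add_columns: Option<Vec<Field>>,
    ) -> Result<()> {
        // A real coordinator would apply the new columns downstream here.
        if let Some(cols) = add_columns {
            tracing::info!(?cols, epoch, "sink schema extended at commit");
        }
        Ok(())
    }
}
```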
16 changes: 14 additions & 2 deletions src/connector/src/sink/remote.rs
@@ -29,7 +29,7 @@ use phf::phf_set;
use prost::Message;
use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnDesc, ColumnId};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field};
use risingwave_common::global_jvm::JVM;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::types::DataType;
@@ -697,7 +697,19 @@ impl SinkCommitCoordinator for RemoteCoordinator {
Ok(None)
}

async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
async fn commit(
&mut self,
epoch: u64,
metadata: Vec<SinkMetadata>,
add_columns: Option<Vec<Field>>,
) -> Result<()> {
if let Some(add_columns) = add_columns {
return Err(anyhow!(
"remote coordinator not support add columns, but got: {:?}",
add_columns
)
.into());
}
Ok(self.stream_handle.commit(epoch, metadata).await?)
}
}
7 changes: 6 additions & 1 deletion src/connector/src/sink/trivial.rs
@@ -18,6 +18,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use phf::{Set, phf_set};
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use tracing::info;

use crate::enforce_secret::EnforceSecret;
use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
@@ -129,7 +130,7 @@ impl<T: TrivialSinkType> LogSinker for TrivialSink<T> {

log_reader.truncate(TruncateOffset::Chunk { epoch, chunk_id })?;
}
LogStoreReadItem::Barrier { .. } => {
LogStoreReadItem::Barrier { add_columns, .. } => {
if T::TRACE_LOG {
tracing::trace!(
target: "events::sink::message::barrier",
Expand All @@ -139,6 +140,10 @@ impl<T: TrivialSinkType> LogSinker for TrivialSink<T> {
);
}

if let Some(add_columns) = add_columns {
info!(?add_columns, "trivial sink received add columns");
}

log_reader.truncate(TruncateOffset::Barrier { epoch })?;
}
}
8 changes: 7 additions & 1 deletion src/connector/src/sink/utils.rs
@@ -38,6 +38,7 @@ pub(crate) mod dummy {

use anyhow::anyhow;
use phf::{Set, phf_set};
use risingwave_common::catalog::Field;
use risingwave_pb::connector_service::SinkMetadata;
use sea_orm::DatabaseConnection;
use tokio::sync::mpsc::UnboundedSender;
@@ -69,7 +70,12 @@
Err(err_feature_not_enabled(S::SINK_NAME))
}

async fn commit(&mut self, _epoch: u64, _metadata: Vec<SinkMetadata>) -> Result<()> {
async fn commit(
&mut self,
_epoch: u64,
_metadata: Vec<SinkMetadata>,
_add_columns: Option<Vec<Field>>,
) -> Result<()> {
Err(err_feature_not_enabled(S::SINK_NAME))
}
}