Skip to content

Commit 293dd5e

Browse files
committed
feat: refreshable table, source part
Signed-off-by: xxchan <[email protected]>
1 parent e06c1e8 commit 293dd5e

File tree

29 files changed

+795
-21
lines changed

29 files changed

+795
-21
lines changed

proto/stream_service.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ message BarrierCompleteResponse {
7171
// prev_epoch of barrier
7272
uint64 epoch = 9;
7373
uint32 database_id = 10;
74+
// Used for refreshable batch source.
75+
// SourceExecutor reports the source load is finished, and then
76+
// meta will issue a LoadFinished barrier to notify MaterializeExecutor to start diff calculation.
77+
repeated uint32 load_finished_source_ids = 11;
7478
}
7579

7680
message StreamingControlStreamRequest {

src/connector/src/macros.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ macro_rules! for_all_classified_sources {
3939
{ Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> },
4040
{ OpendalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3> },
4141
{ PosixFs, $crate::source::filesystem::opendal_source::PosixFsProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalPosixFs> },
42+
{ BatchPosixFs, $crate::source::filesystem::opendal_source::BatchPosixFsProperties, $crate::source::filesystem::opendal_source::BatchPosixFsSplit },
4243
{ Azblob, $crate::source::filesystem::opendal_source::AzblobProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalAzblob> },
4344
{ Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit},
4445
{ Iceberg, $crate::source::iceberg::IcebergProperties, $crate::source::iceberg::IcebergSplit}

src/connector/src/source/base.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ use crate::error::ConnectorResult as Result;
5050
use crate::parser::ParserConfig;
5151
use crate::parser::schema_change::SchemaChangeEnvelope;
5252
use crate::source::SplitImpl::{CitusCdc, MongodbCdc, MysqlCdc, PostgresCdc, SqlServerCdc};
53+
use crate::source::batch::BatchSourceSplitImpl;
5354
use crate::source::monitor::EnumeratorMetrics;
5455
use crate::with_options::WithOptions;
5556
use crate::{
@@ -450,7 +451,10 @@ pub type BoxSourceMessageStream =
450451
BoxStream<'static, crate::error::ConnectorResult<Vec<SourceMessage>>>;
451452
/// Stream of [`StreamChunk`]s parsed from the messages from the external source.
452453
pub type BoxSourceChunkStream = BoxStream<'static, crate::error::ConnectorResult<StreamChunk>>;
454+
/// `StreamChunk` with the latest split state.
455+
/// The state is constructed in `StreamReaderBuilder::into_retry_stream`
453456
pub type StreamChunkWithState = (StreamChunk, HashMap<SplitId, SplitImpl>);
457+
/// See [`StreamChunkWithState`].
454458
pub type BoxSourceChunkWithStateStream =
455459
BoxStream<'static, crate::error::ConnectorResult<StreamChunkWithState>>;
456460

@@ -701,6 +705,15 @@ impl SplitImpl {
701705
_ => unreachable!("get_cdc_split_offset() is only for cdc split"),
702706
}
703707
}
708+
709+
pub fn into_batch_split(self) -> Option<BatchSourceSplitImpl> {
710+
match self {
711+
SplitImpl::BatchPosixFs(batch_posix_fs_split) => {
712+
Some(BatchSourceSplitImpl::BatchPosixFs(batch_posix_fs_split))
713+
}
714+
_ => None,
715+
}
716+
}
704717
}
705718

706719
impl SplitMetaData for SplitImpl {

src/connector/src/source/batch.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright 2025 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use crate::source::filesystem::opendal_source::BatchPosixFsSplit;
16+
use crate::source::{SplitImpl, SplitMetaData};
17+
18+
/// # Batch Refreshable Source
19+
///
20+
/// A batch refreshable source can be refreshed - reload all data from the source, e.g., re-run a `SELECT *` query from the source.
21+
/// The reloaded data will be handled by `RefreshableMaterialize` to calculate a diff to send to downstream.
22+
pub trait BatchSourceSplit: SplitMetaData {
23+
fn finished(&self) -> bool;
24+
/// Mark the source as finished. Called after the source is exhausted.
25+
/// A `LoadFinish` signal will be sent by `SourceExecutor`, and the `RefreshableMaterialize` will begin to calculate the diff.
26+
fn finish(&mut self);
27+
/// Refresh the source to make it ready for re-run.
28+
/// A `Refresh` signal will be sent, and the `RefreshableMaterialize` will begin to write the new data into a temporary staging table.
29+
fn refresh(&mut self);
30+
}
31+
32+
pub enum BatchSourceSplitImpl {
33+
BatchPosixFs(BatchPosixFsSplit),
34+
}
35+
36+
/// See [`BatchSourceSplit`] for more details.
37+
impl BatchSourceSplitImpl {
38+
pub fn finished(&self) -> bool {
39+
match self {
40+
BatchSourceSplitImpl::BatchPosixFs(split) => split.finished(),
41+
}
42+
}
43+
44+
pub fn finish(&mut self) {
45+
tracing::info!("finishing batch source split");
46+
match self {
47+
BatchSourceSplitImpl::BatchPosixFs(split) => split.finish(),
48+
}
49+
}
50+
51+
pub fn refresh(&mut self) {
52+
tracing::info!("refreshing batch source split");
53+
match self {
54+
BatchSourceSplitImpl::BatchPosixFs(split) => split.refresh(),
55+
}
56+
}
57+
}
58+
59+
impl From<BatchSourceSplitImpl> for SplitImpl {
60+
fn from(batch_split: BatchSourceSplitImpl) -> Self {
61+
match batch_split {
62+
BatchSourceSplitImpl::BatchPosixFs(split) => SplitImpl::BatchPosixFs(split),
63+
}
64+
}
65+
}

0 commit comments

Comments
 (0)