Skip to content

Commit e0f1777

Browse files
committed
feat: support refreshable table
1 parent a409585 commit e0f1777

File tree

45 files changed

+1731
-94
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1731
-94
lines changed

e2e_test/streaming/refresh_table.slt

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
# Test cases for refreshable table functionality using batch_posix_fs connector
2+
# This test validates the REFRESH TABLE command and covers insert, update, delete scenarios
3+
4+
control substitution on
5+
6+
# Setup test directory and initial test files
7+
system ok
8+
rm -rf /tmp/rw_refresh_test && mkdir -p /tmp/rw_refresh_test
9+
10+
system ok
11+
echo "1,alice" > /tmp/rw_refresh_test/file1.csv
12+
13+
system ok
14+
echo "2,bob" > /tmp/rw_refresh_test/file2.csv
15+
16+
# ----
17+
# Test 1: Create refreshable table and initial load (must have PRIMARY KEY)
18+
statement ok
19+
CREATE TABLE refresh_fs_t (id int PRIMARY KEY, name varchar) WITH (
20+
connector = 'batch_posix_fs',
21+
batch_posix_fs.root = '/tmp/rw_refresh_test',
22+
match_pattern = '*.csv'
23+
) FORMAT PLAIN ENCODE CSV (without_header = 'true', delimiter = ',');
24+
25+
sleep 2s
26+
27+
# Initial load should read existing files
28+
query II rowsort retry 3 backoff 5s
29+
SELECT * FROM refresh_fs_t;
30+
----
31+
1 alice
32+
2 bob
33+
34+
# ----
35+
# Test 2: Add new files and refresh (INSERT scenario)
36+
system ok
37+
echo "3,charlie" > /tmp/rw_refresh_test/file3.csv
38+
39+
system ok
40+
echo "4,david" > /tmp/rw_refresh_test/new_file.csv
41+
42+
sleep 2s
43+
44+
# won't see new data after load finish, before next refresh
45+
query II rowsort
46+
SELECT * FROM refresh_fs_t;
47+
----
48+
1 alice
49+
2 bob
50+
51+
statement ok
52+
REFRESH TABLE refresh_fs_t;
53+
54+
sleep 2s
55+
56+
57+
58+
59+
query II rowsort retry 3 backoff 5s
60+
SELECT * FROM refresh_fs_t;
61+
----
62+
1 alice
63+
2 bob
64+
3 charlie
65+
4 david
66+
67+
# ----
68+
# Test 3: Modify existing file content (UPDATE scenario)
69+
system ok
70+
echo "1,alice_updated" > /tmp/rw_refresh_test/file1.csv
71+
72+
system ok
73+
echo "2,bob_updated" > /tmp/rw_refresh_test/file2.csv
74+
75+
statement ok
76+
REFRESH TABLE refresh_fs_t;
77+
78+
sleep 2s
79+
80+
81+
82+
query II rowsort retry 3 backoff 5s
83+
SELECT * FROM refresh_fs_t;
84+
----
85+
1 alice_updated
86+
2 bob_updated
87+
3 charlie
88+
4 david
89+
90+
# ----
91+
# Test 4: Remove files (DELETE scenario)
92+
system ok
93+
rm /tmp/rw_refresh_test/file3.csv
94+
95+
system ok
96+
rm /tmp/rw_refresh_test/new_file.csv
97+
98+
statement ok
99+
REFRESH TABLE refresh_fs_t;
100+
101+
sleep 2s
102+
103+
104+
105+
query II rowsort retry 3 backoff 5s
106+
SELECT * FROM refresh_fs_t;
107+
----
108+
1 alice_updated
109+
2 bob_updated
110+
111+
# ----
112+
# Test 5: Mixed operations - add, update, delete simultaneously
113+
system ok
114+
echo "1,alice_final" > /tmp/rw_refresh_test/file1.csv
115+
116+
system ok
117+
rm /tmp/rw_refresh_test/file2.csv
118+
119+
system ok
120+
echo "5,eve" > /tmp/rw_refresh_test/file5.csv
121+
122+
system ok
123+
echo "6,frank" > /tmp/rw_refresh_test/file6.csv
124+
125+
statement ok
126+
REFRESH TABLE refresh_fs_t;
127+
128+
sleep 2s
129+
130+
131+
132+
query II rowsort retry 3 backoff 5s
133+
SELECT * FROM refresh_fs_t;
134+
----
135+
1 alice_final
136+
5 eve
137+
6 frank
138+
139+
# ----
140+
# Test 6: Empty directory (all files deleted)
141+
system ok
142+
rm /tmp/rw_refresh_test/*.csv
143+
144+
statement ok
145+
REFRESH TABLE refresh_fs_t;
146+
147+
sleep 2s
148+
149+
150+
151+
152+
query II rowsort retry 3 backoff 5s
153+
SELECT * FROM refresh_fs_t;
154+
----
155+
156+
# ----
157+
# Test 7: Re-populate after empty
158+
system ok
159+
echo "7,grace" > /tmp/rw_refresh_test/new1.csv
160+
161+
system ok
162+
echo "8,henry" > /tmp/rw_refresh_test/new2.csv
163+
164+
statement ok
165+
REFRESH TABLE refresh_fs_t;
166+
167+
sleep 2s
168+
169+
170+
171+
172+
173+
query II rowsort retry 3 backoff 5s
174+
SELECT * FROM refresh_fs_t;
175+
----
176+
7 grace
177+
8 henry
178+
179+
# ----
180+
# Test 8: Error cases
181+
182+
# Test refreshing non-existent table
183+
statement error
184+
REFRESH TABLE non_existent_table;
185+
186+
# Test refreshing a non-refreshable table
187+
statement ok
188+
CREATE TABLE non_refreshable_t (v1 int) WITH (
189+
connector = 'datagen',
190+
fields.v1.kind = 'sequence',
191+
fields.v1.start = '1',
192+
fields.v1.end = '10'
193+
) FORMAT NATIVE ENCODE NATIVE;
194+
195+
statement error Table 'public.non_refreshable_t' is not refreshable
196+
REFRESH TABLE non_refreshable_t;
197+
198+
# ----
199+
# Test 9: Error case - create refreshable table without PRIMARY KEY
200+
statement error
201+
CREATE TABLE no_pk_refresh_t (id int, name varchar) WITH (
202+
connector = 'batch_posix_fs',
203+
batch_posix_fs.root = '/tmp/rw_refresh_test',
204+
match_pattern = '*.csv'
205+
) FORMAT PLAIN ENCODE CSV (without_header = 'true', delimiter = ',');
206+
207+
# ----
208+
# Cleanup
209+
statement ok
210+
DROP TABLE refresh_fs_t;
211+
212+
statement ok
213+
DROP TABLE non_refreshable_t;
214+
215+
system ok
216+
rm -rf /tmp/rw_refresh_test

proto/stream_plan.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,9 @@ message MaterializeNode {
372372
repeated common.ColumnOrder column_orders = 2;
373373
// Used for internal table states.
374374
catalog.Table table = 3;
375+
// For refreshable tables, staging table for collecting new data during refresh.
376+
// This table has the same schema as the main table but with a different table_id.
377+
optional catalog.Table staging_table = 5;
375378
}
376379

377380
message AggCallState {

proto/stream_service.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ message BarrierCompleteResponse {
7171
// prev_epoch of barrier
7272
uint64 epoch = 9;
7373
uint32 database_id = 10;
74+
// Used for refreshable batch source.
75+
// SourceExecutor reports the source load is finished, and then
76+
// meta will issue a LoadFinished barrier to notify MaterializeExecutor to start diff calculation.
77+
repeated uint32 load_finished_source_ids = 11;
7478
}
7579

7680
message StreamingControlStreamRequest {

src/common/src/util/stream_graph_visitor.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -269,12 +269,16 @@ pub fn visit_stream_node_tables_inner<F>(
269269
}
270270

271271
// Note: add internal tables for new nodes here.
272-
NodeBody::Materialize(node) if !internal_tables_only => {
273-
// Note: Plan directly generated by frontend may not have the table filled,
274-
// as it is filled by the meta service during building fragments.
275-
// However, if caller requests to visit this table via `!internal_tables_only`,
276-
// we still assert on its existence to avoid undefined behavior.
277-
always!(node.table, "Materialize")
272+
NodeBody::Materialize(node) => {
273+
if !internal_tables_only {
274+
// Note: Plan directly generated by frontend may not have the table filled,
275+
// as it is filled by the meta service during building fragments.
276+
// However, if caller requests to visit this table via `!internal_tables_only`,
277+
// we still assert on its existence to avoid undefined behavior.
278+
always!(node.table, "Materialize");
279+
}
280+
// Also visit the staging table if it exists (for refreshable tables)
281+
optional!(node.staging_table, "MaterializeStaging");
278282
}
279283

280284
// Global Approx Percentile

src/connector/src/macros.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ macro_rules! for_all_classified_sources {
3939
{ Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> },
4040
{ OpendalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3> },
4141
{ PosixFs, $crate::source::filesystem::opendal_source::PosixFsProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalPosixFs> },
42+
{ BatchPosixFs, $crate::source::filesystem::opendal_source::BatchPosixFsProperties, $crate::source::filesystem::opendal_source::BatchPosixFsSplit },
4243
{ Azblob, $crate::source::filesystem::opendal_source::AzblobProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalAzblob> },
4344
{ Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit},
4445
{ Iceberg, $crate::source::iceberg::IcebergProperties, $crate::source::iceberg::IcebergSplit}

src/connector/src/source/base.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ use crate::error::ConnectorResult as Result;
5050
use crate::parser::ParserConfig;
5151
use crate::parser::schema_change::SchemaChangeEnvelope;
5252
use crate::source::SplitImpl::{CitusCdc, MongodbCdc, MysqlCdc, PostgresCdc, SqlServerCdc};
53+
use crate::source::batch::BatchSourceSplitImpl;
5354
use crate::source::monitor::EnumeratorMetrics;
5455
use crate::with_options::WithOptions;
5556
use crate::{
@@ -450,7 +451,10 @@ pub type BoxSourceMessageStream =
450451
BoxStream<'static, crate::error::ConnectorResult<Vec<SourceMessage>>>;
451452
/// Stream of [`StreamChunk`]s parsed from the messages from the external source.
452453
pub type BoxSourceChunkStream = BoxStream<'static, crate::error::ConnectorResult<StreamChunk>>;
454+
/// `StreamChunk` with the latest split state.
455+
/// The state is constructed in `StreamReaderBuilder::into_retry_stream`
453456
pub type StreamChunkWithState = (StreamChunk, HashMap<SplitId, SplitImpl>);
457+
/// See [`StreamChunkWithState`].
454458
pub type BoxSourceChunkWithStateStream =
455459
BoxStream<'static, crate::error::ConnectorResult<StreamChunkWithState>>;
456460

@@ -701,6 +705,15 @@ impl SplitImpl {
701705
_ => unreachable!("get_cdc_split_offset() is only for cdc split"),
702706
}
703707
}
708+
709+
pub fn as_batch_split(self) -> Option<BatchSourceSplitImpl> {
710+
match self {
711+
SplitImpl::BatchPosixFs(batch_posix_fs_split) => {
712+
Some(BatchSourceSplitImpl::BatchPosixFs(batch_posix_fs_split))
713+
}
714+
_ => None,
715+
}
716+
}
704717
}
705718

706719
impl SplitMetaData for SplitImpl {

src/connector/src/source/batch.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright 2025 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use crate::source::filesystem::opendal_source::BatchPosixFsSplit;
16+
use crate::source::{SplitImpl, SplitMetaData};
17+
18+
/// # Batch Refreshable Source
19+
///
20+
/// A batch refreshable source can be refreshed - reload all data from the source, e.g., re-run a `SELECT *` query from the source.
21+
/// The reloaded data will be handled by `RefreshableMaterialize` to calculate a diff to send to downstream.
22+
pub trait BatchSourceSplit: SplitMetaData {
23+
fn finished(&self) -> bool;
24+
/// Mark the source as finished. Called after the source is exhausted.
25+
/// A `LoadFinish` signal will be sent by `SourceExecutor`, and the `RefreshableMaterialize` will begin to calculate the diff.
26+
fn finish(&mut self);
27+
/// Refresh the source to make it ready for re-run.
28+
/// A `Refresh` signal will be sent, and the `RefreshableMaterialize` will begin to write the new data into a temporary staging table.
29+
fn refresh(&mut self);
30+
}
31+
32+
pub enum BatchSourceSplitImpl {
33+
BatchPosixFs(BatchPosixFsSplit),
34+
}
35+
36+
/// See [`BatchSourceSplit`] for more details.
37+
impl BatchSourceSplitImpl {
38+
pub fn finished(&self) -> bool {
39+
match self {
40+
BatchSourceSplitImpl::BatchPosixFs(split) => split.finished(),
41+
}
42+
}
43+
44+
pub fn finish(&mut self) {
45+
tracing::info!("finishing batch source split");
46+
match self {
47+
BatchSourceSplitImpl::BatchPosixFs(split) => split.finish(),
48+
}
49+
}
50+
51+
pub fn refresh(&mut self) {
52+
tracing::info!("refreshing batch source split");
53+
match self {
54+
BatchSourceSplitImpl::BatchPosixFs(split) => split.refresh(),
55+
}
56+
}
57+
}
58+
59+
impl From<BatchSourceSplitImpl> for SplitImpl {
60+
fn from(batch_split: BatchSourceSplitImpl) -> Self {
61+
match batch_split {
62+
BatchSourceSplitImpl::BatchPosixFs(split) => SplitImpl::BatchPosixFs(split),
63+
}
64+
}
65+
}

0 commit comments

Comments
 (0)