Skip to content

Commit ca59314

Browse files
committed
feat(frontend): add refreshable property & refresh command
Signed-off-by: xxchan <[email protected]>
1 parent d7e50cc commit ca59314

File tree

30 files changed

+484
-5
lines changed

30 files changed

+484
-5
lines changed

proto/catalog.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,10 @@ message Table {
498498
// NOTICE: when it is "None", the watermark column should be the first column in the pk
499499
optional int32 clean_watermark_index_in_pk = 44;
500500

501+
// Whether the table supports manual refresh operation:
502+
// reload data from external source and emit messages based on the diff with current data.
503+
bool refreshable = 45;
504+
501505
// Per-table catalog version, used by schema change. `None` for internal
502506
// tables and tests. Not to be confused with the global catalog version for
503507
// notification service.

proto/meta.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,15 @@ message ResumeRequest {}
178178

179179
message ResumeResponse {}
180180

181+
// Request for the `Refresh` RPC of `StreamManagerService`: asks meta to start a
// manual refresh of one table.
message RefreshRequest {
182+
// ID of the table to refresh.
uint32 table_id = 1;
183+
// ID of the source associated with the table being refreshed.
uint32 associated_source_id = 2;
184+
}
185+
186+
// Response for the `Refresh` RPC of `StreamManagerService`.
message RefreshResponse {
187+
// Status reported by meta for the refresh request.
common.Status status = 1;
188+
}
189+
181190
message CancelCreatingJobsRequest {
182191
message CreatingJobInfo {
183192
uint32 database_id = 1;
@@ -366,6 +375,7 @@ service StreamManagerService {
366375
rpc Flush(FlushRequest) returns (FlushResponse);
367376
rpc Pause(PauseRequest) returns (PauseResponse);
368377
rpc Resume(ResumeRequest) returns (ResumeResponse);
378+
rpc Refresh(RefreshRequest) returns (RefreshResponse);
369379
rpc CancelCreatingJobs(CancelCreatingJobsRequest) returns (CancelCreatingJobsResponse);
370380
rpc ListTableFragments(ListTableFragmentsRequest) returns (ListTableFragmentsResponse);
371381
rpc ListStreamingJobStates(ListStreamingJobStatesRequest) returns (ListStreamingJobStatesResponse);

proto/stream_plan.proto

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,17 @@ message StartFragmentBackfillMutation {
123123
repeated uint32 fragment_ids = 1;
124124
}
125125

126+
// Barrier mutation carrying the "start refresh" signal for a refreshable table.
message RefreshStartMutation {
127+
// Table ID to start refresh operation.
128+
uint32 table_id = 1;
129+
// Associated source ID for this refresh operation.
130+
uint32 associated_source_id = 2;
131+
}
132+
// Barrier mutation carrying the "load finish" signal for a refreshing table.
message LoadFinishMutation {
133+
// Associated source ID for this load operation.
134+
uint32 associated_source_id = 1;
135+
}
136+
126137
message BarrierMutation {
127138
oneof mutation {
128139
// Add new dispatchers to some actors, used for creating materialized views.
@@ -150,6 +161,10 @@ message BarrierMutation {
150161
// If we use rate limit to pause / resume backfill fragments, if user manually
151162
// resumes some fragments, this will overwrite the backfill order configuration.
152163
StartFragmentBackfillMutation start_fragment_backfill = 14;
164+
// Start refresh signal for refreshing tables
165+
RefreshStartMutation refresh_start = 15;
166+
// Load finish signal for refreshing tables
167+
LoadFinishMutation load_finish = 16;
153168
// Combined mutation.
154169
// Currently, it can only be Add & Update, which is for sink into table.
155170
CombinedMutation combined = 100;

src/connector/src/with_options.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,10 @@ pub trait WithPropertiesExt: Get + GetKeyIter + Sized {
201201
})
202202
.unwrap_or(false)
203203
}
204+
205+
/// Returns whether the connector described by these properties supports manual refresh.
///
/// Currently this always returns `false`, so no table is marked refreshable through it.
/// NOTE(review): presumably a placeholder until a refreshable connector exists — confirm.
fn is_refreshable_connector(&self) -> bool {
206+
false
207+
}
204208
}
205209

206210
impl<T: Get + GetKeyIter> WithPropertiesExt for T {}

src/frontend/src/catalog/system_catalog/rw_catalog/rw_tables.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ struct RwTable {
2828
owner: i32,
2929
definition: String,
3030
append_only: bool,
31+
refreshable: bool,
3132
acl: Vec<String>,
3233
initialized_at: Option<Timestamptz>,
3334
created_at: Option<Timestamptz>,
@@ -57,6 +58,7 @@ fn read_rw_table_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwTable>> {
5758
owner: table.owner as i32,
5859
definition: table.create_sql_purified(),
5960
append_only: table.append_only,
61+
refreshable: table.refreshable,
6062
acl: get_acl_items(
6163
&GrantObject::TableId(table.id.table_id),
6264
true,

src/frontend/src/catalog/table_catalog.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,9 @@ pub struct TableCatalog {
197197
pub engine: Engine,
198198

199199
pub clean_watermark_index_in_pk: Option<usize>,
200+
201+
/// Whether the table supports manual refresh operations
202+
pub refreshable: bool,
200203
}
201204

202205
pub const ICEBERG_SOURCE_PREFIX: &str = "__iceberg_source_";
@@ -582,6 +585,7 @@ impl TableCatalog {
582585
job_id: self.job_id.map(|id| id.table_id),
583586
engine: Some(self.engine.to_protobuf().into()),
584587
clean_watermark_index_in_pk: self.clean_watermark_index_in_pk.map(|x| x as i32),
588+
refreshable: self.refreshable,
585589
}
586590
}
587591

@@ -784,6 +788,7 @@ impl From<PbTable> for TableCatalog {
784788
job_id: tb.job_id.map(TableId::from),
785789
engine,
786790
clean_watermark_index_in_pk: tb.clean_watermark_index_in_pk.map(|x| x as usize),
791+
refreshable: tb.refreshable,
787792
}
788793
}
789794
}

src/frontend/src/handler/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ mod prepared_statement;
108108
pub mod privilege;
109109
pub mod query;
110110
mod recover;
111+
mod refresh;
111112
pub mod show;
112113
mod transaction;
113114
mod use_db;
@@ -1283,6 +1284,9 @@ pub async fn handle(
12831284
prepared_statement::handle_deallocate(name, prepare).await
12841285
}
12851286
Statement::Vacuum { object_name } => vacuum::handle_vacuum(handler_args, object_name).await,
1287+
Statement::Refresh { table_name } => {
1288+
refresh::handle_refresh(handler_args, table_name).await
1289+
}
12861290
_ => bail_not_implemented!("Unhandled statement: {}", stmt),
12871291
}
12881292
}

src/frontend/src/handler/refresh.rs

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Copyright 2025 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use pgwire::pg_response::{PgResponse, StatementType};
16+
use risingwave_pb::meta::RefreshRequest;
17+
use risingwave_sqlparser::ast::ObjectName;
18+
19+
use crate::catalog::table_catalog::TableType;
20+
use crate::error::{ErrorCode, Result};
21+
use crate::handler::util::get_table_catalog_by_table_name;
22+
use crate::handler::{HandlerArgs, RwPgResponse};
23+
24+
/// Handle REFRESH statement
25+
///
26+
/// This function processes the REFRESH statement by:
27+
/// 1. Validating the table exists and is refreshable
28+
/// 2. Sending a refresh command to the meta service
29+
/// 3. Returning appropriate response to the client
30+
pub async fn handle_refresh(
31+
handler_args: HandlerArgs,
32+
table_name: ObjectName,
33+
) -> Result<RwPgResponse> {
34+
let session = handler_args.session;
35+
36+
// Get table catalog to validate table exists
37+
let (table_catalog, schema_name) =
38+
get_table_catalog_by_table_name(session.as_ref(), &table_name)?;
39+
40+
// Check if table supports refresh operations
41+
if !table_catalog.refreshable {
42+
return Err(ErrorCode::InvalidInputSyntax(format!(
43+
"Table '{}.{}' is not refreshable. Only tables created with REFRESHABLE flag support manual refresh.",
44+
schema_name, table_name
45+
)).into());
46+
}
47+
48+
// Only allow refresh on tables, not views or materialized views
49+
match table_catalog.table_type() {
50+
TableType::Table => {
51+
// This is valid
52+
}
53+
t @ (TableType::MaterializedView | TableType::Index | TableType::Internal) => {
54+
return Err(ErrorCode::InvalidInputSyntax(format!(
55+
"REFRESH is only supported for tables, got {:?}.",
56+
t
57+
))
58+
.into());
59+
}
60+
}
61+
62+
let table_id = table_catalog.id();
63+
64+
// Create refresh request
65+
let refresh_request = RefreshRequest {
66+
table_id: table_id.table_id(),
67+
associated_source_id: table_catalog.associated_source_id().unwrap().table_id(),
68+
};
69+
70+
// Send refresh command to meta service via stream manager
71+
let meta_client = session.env().meta_client();
72+
match meta_client.refresh(refresh_request).await {
73+
Ok(_) => {
74+
// Refresh command sent successfully
75+
tracing::info!(
76+
table_id = %table_id,
77+
table_name = %table_name,
78+
"Manual refresh initiated"
79+
);
80+
81+
// Return success response
82+
Ok(PgResponse::builder(StatementType::OTHER)
83+
.notice(format!(
84+
"REFRESH initiated for table '{}.{}'",
85+
schema_name, table_name
86+
))
87+
.into())
88+
}
89+
Err(e) => {
90+
tracing::error!(
91+
error = %e,
92+
table_id = %table_id,
93+
table_name = %table_name,
94+
"Failed to initiate refresh"
95+
);
96+
97+
Err(ErrorCode::InternalError(format!(
98+
"Failed to refresh table '{}.{}': {}",
99+
schema_name, table_name, e
100+
))
101+
.into())
102+
}
103+
}
104+
}

src/frontend/src/meta_client.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@ use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies
3636
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
3737
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
3838
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
39-
use risingwave_pb::meta::{EventLog, FragmentDistribution, PbThrottleTarget, RecoveryStatus};
39+
use risingwave_pb::meta::{
40+
EventLog, FragmentDistribution, PbThrottleTarget, RecoveryStatus, RefreshRequest,
41+
RefreshResponse,
42+
};
4043
use risingwave_pb::secret::PbSecretRef;
4144
use risingwave_rpc_client::error::Result;
4245
use risingwave_rpc_client::{HummockMetaClient, MetaClient};
@@ -161,6 +164,8 @@ pub trait FrontendMetaClient: Send + Sync {
161164
async fn set_sync_log_store_aligned(&self, job_id: u32, aligned: bool) -> Result<()>;
162165

163166
async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64>;
167+
168+
async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse>;
164169
}
165170

166171
pub struct FrontendMetaClientImpl(pub MetaClient);
@@ -402,4 +407,8 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
402407
async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
403408
self.0.compact_iceberg_table(sink_id).await
404409
}
410+
411+
// Forwards the refresh request to the wrapped `MetaClient` and returns its result unchanged.
async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
412+
self.0.refresh(request).await
413+
}
405414
}

src/frontend/src/optimizer/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ConflictBehavior, Fi
5858
use risingwave_common::types::DataType;
5959
use risingwave_common::util::column_index_mapping::ColIndexMapping;
6060
use risingwave_common::util::iter_util::ZipEqDebug;
61+
use risingwave_connector::WithPropertiesExt;
6162
use risingwave_connector::sink::catalog::SinkFormatDesc;
6263
use risingwave_pb::stream_plan::StreamScanType;
6364

@@ -943,6 +944,12 @@ impl LogicalPlanRoot {
943944
StreamFilter::filter_out_any_null_rows(stream_plan.clone(), &not_null_idxs);
944945
}
945946

947+
// Determine if the table should be refreshable based on the connector type
948+
let refreshable = source_catalog
949+
.as_ref()
950+
.map(|catalog| catalog.with_properties.is_refreshable_connector())
951+
.unwrap_or(false);
952+
946953
StreamMaterialize::create_for_table(
947954
stream_plan,
948955
table_name,
@@ -960,6 +967,7 @@ impl LogicalPlanRoot {
960967
retention_seconds,
961968
webhook_info,
962969
engine,
970+
refreshable,
963971
)
964972
}
965973

0 commit comments

Comments
 (0)