Skip to content

Commit 5c3c59a

Browse files
authored
chore: use rayon to write snapshot updates (#1423)
* chore: use rayon to write snapshot updates * chore: use rayon * chore: rayon deps * chore: client api check * fix: encode version * chore: rename function, add verbose logs * chore: use default sv * chore: logs * chore: replaying small collab on current thread
1 parent c368670 commit 5c3c59a

File tree

31 files changed

+519
-409
lines changed

31 files changed

+519
-409
lines changed

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

libs/client-api/src/v2/actor.rs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@ impl WorkspaceControllerActor {
256256
Ok(())
257257
}
258258

259+
#[instrument(level = "trace", skip_all)]
259260
pub(crate) fn set_connection_status(&self, status: ConnectionStatus) {
260261
sync_trace!("set connection status: {:?}", status);
261262
self.status_tx.send_replace(status);
@@ -305,6 +306,7 @@ impl WorkspaceControllerActor {
305306
}
306307
}
307308

309+
#[instrument(level = "trace", skip_all)]
308310
async fn handle_action(actor: &Arc<Self>, action: WorkspaceAction) {
309311
let id = actor.db.client_id();
310312
sync_trace!("[{}] action {:?}", id, action);
@@ -358,6 +360,7 @@ impl WorkspaceControllerActor {
358360
}
359361
}
360362

363+
#[instrument(level = "trace", skip_all)]
361364
async fn handle_send(&self, msg: ClientMessage, source: ActionSource) -> anyhow::Result<()> {
362365
if let ClientMessage::Update {
363366
object_id,
@@ -422,7 +425,7 @@ impl WorkspaceControllerActor {
422425
.borrow()
423426
.disconnected_reason()
424427
.as_ref()
425-
.map(|v| v.retriable() || matches!(v, DisconnectedReason::ReachMaximumRetry))
428+
.map(|v| v.retriable_when_editing())
426429
.unwrap_or(false);
427430

428431
if should_retry {
@@ -441,6 +444,7 @@ impl WorkspaceControllerActor {
441444
}
442445
}
443446

447+
#[instrument(level = "trace", skip_all)]
444448
pub(crate) async fn handle_connect(
445449
actor: &Arc<Self>,
446450
access_token: String,
@@ -469,7 +473,12 @@ impl WorkspaceControllerActor {
469473

470474
match result {
471475
None => {
472-
actor.set_connection_status(ConnectionStatus::Disconnected { reason: None });
476+
sync_info!("[{}] connection established failed", actor.db.client_id());
477+
actor.set_connection_status(ConnectionStatus::Disconnected {
478+
reason: Some(DisconnectedReason::Unexpected(
479+
"Establish connect failed".into(),
480+
)),
481+
});
473482
},
474483
Some(connection) => {
475484
sync_debug!("[{}] connected to {}", client_id, actor.options.url);
@@ -502,6 +511,7 @@ impl WorkspaceControllerActor {
502511
let reason = Self::remote_receiver_loop(weak_actor.clone(), stream, cancel.clone())
503512
.await
504513
.err();
514+
505515
if let Some(actor) = weak_actor.upgrade() {
506516
if reason.is_some() {
507517
sync_error!("failed to receive messages from server: {:?}", reason);
@@ -521,7 +531,7 @@ impl WorkspaceControllerActor {
521531
while let Some(res) = stream.next().await {
522532
if cancel.is_cancelled() {
523533
sync_trace!("remote receiver loop cancelled");
524-
break;
534+
return Err(DisconnectedReason::UserDisconnect("User disconnect".into()));
525535
}
526536
let actor = match weak_actor.upgrade() {
527537
Some(inner) => inner,
@@ -580,8 +590,7 @@ impl WorkspaceControllerActor {
580590
frame.reason
581591
),
582592
}
583-
cancel.cancel();
584-
break;
593+
return Err(DisconnectedReason::ServerForceClose);
585594
},
586595
}
587596
}
@@ -1112,17 +1121,17 @@ impl WorkspaceControllerActor {
11121121
res = fut => {
11131122
match res {
11141123
Ok((stream, _resp)) => {
1115-
info!("establishing WebSocket successfully");
1124+
sync_info!("establishing WebSocket successfully");
11161125
Ok(Some(stream))
11171126
},
11181127
Err(err) => {
1119-
error!("establishing WebSocket failed");
1128+
sync_error!("establishing WebSocket failed");
11201129
Err(AppError::from(err).into())
11211130
}
11221131
}
11231132
}
11241133
_ = cancel.cancelled() => {
1125-
tracing::debug!("connection cancelled");
1134+
tracing::debug!("establishing connection cancelled");
11261135
Ok(None)
11271136
}
11281137
}

libs/client-api/src/v2/controller.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use tokio_stream::StreamExt;
2222
use tokio_tungstenite::tungstenite::error::ProtocolError;
2323
use tokio_tungstenite::tungstenite::{Error, Message};
2424
use tokio_util::sync::CancellationToken;
25+
use tracing::instrument;
2526
use yrs::block::ClientID;
2627

2728
#[derive(Clone)]
@@ -188,6 +189,7 @@ pub enum DisconnectedReason {
188189
MessageLoopEnd(Arc<str>),
189190
CannotHandleReceiveMessage(Arc<str>),
190191
UserDisconnect(Arc<str>),
192+
ServerForceClose,
191193
Unauthorized(Arc<str>),
192194
}
193195

@@ -206,6 +208,7 @@ impl Display for DisconnectedReason {
206208
},
207209
DisconnectedReason::UserDisconnect(reason) => write!(f, "user disconnect: {}", reason),
208210
DisconnectedReason::Unauthorized(reason) => write!(f, "unauthorized: {}", reason),
211+
DisconnectedReason::ServerForceClose => write!(f, "server force close"),
209212
}
210213
}
211214
}
@@ -247,6 +250,16 @@ impl DisconnectedReason {
247250
Self::Unexpected(..) | Self::ResetWithoutClosingHandshake
248251
)
249252
}
253+
254+
pub fn retriable_when_editing(&self) -> bool {
255+
matches!(
256+
self,
257+
Self::Unexpected(..)
258+
| Self::ResetWithoutClosingHandshake
259+
| DisconnectedReason::Unauthorized(_)
260+
| DisconnectedReason::ReachMaximumRetry
261+
)
262+
}
250263
}
251264

252265
#[derive(Debug, Clone)]
@@ -345,6 +358,7 @@ impl ReconnectTarget for WorkspaceControllerActor {
345358
WorkspaceControllerActor::handle_connect(&self, token).await
346359
}
347360

361+
#[instrument(level = "trace", skip_all)]
348362
fn set_disconnected(&self, reason: DisconnectedReason) {
349363
self.set_connection_status(ConnectionStatus::Disconnected {
350364
reason: Some(reason),

libs/collab-stream/src/collab_update_sink.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ impl CollabUpdateSink {
3535
self.collab_type,
3636
&msg.sender,
3737
msg.data.clone(),
38+
msg.flags,
3839
)
3940
.query_async(&mut *lock)
4041
.await?;

libs/collab-stream/src/model.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,7 @@ impl UpdateStreamMessage {
492492
collab_type: CollabType,
493493
sender: &CollabOrigin,
494494
update: Vec<u8>,
495+
flag: UpdateFlags,
495496
) -> Cmd {
496497
let mut cmd = cmd("XADD");
497498
cmd
@@ -504,7 +505,9 @@ impl UpdateStreamMessage {
504505
.arg("sender")
505506
.arg(sender.to_string())
506507
.arg("data")
507-
.arg(update.to_vec());
508+
.arg(update.to_vec())
509+
.arg("flags")
510+
.arg(flag);
508511
cmd
509512
}
510513
}

libs/indexer/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ uuid.workspace = true
2222
async-trait.workspace = true
2323
serde_json.workspace = true
2424
anyhow.workspace = true
25-
infra.workspace = true
25+
infra = { workspace = true }
2626
prometheus-client = "0.22.3"
2727
chrono = "0.4.39"
2828
serde.workspace = true

libs/indexer/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,5 @@ pub mod error;
44
pub mod metrics;
55
pub mod queue;
66
pub mod scheduler;
7-
pub mod thread_pool;
87
mod unindexed_workspace;
98
pub mod vector;

libs/infra/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ tokio = { workspace = true, optional = true }
1616
pin-project.workspace = true
1717
futures = "0.3.30"
1818
validator = { workspace = true, features = ["validator_derive", "derive"] }
19+
thiserror = { workspace = true }
20+
rayon = { workspace = true }
1921

2022
[features]
2123
file_util = ["tokio/fs"]
22-
request_util = ["reqwest"]
24+
request_util = ["reqwest"]

libs/infra/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,6 @@ pub mod env_util;
44
pub mod file_util;
55
#[cfg(feature = "request_util")]
66
pub mod reqwest;
7+
8+
pub mod thread_pool;
79
pub mod validate;
File renamed without changes.

0 commit comments

Comments
 (0)