Skip to content

Commit 383fcff

Browse files
Better journal mismatch error (#44)
1 parent 6d3ecd6 commit 383fcff

File tree

9 files changed

+958
-63
lines changed

9 files changed

+958
-63
lines changed

src/error.rs

Lines changed: 131 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub(crate) struct CommandMetadata {
1818

1919
impl fmt::Display for CommandMetadata {
2020
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
21-
write!(f, "{:?} ", self.ty)?;
21+
write!(f, "{} ", self.ty)?;
2222
if let Some(name) = &self.name {
2323
write!(f, "[{}]", name)?;
2424
} else {
@@ -171,3 +171,133 @@ impl From<CommandType> for MessageType {
171171
}
172172
}
173173
}
174+
175+
impl TryFrom<MessageType> for CommandType {
176+
type Error = MessageType;
177+
178+
fn try_from(value: MessageType) -> Result<Self, Self::Error> {
179+
match value {
180+
MessageType::InputCommand => Ok(CommandType::Input),
181+
MessageType::OutputCommand => Ok(CommandType::Output),
182+
MessageType::GetLazyStateCommand | MessageType::GetEagerStateCommand => {
183+
Ok(CommandType::GetState)
184+
}
185+
MessageType::GetLazyStateKeysCommand | MessageType::GetEagerStateKeysCommand => {
186+
Ok(CommandType::GetStateKeys)
187+
}
188+
MessageType::SetStateCommand => Ok(CommandType::SetState),
189+
MessageType::ClearStateCommand => Ok(CommandType::ClearState),
190+
MessageType::ClearAllStateCommand => Ok(CommandType::ClearAllState),
191+
MessageType::GetPromiseCommand => Ok(CommandType::GetPromise),
192+
MessageType::PeekPromiseCommand => Ok(CommandType::PeekPromise),
193+
MessageType::CompletePromiseCommand => Ok(CommandType::CompletePromise),
194+
MessageType::SleepCommand => Ok(CommandType::Sleep),
195+
MessageType::CallCommand => Ok(CommandType::Call),
196+
MessageType::OneWayCallCommand => Ok(CommandType::OneWayCall),
197+
MessageType::SendSignalCommand => Ok(CommandType::SendSignal),
198+
MessageType::RunCommand => Ok(CommandType::Run),
199+
MessageType::AttachInvocationCommand => Ok(CommandType::AttachInvocation),
200+
MessageType::GetInvocationOutputCommand => Ok(CommandType::GetInvocationOutput),
201+
MessageType::CompleteAwakeableCommand => Ok(CommandType::CompleteAwakeable),
202+
_ => Err(value),
203+
}
204+
}
205+
}
206+
207+
#[cfg(test)]
208+
mod tests {
209+
use super::*;
210+
211+
#[test]
212+
fn test_message_type_to_command_type_conversion() {
213+
// Test successful conversions
214+
assert_eq!(
215+
CommandType::try_from(MessageType::InputCommand).unwrap(),
216+
CommandType::Input
217+
);
218+
assert_eq!(
219+
CommandType::try_from(MessageType::OutputCommand).unwrap(),
220+
CommandType::Output
221+
);
222+
assert_eq!(
223+
CommandType::try_from(MessageType::GetLazyStateCommand).unwrap(),
224+
CommandType::GetState
225+
);
226+
assert_eq!(
227+
CommandType::try_from(MessageType::GetLazyStateKeysCommand).unwrap(),
228+
CommandType::GetStateKeys
229+
);
230+
assert_eq!(
231+
CommandType::try_from(MessageType::SetStateCommand).unwrap(),
232+
CommandType::SetState
233+
);
234+
assert_eq!(
235+
CommandType::try_from(MessageType::ClearStateCommand).unwrap(),
236+
CommandType::ClearState
237+
);
238+
assert_eq!(
239+
CommandType::try_from(MessageType::ClearAllStateCommand).unwrap(),
240+
CommandType::ClearAllState
241+
);
242+
assert_eq!(
243+
CommandType::try_from(MessageType::GetPromiseCommand).unwrap(),
244+
CommandType::GetPromise
245+
);
246+
assert_eq!(
247+
CommandType::try_from(MessageType::PeekPromiseCommand).unwrap(),
248+
CommandType::PeekPromise
249+
);
250+
assert_eq!(
251+
CommandType::try_from(MessageType::CompletePromiseCommand).unwrap(),
252+
CommandType::CompletePromise
253+
);
254+
assert_eq!(
255+
CommandType::try_from(MessageType::SleepCommand).unwrap(),
256+
CommandType::Sleep
257+
);
258+
assert_eq!(
259+
CommandType::try_from(MessageType::CallCommand).unwrap(),
260+
CommandType::Call
261+
);
262+
assert_eq!(
263+
CommandType::try_from(MessageType::OneWayCallCommand).unwrap(),
264+
CommandType::OneWayCall
265+
);
266+
assert_eq!(
267+
CommandType::try_from(MessageType::SendSignalCommand).unwrap(),
268+
CommandType::SendSignal
269+
);
270+
assert_eq!(
271+
CommandType::try_from(MessageType::RunCommand).unwrap(),
272+
CommandType::Run
273+
);
274+
assert_eq!(
275+
CommandType::try_from(MessageType::AttachInvocationCommand).unwrap(),
276+
CommandType::AttachInvocation
277+
);
278+
assert_eq!(
279+
CommandType::try_from(MessageType::GetInvocationOutputCommand).unwrap(),
280+
CommandType::GetInvocationOutput
281+
);
282+
assert_eq!(
283+
CommandType::try_from(MessageType::CompleteAwakeableCommand).unwrap(),
284+
CommandType::CompleteAwakeable
285+
);
286+
287+
// Test failed conversions
288+
assert_eq!(
289+
CommandType::try_from(MessageType::Start).err().unwrap(),
290+
MessageType::Start
291+
);
292+
assert_eq!(
293+
CommandType::try_from(MessageType::End).err().unwrap(),
294+
MessageType::End
295+
);
296+
assert_eq!(
297+
CommandType::try_from(MessageType::GetLazyStateCompletionNotification)
298+
.err()
299+
.unwrap(),
300+
MessageType::GetLazyStateCompletionNotification
301+
);
302+
}
303+
}

src/fmt.rs

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
use crate::CommandType;
2+
use std::fmt;
3+
use std::sync::OnceLock;
4+
5+
pub trait CommandFormatter: Send + Sync + fmt::Debug + 'static {
6+
fn ty_display(&self, ty: CommandType) -> &'static str;
7+
}
8+
9+
static GLOBAL_COMMAND_FORMATTER: OnceLock<Box<dyn CommandFormatter>> = OnceLock::new();
10+
11+
// Public function for the crate user to set the formatter
12+
pub fn set_error_formatter(formatter: impl CommandFormatter + 'static) {
13+
GLOBAL_COMMAND_FORMATTER
14+
.set(Box::new(formatter))
15+
.expect("Error formatter already set! It can only be set once.");
16+
}
17+
18+
#[derive(Debug)]
19+
struct DefaultCommandFormatter;
20+
21+
impl CommandFormatter for DefaultCommandFormatter {
22+
fn ty_display(&self, ty: CommandType) -> &'static str {
23+
match ty {
24+
CommandType::Input => "handler input",
25+
CommandType::Output => "handler return",
26+
CommandType::GetState => "get state",
27+
CommandType::GetStateKeys => "get state keys",
28+
CommandType::SetState => "set state",
29+
CommandType::ClearState => "clear state",
30+
CommandType::ClearAllState => "clear all state",
31+
CommandType::GetPromise => "get promise",
32+
CommandType::PeekPromise => "peek promise",
33+
CommandType::CompletePromise => "complete promise",
34+
CommandType::Sleep => "sleep",
35+
CommandType::Call => "call",
36+
CommandType::OneWayCall => "one way call/send",
37+
CommandType::SendSignal => "send signal",
38+
CommandType::Run => "run",
39+
CommandType::AttachInvocation => "attach invocation",
40+
CommandType::GetInvocationOutput => "get invocation output",
41+
CommandType::CompleteAwakeable => "complete awakeable",
42+
CommandType::CancelInvocation => "cancel invocation",
43+
}
44+
}
45+
}
46+
47+
pub(crate) fn display_command_ty(command_type: CommandType) -> &'static str {
48+
if let Some(custom_formatter) = GLOBAL_COMMAND_FORMATTER.get() {
49+
custom_formatter.ty_display(command_type)
50+
} else {
51+
DefaultCommandFormatter.ty_display(command_type)
52+
}
53+
}
54+
55+
pub(crate) struct DiffFormatter<'a, 'b> {
56+
fmt: &'a mut fmt::Formatter<'b>,
57+
indentation: &'static str,
58+
}
59+
60+
impl<'a, 'b: 'a> DiffFormatter<'a, 'b> {
61+
pub(crate) fn new(fmt: &'a mut fmt::Formatter<'b>, indentation: &'static str) -> Self {
62+
Self { fmt, indentation }
63+
}
64+
65+
pub(crate) fn write_diff(
66+
&mut self,
67+
field_name: &'static str,
68+
actual: impl fmt::Display,
69+
expected: impl fmt::Display,
70+
) -> fmt::Result {
71+
write!(
72+
self.fmt,
73+
"\n{}{field_name}: {actual} != {expected}",
74+
self.indentation
75+
)
76+
}
77+
78+
pub(crate) fn write_bytes_diff(
79+
&mut self,
80+
field_name: &'static str,
81+
actual: &[u8],
82+
expected: &[u8],
83+
) -> fmt::Result {
84+
write!(self.fmt, "\n{}{field_name}: ", self.indentation)?;
85+
match (std::str::from_utf8(actual), std::str::from_utf8(expected)) {
86+
(Ok(actual), Ok(expected)) => {
87+
write!(self.fmt, "'{actual}' != '{expected}'",)
88+
}
89+
(Ok(actual), Err(_)) => {
90+
write!(self.fmt, "'{actual}' != {expected:?}")
91+
}
92+
(Err(_), Ok(expected)) => {
93+
write!(self.fmt, "{actual:?} != '{expected}'")
94+
}
95+
(Err(_), Err(_)) => {
96+
write!(self.fmt, "{actual:?} != {expected:?}")
97+
}
98+
}
99+
}
100+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub mod error;
2+
pub mod fmt;
23
mod headers;
34
#[cfg(feature = "request_identity")]
45
mod request_identity;

src/service_protocol/encoding.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use super::*;
1414

1515
use std::mem;
1616

17+
use crate::vm::errors::CommandTypeMismatchError;
1718
use bytes::{Buf, BufMut, Bytes, BytesMut};
1819
use bytes_utils::SegmentedBuf;
1920
use prost::Message;
@@ -22,11 +23,8 @@ use prost::Message;
2223
pub enum DecodingError {
2324
#[error("cannot decode protocol message type {0:?}. Reason: {1:?}")]
2425
DecodeMessage(MessageType, #[source] prost::DecodeError),
25-
#[error("Replayed journal doesn't match the handler code.\nThe handler code generated: {expected:?}\nwhile the replayed entry is: {actual:?}")]
26-
UnexpectedMessageType {
27-
expected: MessageType,
28-
actual: MessageType,
29-
},
26+
#[error(transparent)]
27+
UnexpectedMessageType(CommandTypeMismatchError),
3028
#[error("expected message type {expected:?} to have field {field}")]
3129
MissingField {
3230
expected: MessageType,
@@ -94,10 +92,9 @@ impl RawMessage {
9492

9593
pub fn decode_to<M: RestateMessage>(self) -> Result<M, DecodingError> {
9694
if self.0.message_type() != M::ty() {
97-
return Err(DecodingError::UnexpectedMessageType {
98-
expected: M::ty(),
99-
actual: self.0.message_type(),
100-
});
95+
return Err(DecodingError::UnexpectedMessageType(
96+
CommandTypeMismatchError::new(self.0.message_type(), M::ty()),
97+
));
10198
}
10299
M::decode(self.1).map_err(|e| DecodingError::DecodeMessage(self.0.message_type(), e))
103100
}

src/service_protocol/header.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
// the Business Source License, use of this software will be governed
99
// by the Apache License, Version 2.0.
1010

11+
use crate::CommandType;
12+
use std::fmt;
13+
1114
const COMMAND_ENTRY_MASK: u16 = 0x0400;
1215
const NOTIFICATION_ENTRY_MASK: u16 = 0x8000;
1316
const CUSTOM_ENTRY_MASK: u16 = 0xFC00;
@@ -118,6 +121,15 @@ impl MessageType {
118121
}
119122
}
120123

124+
impl fmt::Display for MessageType {
125+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
126+
match CommandType::try_from(*self) {
127+
Ok(ct) => write!(f, "{}", crate::fmt::display_command_ty(ct)),
128+
Err(mt) => write!(f, "{:?}", mt),
129+
}
130+
}
131+
}
132+
121133
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
122134
pub struct MessageHeader {
123135
ty: MessageType,

0 commit comments

Comments
 (0)