Skip to content

Commit 35447b6

Browse files
committed
refactor(dialog): use TransactionEventSender
1 parent 2c74264 commit 35447b6

File tree

6 files changed

+81
-71
lines changed

6 files changed

+81
-71
lines changed

src/dialog/dialog.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -178,15 +178,14 @@ pub struct DialogInner {
178178
pub route_set: Vec<Route>,
179179
pub(super) endpoint_inner: EndpointInnerRef,
180180
pub(super) state_sender: DialogStateSender,
181-
pub(super) tu_sender: TuSenderRef,
181+
pub(super) tu_sender: TransactionEventSender,
182182
pub(super) initial_request: Request,
183183
}
184184

185185
pub type DialogStateReceiver = UnboundedReceiver<DialogState>;
186186
pub type DialogStateSender = UnboundedSender<DialogState>;
187187

188188
pub(super) type DialogInnerRef = Arc<DialogInner>;
189-
pub(super) type TuSenderRef = Mutex<Option<TransactionEventSender>>;
190189

191190
impl DialogState {
192191
pub fn can_cancel(&self) -> bool {
@@ -212,6 +211,7 @@ impl DialogInner {
212211
state_sender: DialogStateSender,
213212
credential: Option<Credential>,
214213
local_contact: Option<rsip::Uri>,
214+
tu_sender: TransactionEventSender,
215215
) -> Result<Self> {
216216
let cseq = initial_request.cseq_header()?.seq()?;
217217

@@ -257,7 +257,7 @@ impl DialogInner {
257257
route_set,
258258
endpoint_inner,
259259
state_sender,
260-
tu_sender: Mutex::new(None),
260+
tu_sender,
261261
state: Mutex::new(DialogState::Calling(id)),
262262
initial_request,
263263
local_contact,

src/dialog/dialog_layer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ impl DialogLayer {
174174
state_sender,
175175
credential,
176176
contact,
177+
tx.tu_sender.clone(),
177178
)?;
178179

179180
let dialog = ServerInviteDialog {

src/dialog/invitation.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,10 @@ impl DialogLayer {
370370
request.headers.unique_push(rsip::Header::ContentLength(
371371
(request.body.len() as u32).into(),
372372
));
373+
let key = TransactionKey::from_request(&request, TransactionRole::Client)?;
374+
let mut tx = Transaction::new_client(key, request.clone(), self.endpoint.clone(), None);
375+
tx.destination = opt.destination;
376+
373377
let id = DialogId::try_from(&request)?;
374378
let dlg_inner = DialogInner::new(
375379
TransactionRole::Client,
@@ -379,12 +383,9 @@ impl DialogLayer {
379383
state_sender,
380384
opt.credential,
381385
Some(opt.contact),
386+
tx.tu_sender.clone(),
382387
)?;
383388

384-
let key =
385-
TransactionKey::from_request(&dlg_inner.initial_request, TransactionRole::Client)?;
386-
let mut tx = Transaction::new_client(key, request.clone(), self.endpoint.clone(), None);
387-
tx.destination = opt.destination;
388389
let dialog = ClientInviteDialog {
389390
inner: Arc::new(dlg_inner),
390391
};

src/dialog/server_dialog.rs

Lines changed: 45 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -148,33 +148,28 @@ impl ServerInviteDialog {
148148
/// # }
149149
/// ```
150150
pub fn accept(&self, headers: Option<Vec<Header>>, body: Option<Vec<u8>>) -> Result<()> {
151-
if let Some(sender) = self.inner.tu_sender.lock().unwrap().as_ref() {
152-
let resp = self.inner.make_response(
153-
&self.inner.initial_request,
154-
rsip::StatusCode::OK,
155-
headers,
156-
body,
157-
);
158-
let via = self.inner.initial_request.via_header()?;
159-
let via_received = SipConnection::parse_target_from_via(via)?;
160-
let contact = rsip::Uri {
161-
host_with_port: via_received,
162-
params: vec![rsip::Param::Transport(via.trasnport()?)],
163-
..Default::default()
164-
};
165-
debug!(id = %self.id(), "accepting dialog with contact: {}", contact);
166-
self.inner.remote_contact.lock().unwrap().replace(contact);
167-
sender.send(TransactionEvent::Respond(resp.clone()))?;
151+
let resp = self.inner.make_response(
152+
&self.inner.initial_request,
153+
rsip::StatusCode::OK,
154+
headers,
155+
body,
156+
);
157+
let via = self.inner.initial_request.via_header()?;
158+
let via_received = SipConnection::parse_target_from_via(via)?;
159+
let contact = rsip::Uri {
160+
host_with_port: via_received,
161+
params: vec![rsip::Param::Transport(via.trasnport()?)],
162+
..Default::default()
163+
};
164+
debug!(id = %self.id(), "accepting dialog with contact: {}", contact);
165+
self.inner.remote_contact.lock().unwrap().replace(contact);
166+
self.inner
167+
.tu_sender
168+
.send(TransactionEvent::Respond(resp.clone()))?;
168169

169-
self.inner
170-
.transition(DialogState::WaitAck(self.id(), resp))?;
171-
Ok(())
172-
} else {
173-
Err(crate::Error::DialogError(
174-
"transaction is already terminated".to_string(),
175-
self.id(),
176-
))
177-
}
170+
self.inner
171+
.transition(DialogState::WaitAck(self.id(), resp))?;
172+
Ok(())
178173
}
179174

180175
/// Accept the incoming INVITE request with NAT-aware Contact header
@@ -272,24 +267,20 @@ impl ServerInviteDialog {
272267
return Ok(());
273268
}
274269
info!(id=%self.id(), "rejecting dialog");
275-
if let Some(sender) = self.inner.tu_sender.lock().unwrap().as_ref() {
276-
let resp = self.inner.make_response(
277-
&self.inner.initial_request,
278-
rsip::StatusCode::Decline,
279-
None,
280-
None,
281-
);
282-
sender.send(TransactionEvent::Respond(resp)).ok();
283-
self.inner.transition(DialogState::Terminated(
284-
self.id(),
285-
TerminatedReason::UasDecline,
286-
))
287-
} else {
288-
Err(crate::Error::DialogError(
289-
"transaction is already terminated".to_string(),
290-
self.id(),
291-
))
292-
}
270+
let resp = self.inner.make_response(
271+
&self.inner.initial_request,
272+
rsip::StatusCode::Decline,
273+
None,
274+
None,
275+
);
276+
self.inner
277+
.tu_sender
278+
.send(TransactionEvent::Respond(resp))
279+
.ok();
280+
self.inner.transition(DialogState::Terminated(
281+
self.id(),
282+
TerminatedReason::UasDecline,
283+
))
293284
}
294285

295286
/// Send a BYE request to terminate the dialog
@@ -551,15 +542,10 @@ impl ServerInviteDialog {
551542
} else {
552543
match tx.original.method {
553544
rsip::Method::Ack => {
554-
if let Some(sender) = self.inner.tu_sender.lock().unwrap().as_ref() {
555-
sender
556-
.send(TransactionEvent::Received(
557-
tx.original.clone().into(),
558-
tx.connection.clone(),
559-
))
560-
.ok();
561-
}
562-
return Ok(());
545+
self.inner.tu_sender.send(TransactionEvent::Received(
546+
tx.original.clone().into(),
547+
tx.connection.clone(),
548+
))?;
563549
}
564550
_ => {}
565551
}
@@ -600,16 +586,14 @@ impl ServerInviteDialog {
600586
}
601587

602588
async fn handle_invite(&mut self, tx: &mut Transaction) -> Result<()> {
603-
self.inner
604-
.tu_sender
605-
.lock()
606-
.unwrap()
607-
.replace(tx.tu_sender.clone());
608-
609589
let handle_loop = async {
610590
if !self.inner.is_confirmed() {
611-
self.inner.transition(DialogState::Calling(self.id()))?;
612-
tx.send_trying().await?;
591+
match self.inner.transition(DialogState::Calling(self.id())) {
592+
Ok(_) => {
593+
tx.send_trying().await.ok();
594+
}
595+
Err(_) => {}
596+
}
613597
}
614598

615599
while let Some(msg) = tx.receive().await {
@@ -644,11 +628,9 @@ impl ServerInviteDialog {
644628
match handle_loop.await {
645629
Ok(_) => {
646630
trace!(id = %self.id(),"process done");
647-
self.inner.tu_sender.lock().unwrap().take();
648631
Ok(())
649632
}
650633
Err(e) => {
651-
self.inner.tu_sender.lock().unwrap().take();
652634
warn!(id = %self.id(),"handle_invite error: {:?}", e);
653635
Err(e)
654636
}

src/dialog/tests/test_client_dialog.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ async fn test_client_dialog_creation() -> crate::Result<()> {
5555
};
5656

5757
let invite_req = create_invite_request("alice-tag", "", "test-call-id");
58-
58+
let (tu_sender, _tu_receiver) = unbounded_channel();
5959
let dialog_inner = DialogInner::new(
6060
TransactionRole::Client,
6161
dialog_id.clone(),
@@ -64,6 +64,7 @@ async fn test_client_dialog_creation() -> crate::Result<()> {
6464
state_sender,
6565
None,
6666
Some(Uri::try_from("sip:[email protected]:5060").unwrap()),
67+
tu_sender,
6768
)?;
6869

6970
let client_dialog = ClientInviteDialog {
@@ -89,6 +90,7 @@ async fn test_client_dialog_sequence_handling() -> crate::Result<()> {
8990
};
9091

9192
let invite_req = create_invite_request("alice-tag", "bob-tag", "test-call-seq");
93+
let (tu_sender, _tu_receiver) = unbounded_channel();
9294

9395
let dialog_inner = DialogInner::new(
9496
TransactionRole::Client,
@@ -98,6 +100,7 @@ async fn test_client_dialog_sequence_handling() -> crate::Result<()> {
98100
state_sender,
99101
None,
100102
Some(Uri::try_from("sip:[email protected]:5060").unwrap()),
103+
tu_sender,
101104
)?;
102105

103106
let client_dialog = ClientInviteDialog {
@@ -127,6 +130,7 @@ async fn test_client_dialog_state_transitions() -> crate::Result<()> {
127130
};
128131

129132
let invite_req = create_invite_request("alice-tag", "", "test-call-flow");
133+
let (tu_sender, _tu_receiver) = unbounded_channel();
130134

131135
let dialog_inner = DialogInner::new(
132136
TransactionRole::Client,
@@ -136,6 +140,7 @@ async fn test_client_dialog_state_transitions() -> crate::Result<()> {
136140
state_sender,
137141
None,
138142
Some(Uri::try_from("sip:[email protected]:5060").unwrap()),
143+
tu_sender,
139144
)?;
140145

141146
let client_dialog = ClientInviteDialog {
@@ -201,6 +206,8 @@ async fn test_client_dialog_termination_scenarios() -> crate::Result<()> {
201206
};
202207

203208
let invite_req_1 = create_invite_request("alice-tag", "", "test-call-term-early");
209+
let (tu_sender, _tu_receiver) = unbounded_channel();
210+
204211
let dialog_inner_1 = DialogInner::new(
205212
TransactionRole::Client,
206213
dialog_id_1.clone(),
@@ -209,6 +216,7 @@ async fn test_client_dialog_termination_scenarios() -> crate::Result<()> {
209216
state_sender.clone(),
210217
None,
211218
Some(Uri::try_from("sip:[email protected]:5060").unwrap()),
219+
tu_sender,
212220
)?;
213221

214222
let client_dialog_1 = ClientInviteDialog {
@@ -235,6 +243,8 @@ async fn test_client_dialog_termination_scenarios() -> crate::Result<()> {
235243
};
236244

237245
let invite_req_2 = create_invite_request("alice-tag", "bob-tag", "test-call-term-normal");
246+
let (tu_sender, _tu_receiver) = unbounded_channel();
247+
238248
let dialog_inner_2 = DialogInner::new(
239249
TransactionRole::Client,
240250
dialog_id_2.clone(),
@@ -243,6 +253,7 @@ async fn test_client_dialog_termination_scenarios() -> crate::Result<()> {
243253
state_sender,
244254
None,
245255
Some(Uri::try_from("sip:[email protected]:5060").unwrap()),
256+
tu_sender,
246257
)?;
247258

248259
let client_dialog_2 = ClientInviteDialog {

src/dialog/tests/test_dialog_states.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ async fn test_dialog_state_transitions() -> crate::Result<()> {
110110

111111
// Create INVITE request
112112
let invite_req = create_invite_request("alice-tag-456", "", "test-call-id-123");
113+
let (tu_sender, _tu_receiver) = unbounded_channel();
113114

114115
// Create dialog inner
115116
let dialog_inner = DialogInner::new(
@@ -120,6 +121,7 @@ async fn test_dialog_state_transitions() -> crate::Result<()> {
120121
state_sender,
121122
None,
122123
Some(rsip::Uri::try_from("sip:[email protected]:5060")?),
124+
tu_sender,
123125
)?;
124126

125127
// Test initial state
@@ -173,6 +175,7 @@ async fn test_server_dialog_state_transitions() -> crate::Result<()> {
173175

174176
// Create INVITE request
175177
let invite_req = create_invite_request("alice-tag-456", "", "test-call-id-server-123");
178+
let (tu_sender, _tu_receiver) = unbounded_channel();
176179

177180
// Create server dialog inner
178181
let dialog_inner = DialogInner::new(
@@ -183,6 +186,7 @@ async fn test_server_dialog_state_transitions() -> crate::Result<()> {
183186
state_sender,
184187
None,
185188
Some(rsip::Uri::try_from("sip:[email protected]:5060")?),
189+
tu_sender,
186190
)?;
187191

188192
// Test initial state
@@ -229,6 +233,7 @@ async fn test_dialog_in_dialog_requests() -> crate::Result<()> {
229233
// Create initial INVITE request
230234
let invite_req =
231235
create_invite_request("alice-tag-456", "bob-tag-789", "test-call-id-in-dialog-123");
236+
let (tu_sender, _tu_receiver) = unbounded_channel();
232237

233238
// Create confirmed dialog
234239
let dialog_inner = DialogInner::new(
@@ -239,6 +244,7 @@ async fn test_dialog_in_dialog_requests() -> crate::Result<()> {
239244
state_sender,
240245
None,
241246
Some(rsip::Uri::try_from("sip:[email protected]:5060")?),
247+
tu_sender,
242248
)?;
243249

244250
// Set dialog to confirmed state
@@ -315,6 +321,8 @@ async fn test_dialog_termination_scenarios() -> crate::Result<()> {
315321
};
316322

317323
let invite_req_1 = create_invite_request("alice-tag-456", "", "test-call-id-term-1");
324+
let (tu_sender, _tu_receiver) = unbounded_channel();
325+
318326
let dialog_inner_1 = DialogInner::new(
319327
TransactionRole::Client,
320328
dialog_id_1.clone(),
@@ -323,6 +331,7 @@ async fn test_dialog_termination_scenarios() -> crate::Result<()> {
323331
state_sender.clone(),
324332
None,
325333
Some(rsip::Uri::try_from("sip:[email protected]:5060")?),
334+
tu_sender,
326335
)?;
327336

328337
// Terminate with error
@@ -344,6 +353,8 @@ async fn test_dialog_termination_scenarios() -> crate::Result<()> {
344353
};
345354

346355
let invite_req_2 = create_invite_request("alice-tag-456", "bob-tag-789", "test-call-id-term-2");
356+
let (tu_sender, _tu_receiver) = unbounded_channel();
357+
347358
let dialog_inner_2 = DialogInner::new(
348359
TransactionRole::Client,
349360
dialog_id_2.clone(),
@@ -352,6 +363,7 @@ async fn test_dialog_termination_scenarios() -> crate::Result<()> {
352363
state_sender.clone(),
353364
None,
354365
Some(rsip::Uri::try_from("sip:[email protected]:5060")?),
366+
tu_sender,
355367
)?;
356368

357369
// First confirm the dialog
@@ -381,6 +393,8 @@ async fn test_dialog_sequence_numbers() -> crate::Result<()> {
381393
};
382394

383395
let invite_req = create_invite_request("alice-tag-456", "bob-tag-789", "test-call-id-seq-123");
396+
let (tu_sender, _tu_receiver) = unbounded_channel();
397+
384398
let dialog_inner = DialogInner::new(
385399
TransactionRole::Client,
386400
dialog_id.clone(),
@@ -389,6 +403,7 @@ async fn test_dialog_sequence_numbers() -> crate::Result<()> {
389403
state_sender,
390404
None,
391405
Some(rsip::Uri::try_from("sip:[email protected]:5060")?),
406+
tu_sender,
392407
)?;
393408

394409
// Test initial sequence number

0 commit comments

Comments
 (0)