11use clap:: Parser ;
2- use play_file:: { build_rtp_conn, play_example_file } ;
2+ use play_file:: { build_rtp_conn, play_audio_file } ;
33use rsip:: prelude:: HeadersExt ;
44use rsip:: typed:: MediaType ;
55use rsipstack:: dialog:: dialog:: { Dialog , DialogState , DialogStateReceiver , DialogStateSender } ;
@@ -14,19 +14,19 @@ use rsipstack::{
1414 transport:: { udp:: UdpConnection , TransportLayer } ,
1515 EndpointBuilder , Error ,
1616} ;
17+ use std:: net:: IpAddr ;
1718use std:: { env, sync:: Arc , time:: Duration } ;
1819use tokio:: sync:: mpsc:: unbounded_channel;
1920use tokio:: { select, time:: sleep} ;
2021use tokio_util:: sync:: CancellationToken ;
21- use tracing:: { debug, info} ;
22+ use tracing:: { debug, error , info} ;
2223
2324use crate :: play_file:: play_echo;
2425mod play_file;
25- mod stun;
2626#[ derive( Debug , Clone ) ]
2727struct MediaSessionOption {
28- pub stun : bool ,
29- pub stun_server : Option < String > ,
28+ pub auto_answer : bool ,
29+ pub cancel_token : CancellationToken ,
3030 pub external_ip : Option < String > ,
3131 pub rtp_start_port : u16 ,
3232 pub echo : bool ,
@@ -63,28 +63,26 @@ struct Args {
6363 #[ arg( long) ]
6464 password : Option < String > ,
6565
66- #[ arg( long, default_value = "restsend.com:3478" ) ]
67- stun_server : Option < String > ,
68-
69- #[ arg( long) ]
70- stun : bool ,
71-
7266 #[ arg( long) ]
7367 call : Option < String > ,
7468
7569 #[ arg( long) ]
7670 reject : bool ,
71+
72+ #[ arg( long) ]
73+ auto_answer : bool ,
7774}
7875
79- async fn handle_user_input ( cancel_token : CancellationToken ) -> Result < ( ) > {
76+ async fn handle_user_input (
77+ cancel_token : CancellationToken ,
78+ answer_sender : tokio:: sync:: mpsc:: UnboundedSender < String > ,
79+ ) -> Result < ( ) > {
8080 use tokio:: io:: { AsyncBufReadExt , BufReader } ;
8181
8282 let stdin = tokio:: io:: stdin ( ) ;
8383 let reader = BufReader :: new ( stdin) ;
8484 let mut lines = reader. lines ( ) ;
8585
86- info ! ( "Press 'q' and Enter to hang up current call" ) ;
87-
8886 loop {
8987 select ! {
9088 line = lines. next_line( ) => {
@@ -96,6 +94,8 @@ async fn handle_user_input(cancel_token: CancellationToken) -> Result<()> {
9694 cancel_token. cancel( ) ;
9795 info!( "Cancelled all dialogs" ) ;
9896 break ;
97+ } else if input == "a" || input == "r" {
98+ answer_sender. send( input. to_string( ) ) . expect( "send answer" ) ;
9999 }
100100 } ,
101101 Ok ( None ) => {
@@ -116,7 +116,17 @@ async fn handle_user_input(cancel_token: CancellationToken) -> Result<()> {
116116 }
117117 Ok ( ( ) )
118118}
119-
119+ pub fn get_first_non_loopback_interface ( ) -> Result < IpAddr > {
120+ for i in get_if_addrs:: get_if_addrs ( ) ? {
121+ if !i. is_loopback ( ) {
122+ match i. addr {
123+ get_if_addrs:: IfAddr :: V4 ( ref addr) => return Ok ( std:: net:: IpAddr :: V4 ( addr. ip ) ) ,
124+ _ => continue ,
125+ }
126+ }
127+ }
128+ Err ( Error :: Error ( "No IPV4 interface found" . to_string ( ) ) )
129+ }
120130// A sip client example, that sends a REGISTER request to a sip server.
121131#[ tokio:: main]
122132async fn main ( ) -> rsipstack:: Result < ( ) > {
@@ -152,15 +162,15 @@ async fn main() -> rsipstack::Result<()> {
152162 . password
153163 . unwrap_or ( env:: var ( "SIP_PASSWORD" ) . unwrap_or_default ( ) ) ;
154164
165+ let token = CancellationToken :: new ( ) ;
155166 let opt = MediaSessionOption {
156- stun : args. stun ,
157- stun_server : args. stun_server . clone ( ) ,
167+ cancel_token : token. clone ( ) ,
158168 external_ip : args. external_ip . clone ( ) ,
159169 rtp_start_port : args. rtp_start_port ,
160170 echo : args. echo ,
171+ auto_answer : args. auto_answer ,
161172 } ;
162173
163- let token = CancellationToken :: new ( ) ;
164174 let transport_layer = TransportLayer :: new ( token. clone ( ) ) ;
165175
166176 let external_ip = args
@@ -173,23 +183,14 @@ async fn main() -> rsipstack::Result<()> {
173183 Some ( format ! ( "{}:{}" , external_ip, args. port) . parse ( ) ?)
174184 } ;
175185
176- let addr = stun :: get_first_non_loopback_interface ( ) . expect ( "get first non loopback interface" ) ;
177- let mut connection = UdpConnection :: create_connection (
186+ let addr = get_first_non_loopback_interface ( ) . expect ( "get first non loopback interface" ) ;
187+ let connection = UdpConnection :: create_connection (
178188 format ! ( "{}:{}" , addr, args. port) . parse ( ) ?,
179189 external. clone ( ) ,
180190 Some ( token. child_token ( ) ) ,
181191 )
182192 . await ?;
183193
184- if external. is_none ( ) && args. stun {
185- if let Some ( server) = args. stun_server {
186- match stun:: external_by_stun ( & mut connection, & server, Duration :: from_secs ( 5 ) ) . await {
187- Ok ( socket) => info ! ( "external IP: {:?}" , socket) ,
188- Err ( e) => info ! ( "Failed to get external IP, stunserver {} : {:?}" , server, e) ,
189- }
190- }
191- }
192-
193194 transport_layer. add_transport ( connection. into ( ) ) ;
194195
195196 let endpoint = EndpointBuilder :: new ( )
@@ -380,7 +381,7 @@ async fn process_dialog(
380381 // play example pcmu of handling incoming call
381382 //
382383 // [A] Ai answer, [R] Reject, [E] Play example pcmu
383- play_example_pcmu ( & opt, d) . await ?;
384+ process_invite ( & opt, d) . await ?;
384385 }
385386 Dialog :: ClientInvite ( _) => {
386387 info ! ( "Client invite dialog {}" , id) ;
@@ -457,14 +458,23 @@ async fn make_call(
457458 . and_then ( |m| m. media . fmt . parse :: < u8 > ( ) . ok ( ) )
458459 . unwrap_or ( 0 ) ;
459460 info ! ( "Peer address: {} payload_type:{}" , peer_addr, payload_type) ;
460- play_example_file ( rtp_conn, rtp_token, ssrc, peer_addr, payload_type)
461- . await
462- . expect ( "play example file" ) ;
461+ play_audio_file (
462+ rtp_conn,
463+ rtp_token,
464+ ssrc,
465+ "example" ,
466+ 0 ,
467+ 1 ,
468+ peer_addr,
469+ payload_type,
470+ )
471+ . await
472+ . expect ( "play example file" ) ;
463473 dialog. bye ( ) . await . expect ( "send BYE" ) ;
464474 Ok ( ( ) )
465475}
466476
467- async fn play_example_pcmu ( opt : & MediaSessionOption , dialog : ServerInviteDialog ) -> Result < ( ) > {
477+ async fn process_invite ( opt : & MediaSessionOption , dialog : ServerInviteDialog ) -> Result < ( ) > {
468478 let ssrc = rand:: random :: < u32 > ( ) ;
469479
470480 let body = String :: from_utf8_lossy ( dialog. initial_request ( ) . body ( ) ) . to_string ( ) ;
@@ -499,34 +509,98 @@ async fn play_example_pcmu(opt: &MediaSessionOption, dialog: ServerInviteDialog)
499509
500510 let ( conn, answer) = build_rtp_conn ( opt, ssrc, payload_type) . await ?;
501511
502- let headers = vec ! [ rsip:: typed:: ContentType ( MediaType :: Sdp ( vec![ ] ) ) . into( ) ] ;
503- dialog. accept ( Some ( headers) , Some ( answer. into ( ) ) ) ?;
512+ let ( answer_sender, mut answer_receiver) = tokio:: sync:: mpsc:: unbounded_channel ( ) ;
513+ if opt. auto_answer {
514+ let headers = vec ! [ rsip:: typed:: ContentType ( MediaType :: Sdp ( vec![ ] ) ) . into( ) ] ;
515+ dialog. accept ( Some ( headers) , Some ( answer. clone ( ) . into ( ) ) ) ?;
516+ answer_sender. send ( "a" . to_string ( ) ) . expect ( "send answer" ) ;
504517
505- info ! (
506- "Accepted call with answer SDP peer address: {} port: {} payload_type: {}" ,
507- peer_addr, peer_port, payload_type
508- ) ;
518+ info ! (
519+ "Accepted call with answer SDP peer address: {} port: {} payload_type: {}" ,
520+ peer_addr, peer_port, payload_type
521+ ) ;
522+ } else {
523+ let headers = vec ! [ rsip:: typed:: ContentType ( MediaType :: Sdp ( vec![ ] ) ) . into( ) ] ;
524+ dialog. ringing ( Some ( headers) , Some ( answer. clone ( ) . into ( ) ) ) ?;
525+ }
509526
510527 let peer_addr = format ! ( "{}:{}" , peer_addr, peer_port) ;
511528 let rtp_token = dialog. cancel_token ( ) . child_token ( ) ;
512529 let echo = opt. echo ;
530+ let answered = opt. auto_answer ;
513531
514532 tokio:: spawn ( async move {
515533 let input_token = CancellationToken :: new ( ) ;
516534 select ! {
517- _ = handle_user_input( input_token) => {
535+ _ = handle_user_input( input_token, answer_sender ) => {
518536 info!( "user input handler finished" ) ;
519537 }
520538 _ = async {
539+ let mut ts = 0 ;
540+ let mut seq = 1 ;
541+ let mut rejected = false ;
542+
543+ println!( "\x1b [32mPress 'a' to answer, 'r' to reject, or 'q' to quit.\x1b [0m" ) ;
544+ if !answered {
545+ let ringback_token = rtp_token. child_token( ) ;
546+ let ( pos, _) = tokio:: join!(
547+ play_audio_file(
548+ conn. clone( ) ,
549+ ringback_token. clone( ) ,
550+ ssrc,
551+ "ringback" ,
552+ ts,
553+ seq,
554+ peer_addr. clone( ) ,
555+ payload_type
556+ ) ,
557+ async {
558+ let r = answer_receiver. recv( ) . await . unwrap_or_default( ) ;
559+ ringback_token. cancel( ) ;
560+
561+ if r == "a" {
562+ info!( "User answered the call" ) ;
563+ } else if r == "r" {
564+ info!( "User rejected the call" ) ;
565+ dialog. reject( ) . ok( ) ;
566+ rejected = true ;
567+ return ;
568+ } else {
569+ info!( "Unknown command: {}" , r) ;
570+ return ;
571+ }
572+ }
573+ ) ;
574+ match pos {
575+ Ok ( ( t, s) ) => {
576+ ts = t;
577+ seq = s;
578+ }
579+ Err ( e) => {
580+ info!( "Failed to play ringback: {:?}" , e) ;
581+ }
582+ }
583+ }
584+ if rejected {
585+ return ;
586+ }
587+ let headers = vec![ rsip:: typed:: ContentType ( MediaType :: Sdp ( vec![ ] ) ) . into( ) ] ;
588+ match dialog. accept( Some ( headers) , Some ( answer. clone( ) . into( ) ) ) {
589+ Ok ( _) => info!( "Accepted call with answer SDP peer address: {} port: {} payload_type: {}" , peer_addr, peer_port, payload_type) ,
590+ Err ( e) => {
591+ error!( "Failed to accept call: {:?}" , e) ;
592+ return ;
593+ }
594+ }
521595 if echo {
522596 play_echo( conn, rtp_token) . await . expect( "play echo" ) ;
523597 } else {
524- play_example_file ( conn, rtp_token, ssrc, peer_addr, payload_type)
598+ play_audio_file ( conn, rtp_token, ssrc, "example" , ts , seq , peer_addr, payload_type)
525599 . await
526600 . expect( "play example file" ) ;
527601 }
528602 } => {
529- info!( "play example finished" ) ;
603+ info!( "answer receiver finished" ) ;
530604 }
531605 }
532606 dialog. bye ( ) . await . expect ( "send BYE" ) ;
0 commit comments