22
33use std:: io:: { Error , ErrorKind } ;
44
5- use log:: { info, trace, warn} ;
5+ use log:: { error , info, trace, warn} ;
66use tokio:: { net:: TcpStream , time:: timeout} ;
77use uuid:: Uuid ;
88
99use crate :: {
10- config:: G_CFG ,
10+ config:: { Link , G_CFG } ,
1111 share:: { proxy, FrameStream , Message , NETWORK_TIMEOUT } ,
1212} ;
1313
1414/// run a client
15- pub async fn run ( ) -> Result < ( ) , Error > {
16- let stream = connect_with_timeout (
17- & G_CFG . get ( ) . unwrap ( ) . link . remote_host ,
18- G_CFG . get ( ) . unwrap ( ) . contrl_port ,
19- )
20- . await ?;
21- let mut frame_stream = FrameStream :: new ( stream) ;
15+ pub async fn run ( ) {
16+ let links = & G_CFG . get ( ) . unwrap ( ) . links ;
17+ let port = G_CFG . get ( ) . unwrap ( ) . port ;
18+ let mut joins = Vec :: new ( ) ;
19+ for link in links. iter ( ) {
20+ let join = tokio:: spawn ( async move {
21+ let stream = connect_with_timeout ( & link. remote_host , port) . await ;
22+ if stream. is_err ( ) {
23+ error ! ( "Failed to connect to remote host:{}" , stream. unwrap_err( ) ) ;
24+ return ;
25+ }
26+ let stream = stream. unwrap ( ) ;
27+ let mut frame_stream = FrameStream :: new ( stream) ;
2228
23- auth ( & mut frame_stream) . await ? ;
29+ let ret = auth ( & mut frame_stream) . await ;
2430
25- init_port ( & mut frame_stream) . await ?;
31+ if let Err ( e) = ret {
32+ error ! ( "auth failed:{e}" ) ;
33+ return ;
34+ }
2635
27- loop {
28- // sure connection is established
29- frame_stream. send ( & Message :: Heartbeat ) . await ?;
36+ let ret = init_port ( & mut frame_stream, link) . await ;
3037
31- let msg = frame_stream . recv_timeout ( ) . await ;
32- if msg . is_err ( ) {
33- continue ;
34- }
38+ if let Err ( e ) = ret {
39+ error ! ( "init port failed:{e}" ) ;
40+ return ;
41+ }
3542
36- match msg. unwrap ( ) {
37- Message :: InitPort ( _) => info ! ( "unexpected init" ) ,
38- Message :: Auth ( _) => warn ! ( "unexpected auth" ) ,
39- Message :: Heartbeat => trace ! ( "server check heartbeat" ) ,
40- Message :: Error ( e) => return Err ( Error :: new ( ErrorKind :: Other , e) ) ,
41- Message :: Connect ( id) => {
42- tokio:: spawn ( async move {
43- info ! ( "new connection" ) ;
44- match handle_proxy_connection ( id) . await {
45- Ok ( _) => info ! ( "connection exited" ) ,
46- Err ( err) => warn ! ( "connection exited with error {}" , err) ,
43+ loop {
44+ // sure connection is established
45+ let ret = frame_stream. send ( & Message :: Heartbeat ) . await ;
46+
47+ if let Err ( e) = ret {
48+ error ! ( "heartbeat failed:{e}" ) ;
49+ return ;
50+ }
51+
52+ let msg = frame_stream. recv_timeout ( ) . await ;
53+ if msg. is_err ( ) {
54+ continue ;
55+ }
56+
57+ match msg. unwrap ( ) {
58+ Message :: InitPort ( _) => info ! ( "unexpected init" ) ,
59+ Message :: Auth ( _) => warn ! ( "unexpected auth" ) ,
60+ Message :: Heartbeat => trace ! ( "server check heartbeat" ) ,
61+ Message :: Error ( e) => {
62+ error ! ( "{}" , e) ;
63+ return ;
4764 }
48- } ) ;
65+ Message :: Connect ( id) => {
66+ tokio:: spawn ( async move {
67+ info ! ( "new connection" ) ;
68+ match handle_proxy_connection ( id, link) . await {
69+ Ok ( _) => info ! ( "connection exited" ) ,
70+ Err ( err) => warn ! ( "connection exited with error {}" , err) ,
71+ }
72+ } ) ;
73+ }
74+ }
4975 }
50- }
76+ } ) ;
77+
78+ joins. push ( join) ;
79+ }
80+ for join in joins {
81+ let _ = join. await ;
5182 }
5283}
5384
@@ -70,9 +101,7 @@ async fn auth(frame_stream: &mut FrameStream) -> Result<(), Error> {
70101}
71102
72103/// send and recv InitPort message with server
73- async fn init_port ( frame_stream : & mut FrameStream ) -> Result < ( ) , Error > {
74- let link = & G_CFG . get ( ) . unwrap ( ) . link ;
75-
104+ async fn init_port ( frame_stream : & mut FrameStream , link : & Link ) -> Result < ( ) , Error > {
76105 frame_stream
77106 . send ( & Message :: InitPort ( link. remote_port ) )
78107 . await ?;
@@ -100,10 +129,8 @@ async fn connect_with_timeout(addr: &str, port: u16) -> Result<TcpStream, Error>
100129}
101130
102131/// deal connection from server proxy port
103- async fn handle_proxy_connection ( id : Uuid ) -> Result < ( ) , Error > {
104- let link = & G_CFG . get ( ) . unwrap ( ) . link ;
105-
106- let stream = connect_with_timeout ( & link. remote_host , G_CFG . get ( ) . unwrap ( ) . contrl_port ) . await ?;
132+ async fn handle_proxy_connection ( id : Uuid , link : & Link ) -> Result < ( ) , Error > {
133+ let stream = connect_with_timeout ( & link. remote_host , G_CFG . get ( ) . unwrap ( ) . port ) . await ?;
107134 let mut frame_stream = FrameStream :: new ( stream) ;
108135
109136 auth ( & mut frame_stream) . await ?;
0 commit comments