1- use anyhow:: { Result , bail} ;
1+ use anyhow:: { Context , Result , bail} ;
22use backoff:: ExponentialBackoffBuilder ;
33use backoff:: backoff:: Backoff ;
4+ use base64:: Engine ;
45use futures_util:: stream:: { SplitSink , SplitStream } ;
56use futures_util:: { SinkExt , StreamExt } ;
67use http:: HeaderValue ;
@@ -9,32 +10,151 @@ use pyth_lazer_publisher_sdk::transaction::SignedLazerTransaction;
910use std:: sync:: Arc ;
1011use std:: sync:: atomic:: { AtomicBool , Ordering } ;
1112use std:: time:: { Duration , Instant } ;
13+ use tokio:: io:: { AsyncReadExt , AsyncWriteExt } ;
1214use tokio:: net:: TcpStream ;
1315use tokio:: select;
1416use tokio:: sync:: broadcast;
1517use tokio_tungstenite:: tungstenite:: client:: IntoClientRequest ;
1618use tokio_tungstenite:: {
17- MaybeTlsStream , WebSocketStream , connect_async_with_config,
19+ MaybeTlsStream , WebSocketStream , client_async , connect_async_with_config,
1820 tungstenite:: Message as TungsteniteMessage ,
1921} ;
2022use url:: Url ;
2123
2224type RelayerWsSender = SplitSink < WebSocketStream < MaybeTlsStream < TcpStream > > , TungsteniteMessage > ;
2325type RelayerWsReceiver = SplitStream < WebSocketStream < MaybeTlsStream < TcpStream > > > ;
2426
25- async fn connect_to_relayer ( url : Url , token : & str ) -> Result < ( RelayerWsSender , RelayerWsReceiver ) > {
26- tracing:: info!( "connecting to the relayer at {}" , url) ;
27- let mut req = url. clone ( ) . into_client_request ( ) ?;
27+ async fn connect_through_proxy (
28+ proxy_url : & Url ,
29+ target_url : & Url ,
30+ token : & str ,
31+ ) -> Result < ( RelayerWsSender , RelayerWsReceiver ) > {
32+ tracing:: info!(
33+ "connecting to the relayer at {} via proxy {}" ,
34+ target_url,
35+ proxy_url
36+ ) ;
37+
38+ let proxy_host = proxy_url. host_str ( ) . context ( "Proxy URL must have a host" ) ?;
39+ let proxy_port = proxy_url
40+ . port ( )
41+ . unwrap_or ( if proxy_url. scheme ( ) == "https" {
42+ 443
43+ } else {
44+ 80
45+ } ) ;
46+
47+ let proxy_addr = format ! ( "{}:{}" , proxy_host, proxy_port) ;
48+ let mut stream = TcpStream :: connect ( & proxy_addr)
49+ . await
50+ . context ( format ! ( "Failed to connect to proxy at {}" , proxy_addr) ) ?;
51+
52+ let target_host = target_url
53+ . host_str ( )
54+ . context ( "Target URL must have a host" ) ?;
55+ let target_port = target_url
56+ . port ( )
57+ . unwrap_or ( if target_url. scheme ( ) == "wss" {
58+ 443
59+ } else {
60+ 80
61+ } ) ;
62+
63+ let mut connect_request = format ! (
64+ "CONNECT {}:{} HTTP/1.1\r \n Host: {}:{}\r \n " ,
65+ target_host, target_port, target_host, target_port
66+ ) ;
67+
68+ let username = proxy_url. username ( ) ;
69+ if !username. is_empty ( ) {
70+ let password = proxy_url. password ( ) . unwrap_or ( "" ) ;
71+ let credentials = format ! ( "{}:{}" , username, password) ;
72+ let encoded = base64:: engine:: general_purpose:: STANDARD . encode ( credentials. as_bytes ( ) ) ;
73+ connect_request = format ! (
74+ "{}Proxy-Authorization: Basic {}\r \n " ,
75+ connect_request, encoded
76+ ) ;
77+ }
78+
79+ connect_request = format ! ( "{}\r \n " , connect_request) ;
80+
81+ stream
82+ . write_all ( connect_request. as_bytes ( ) )
83+ . await
84+ . context ( "Failed to send CONNECT request to proxy" ) ?;
85+
86+ let mut response = vec ! [ 0u8 ; 1024 ] ;
87+ let n = stream
88+ . read ( & mut response)
89+ . await
90+ . context ( "Failed to read CONNECT response from proxy" ) ?;
91+
92+ let response_str = String :: from_utf8_lossy ( & response[ ..n] ) ;
93+
94+ if !response_str. starts_with ( "HTTP/1.1 200" ) && !response_str. starts_with ( "HTTP/1.0 200" ) {
95+ bail ! (
96+ "Proxy CONNECT failed: {}" ,
97+ response_str. lines( ) . next( ) . unwrap_or( "Unknown error" )
98+ ) ;
99+ }
100+
101+ tracing:: info!( "Successfully connected through proxy" ) ;
102+
103+ let mut req = target_url. clone ( ) . into_client_request ( ) ?;
28104 let headers = req. headers_mut ( ) ;
29105 headers. insert (
30106 "Authorization" ,
31- HeaderValue :: from_str ( & format ! ( "Bearer {token}" ) ) ?,
107+ HeaderValue :: from_str ( & format ! ( "Bearer {}" , token) ) ?,
108+ ) ;
109+
110+ let maybe_tls_stream = if target_url. scheme ( ) == "wss" {
111+ let tls_connector = tokio_native_tls:: native_tls:: TlsConnector :: builder ( )
112+ . build ( )
113+ . context ( "Failed to build TLS connector" ) ?;
114+ let tokio_connector = tokio_native_tls:: TlsConnector :: from ( tls_connector) ;
115+ let domain = target_host;
116+ let tls_stream = tokio_connector
117+ . connect ( domain, stream)
118+ . await
119+ . context ( "Failed to establish TLS connection" ) ?;
120+
121+ MaybeTlsStream :: NativeTls ( tls_stream)
122+ } else {
123+ MaybeTlsStream :: Plain ( stream)
124+ } ;
125+
126+ let ( ws_stream, _) = client_async ( req, maybe_tls_stream)
127+ . await
128+ . context ( "Failed to complete WebSocket handshake" ) ?;
129+
130+ tracing:: info!(
131+ "WebSocket connection established to relayer at {}" ,
132+ target_url
32133 ) ;
33- let ( ws_stream, _) = connect_async_with_config ( req, None , true ) . await ?;
34- tracing:: info!( "connected to the relayer at {}" , url) ;
35134 Ok ( ws_stream. split ( ) )
36135}
37136
137+ async fn connect_to_relayer (
138+ url : Url ,
139+ token : & str ,
140+ proxy_url : Option < & Url > ,
141+ ) -> Result < ( RelayerWsSender , RelayerWsReceiver ) > {
142+ if let Some ( proxy) = proxy_url {
143+ connect_through_proxy ( proxy, & url, token) . await
144+ } else {
145+ tracing:: info!( "connecting to the relayer at {}" , url) ;
146+ let mut req = url. clone ( ) . into_client_request ( ) ?;
147+ let headers = req. headers_mut ( ) ;
148+ headers. insert (
149+ "Authorization" ,
150+ HeaderValue :: from_str ( & format ! ( "Bearer {token}" ) ) ?,
151+ ) ;
152+ let ( ws_stream, _) = connect_async_with_config ( req, None , true ) . await ?;
153+ tracing:: info!( "connected to the relayer at {}" , url) ;
154+ Ok ( ws_stream. split ( ) )
155+ }
156+ }
157+
38158struct RelayerWsSession {
39159 ws_sender : RelayerWsSender ,
40160}
@@ -58,11 +178,11 @@ impl RelayerWsSession {
58178}
59179
60180pub struct RelayerSessionTask {
61- // connection state
62181 pub url : Url ,
63182 pub token : String ,
64183 pub receiver : broadcast:: Receiver < SignedLazerTransaction > ,
65184 pub is_ready : Arc < AtomicBool > ,
185+ pub proxy_url : Option < Url > ,
66186}
67187
68188impl RelayerSessionTask {
@@ -108,10 +228,8 @@ impl RelayerSessionTask {
108228 }
109229
110230 pub async fn run_relayer_connection ( & mut self ) -> Result < ( ) > {
111- // Establish relayer connection
112- // Relayer will drop the connection if no data received in 5s
113231 let ( relayer_ws_sender, mut relayer_ws_receiver) =
114- connect_to_relayer ( self . url . clone ( ) , & self . token ) . await ?;
232+ connect_to_relayer ( self . url . clone ( ) , & self . token , self . proxy_url . as_ref ( ) ) . await ?;
115233 let mut relayer_ws_session = RelayerWsSession {
116234 ws_sender : relayer_ws_sender,
117235 } ;
@@ -236,11 +354,11 @@ mod tests {
236354 let ( relayer_sender, relayer_receiver) = broadcast:: channel ( RELAYER_CHANNEL_CAPACITY ) ;
237355
238356 let mut relayer_session_task = RelayerSessionTask {
239- // connection state
240357 url : Url :: parse ( "ws://127.0.0.1:12346" ) . unwrap ( ) ,
241358 token : "token1" . to_string ( ) ,
242359 receiver : relayer_receiver,
243360 is_ready : Arc :: new ( AtomicBool :: new ( false ) ) ,
361+ proxy_url : None ,
244362 } ;
245363 tokio:: spawn ( async move { relayer_session_task. run ( ) . await } ) ;
246364 tokio:: time:: sleep ( std:: time:: Duration :: from_millis ( 1000 ) ) . await ;
0 commit comments