2121#include < assert.h>
2222#include < sys/time.h>
2323
24-
2524namespace holoscan ::ops {
2625
2726class AdvNetworkingRdmaOp : public Operator {
@@ -31,11 +30,12 @@ class AdvNetworkingRdmaOp : public Operator {
3130 AdvNetworkingRdmaOp () = default ;
3231
3332 ~AdvNetworkingRdmaOp () {
34- HOLOSCAN_LOG_INFO (" Finished receiver with {}/{} bytes/packets received and {}/{} bytes/packets sent" ,
35- ttl_bytes_recv_,
36- ttl_pkts_recv_,
37- ttl_bytes_sent_,
38- ttl_pkts_sent_);
33+ HOLOSCAN_LOG_INFO (
34+ " Finished receiver with {}/{} bytes/packets received and {}/{} bytes/packets sent" ,
35+ ttl_bytes_recv_,
36+ ttl_pkts_recv_,
37+ ttl_bytes_sent_,
38+ ttl_pkts_sent_);
3939
4040 HOLOSCAN_LOG_INFO (" ANO benchmark clent op shutting down" );
4141 freeResources ();
@@ -48,114 +48,92 @@ class AdvNetworkingRdmaOp : public Operator {
4848 if (send_.get ()) {
4949 send_mr_name_ = server_.get () ? " DATA_TX_CPU_SERVER" : " DATA_TX_CPU_CLIENT" ;
5050 }
51- if (receive_.get ()) {
51+ if (receive_.get ()) {
5252 receive_mr_name_ = server_.get () ? " DATA_RX_CPU_SERVER" : " DATA_RX_CPU_CLIENT" ;
5353 }
5454
55- HOLOSCAN_LOG_INFO (" AdvNetworkingRdmaOp::initialize() complete in {} mode" ,
56- server_.get () ? " server" : " client" );
55+ HOLOSCAN_LOG_INFO (" AdvNetworkingRdmaOp::initialize() complete in {} mode" ,
56+ server_.get () ? " server" : " client" );
5757 }
5858
5959 void freeResources () {
6060 HOLOSCAN_LOG_INFO (" AdvNetworkingRdmaOp::freeResources() start" );
6161 HOLOSCAN_LOG_INFO (" AdvNetworkingRdmaOp::freeResources() complete" );
6262 }
6363
64-
6564 void setup (OperatorSpec& spec) override {
66- spec.param <int >(message_size_,
67- " message_size" ,
68- " Message size" ,
69- " Message size in bytes" ,
70- 1024 );
71- spec.param <std::string>(server_addr_str_,
72- " server_address" ,
73- " Server address" ,
74- " Server address" ,
75- " 192.168.3.1" );
76- spec.param <std::string>(client_addr_str_,
77- " client_address" ,
78- " Client address" ,
79- " Client address" ,
80- " 192.168.2.1" );
81- spec.param <uint16_t >(server_port_,
82- " server_port" ,
83- " Server port" ,
84- " Server port" ,
85- 4096 );
86- spec.param <bool >(server_,
87- " server" ,
88- " Server" ,
89- " Server" ,
90- false );
91- spec.param <bool >(send_,
92- " send" ,
93- " Send" ,
94- " Send" ,
95- false );
96- spec.param <bool >(receive_,
97- " receive" ,
98- " Receive" ,
99- " Receive" ,
100- false );
65+ spec.param <int >(message_size_, " message_size" , " Message size" , " Message size in bytes" , 1024 );
66+ spec.param <std::string>(
67+ server_addr_str_, " server_address" , " Server address" , " Server address" , " 192.168.3.1" );
68+ spec.param <std::string>(
69+ client_addr_str_, " client_address" , " Client address" , " Client address" , " 192.168.2.1" );
70+ spec.param <uint16_t >(server_port_, " server_port" , " Server port" , " Server port" , 4096 );
71+ spec.param <bool >(server_, " server" , " Server" , " Server" , false );
72+ spec.param <bool >(send_, " send" , " Send" , " Send" , false );
73+ spec.param <bool >(receive_, " receive" , " Receive" , " Receive" , false );
10174 }
10275
103-
104- void compute (InputContext& op_input, OutputContext& op_output, ExecutionContext& context) override {
105- BurstParams * burst;
76+ void compute (InputContext& op_input, OutputContext& op_output,
77+ ExecutionContext& context) override {
78+ BurstParams* burst;
10679
10780 // Establish connection. If we're a client we connect to the server. If we're a server we ask
10881 // for a connection ID from the ANO.
10982 if (conn_id_ == 0 ) {
11083 if (!server_.get ()) {
111- HOLOSCAN_LOG_INFO (" Connecting to server at {}:{}" , server_addr_str_.get (), server_port_.get ());
112- auto res = rdma_connect_to_server (server_addr_str_.get (), server_port_.get (), client_addr_str_.get (), &conn_id_);
84+ HOLOSCAN_LOG_INFO (
85+ " Connecting to server at {}:{}" , server_addr_str_.get (), server_port_.get ());
86+ auto res = rdma_connect_to_server (
87+ server_addr_str_.get (), server_port_.get (), client_addr_str_.get (), &conn_id_);
11388 if (res != Status::SUCCESS) {
11489 HOLOSCAN_LOG_CRITICAL (" Failed to connect to server: {}" , (int )res);
11590 conn_id_ = 0 ;
11691 return ;
92+ } else {
93+ HOLOSCAN_LOG_INFO (" Connected to server at {}:{} with ID: {}" ,
94+ server_addr_str_.get (),
95+ server_port_.get (),
96+ (void *)conn_id_);
11797 }
118- else {
119- HOLOSCAN_LOG_INFO (" Connected to server at {}:{} with ID: {}" , server_addr_str_.get (), server_port_.get (), (void *)conn_id_);
120- }
121- }
122- else {
98+ } else {
12399 auto ret = rdma_get_server_conn_id (server_addr_str_.get (), server_port_.get (), &conn_id_);
124100 if (ret != Status::SUCCESS) {
125101 HOLOSCAN_LOG_INFO (" Server connection ID not ready" );
126102 sleep (1 );
127103 return ;
128- }
129- else {
104+ } else {
130105 HOLOSCAN_LOG_INFO (" Server connection ID: {}" , (void *)conn_id_);
131106 }
132107 }
133108 }
134109
135110 // SEND and RECEIVE use almost the same code, so we can use a lambda to handle both
136- auto process_post_msg = [&](int &completion_cnt, uint64_t &wr_id, RDMAOpCode opcode, const std::string &mr_name) {
137- if (completion_cnt < MAX_OUTSTANDING_COMPLETIONS) {
138- auto msg = create_burst_params ();
111+ auto process_post_msg =
112+ [&](int & completion_cnt, uint64_t & wr_id, RDMAOpCode opcode, const std::string& mr_name) {
113+ if (completion_cnt < MAX_OUTSTANDING_COMPLETIONS) {
114+ auto msg = create_burst_params ();
139115
140- Status ret = rdma_set_header (msg, opcode, conn_id_, server_.get (), 1 , wr_id, mr_name.c_str ());
116+ Status ret =
117+ rdma_set_header (msg, opcode, conn_id_, server_.get (), 1 , wr_id, mr_name.c_str ());
141118
142- while ((ret = get_tx_packet_burst (msg)) != Status::SUCCESS) {}
119+ while ((ret = get_tx_packet_burst (msg)) != Status::SUCCESS) {}
143120
144- // Set the length the same as the buffer size
145- set_packet_lengths (msg, 0 , {message_size_.get ()});
146- send_tx_burst (msg);
121+ // Set the length the same as the buffer size
122+ set_packet_lengths (msg, 0 , {message_size_.get ()});
123+ send_tx_burst (msg);
147124
148- completion_cnt++;
149- wr_id++;
150- }
151- };
125+ completion_cnt++;
126+ wr_id++;
127+ }
128+ };
152129
153130 if (send_.get ()) {
154131 process_post_msg (outstanding_send_completions, send_wr_id, RDMAOpCode::SEND, send_mr_name_);
155132 }
156133
157134 if (receive_.get ()) {
158- process_post_msg (outstanding_receive_completions, receive_wr_id, RDMAOpCode::RECEIVE, receive_mr_name_);
135+ process_post_msg (
136+ outstanding_receive_completions, receive_wr_id, RDMAOpCode::RECEIVE, receive_mr_name_);
159137 }
160138
161139 // Process any completions
@@ -164,8 +142,7 @@ class AdvNetworkingRdmaOp : public Operator {
164142 outstanding_receive_completions--;
165143 ttl_bytes_recv_ += get_packet_length (burst, 0 );
166144 ttl_pkts_recv_++;
167- }
168- else if (rdma_get_opcode (burst) == RDMAOpCode::SEND) {
145+ } else if (rdma_get_opcode (burst) == RDMAOpCode::SEND) {
169146 outstanding_send_completions--;
170147 ttl_bytes_sent_ += get_packet_length (burst, 0 );
171148 ttl_pkts_sent_++;
@@ -174,7 +151,9 @@ class AdvNetworkingRdmaOp : public Operator {
174151 uint64_t received_wr_id = burst->rdma_hdr .wr_id ;
175152
176153 if (burst->rdma_hdr .status != Status::SUCCESS) {
177- HOLOSCAN_LOG_ERROR (" Received completion for WR ID: {} with status: {}" , received_wr_id, (int )burst->rdma_hdr .status );
154+ HOLOSCAN_LOG_ERROR (" Received completion for WR ID: {} with status: {}" ,
155+ received_wr_id,
156+ (int )burst->rdma_hdr .status );
178157 }
179158
180159 free_tx_burst (burst);
@@ -189,18 +168,18 @@ class AdvNetworkingRdmaOp : public Operator {
189168 int outstanding_receive_completions = 0 ;
190169 uint64_t send_wr_id = 0x1234 ;
191170 uint64_t receive_wr_id = 0x2345 ;
192- int64_t ttl_bytes_recv_ = 0 ; // Total bytes received in operator
193- int64_t ttl_pkts_recv_ = 0 ; // Total packets received in operator
194- int64_t ttl_bytes_sent_ = 0 ; // Total bytes sent in operator
195- int64_t ttl_pkts_sent_ = 0 ; // Total packets sent in operator
171+ int64_t ttl_bytes_recv_ = 0 ; // Total bytes received in operator
172+ int64_t ttl_pkts_recv_ = 0 ; // Total packets received in operator
173+ int64_t ttl_bytes_sent_ = 0 ; // Total bytes sent in operator
174+ int64_t ttl_pkts_sent_ = 0 ; // Total packets sent in operator
196175 uintptr_t conn_id_ = 0 ;
197176 Parameter<bool > server_;
198- Parameter<int > message_size_; // Message size in bytes
199- Parameter<std::string> server_addr_str_; // Server address
200- Parameter<std::string> client_addr_str_; // Client address
201- Parameter<uint16_t > server_port_; // Server port
177+ Parameter<int > message_size_; // Message size in bytes
178+ Parameter<std::string> server_addr_str_; // Server address
179+ Parameter<std::string> client_addr_str_; // Client address
180+ Parameter<uint16_t > server_port_; // Server port
202181 Parameter<bool > send_;
203182 Parameter<bool > receive_;
204183};
205184
206- } // namespace holoscan::ops
185+ } // namespace holoscan::ops
0 commit comments