Skip to content

Commit 3533704

Browse files
issue: 4392235 Fixing corruption on full SQ completion
We tracked the index of the last completed WQE; then, on each CQE, we iterated from the CQE index back until we reached the last completed WQE. However, since this WQE is considered free, more than one WQE can be written in its space, especially when we fill the SQ to its maximum. Consider the following state: | WQE3 | last_signalled (4 wqebb) | WQE1 | WQE2 | Now, since the last_signalled space is considered free, we may add more WQEs, for instance, | WQE3 | WQE4 (2wqebb, last_signalled_ptr) | WQE5 (1 wqebb) | WQE6 (1 wqebb) | WQE1 | WQE2 | Now if we get a completion for WQE6, we would complete until last_signalled (WQE6 and WQE5), leaking the completions for WQE1,2,3,4. This results in credits and resource leakage. To resolve this issue, we replace last_signalled with last_to_complete and keep the size of the WQE in wqebbs for each sq-prop. last_to_complete always points to the leftmost uncompleted WQE. Given the example above, last_to_complete will point to WQE1. After the completion loop, last_to_complete will point to the index of the next WQE to be completed, whether it has already been posted or not. Given | WQE1 (last_to_complete) | WQE2 | WQE3 |, on a CQE for WQE3, we complete WQE3,WQE2,WQE1 and move last_to_complete to WQE4. We can do this since we know the size of each WQE in wqebbs. Removing unused reset_inflight_zc_buffers_ctx. Signed-off-by: Alexander Grissik <[email protected]>
1 parent c570813 commit 3533704

File tree

9 files changed

+242
-57
lines changed

9 files changed

+242
-57
lines changed

src/core/dev/cq_mgr_tx.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -286,9 +286,11 @@ void cq_mgr_tx::handle_sq_wqe_prop(unsigned index)
286286

287287
prev = p;
288288
p = p->next;
289-
} while (p && m_hqtx_ptr->is_sq_wqe_prop_valid(p, prev));
289+
} while (prev != m_hqtx_ptr->m_last_sq_wqe_prop_to_complete);
290290

291291
m_p_ring->return_tx_pool_to_global_pool();
292292
m_hqtx_ptr->credits_return(credits);
293-
m_hqtx_ptr->m_sq_wqe_prop_last_signalled = index;
293+
m_hqtx_ptr->m_last_sq_wqe_prop_to_complete =
294+
&m_hqtx_ptr->m_sq_wqe_idx_to_prop[(index + m_hqtx_ptr->m_sq_wqe_idx_to_prop[index].wqebbs) %
295+
m_hqtx_ptr->m_tx_num_wr];
294296
}

src/core/dev/hw_queue_tx.cpp

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ void hw_queue_tx::init_queue()
440440
hwqtx_logerr("Failed allocating m_sq_wqe_idx_to_prop (errno=%d %m)", errno);
441441
return;
442442
}
443-
m_sq_wqe_prop_last_signalled = m_tx_num_wr - 1;
443+
m_last_sq_wqe_prop_to_complete = m_sq_wqe_idx_to_prop;
444444
m_sq_wqe_prop_last = nullptr;
445445
}
446446

@@ -790,6 +790,7 @@ inline void hw_queue_tx::submit_wqe(mem_buf_desc_t *buf, unsigned credits, uint8
790790
m_sq_wqe_idx_to_prop[m_sq_wqe_hot_index] = sq_wqe_prop {
791791
.buf = buf,
792792
.credits = credits,
793+
.wqebbs = wqebbs,
793794
.ti = ti,
794795
.next = m_sq_wqe_prop_last,
795796
};
@@ -1345,26 +1346,6 @@ void hw_queue_tx::trigger_completion_for_all_sent_packets()
13451346
}
13461347
}
13471348

1348-
void hw_queue_tx::reset_inflight_zc_buffers_ctx(void *ctx)
1349-
{
1350-
sq_wqe_prop *p = m_sq_wqe_prop_last;
1351-
sq_wqe_prop *prev;
1352-
if (p) {
1353-
unsigned p_i = p - m_sq_wqe_idx_to_prop;
1354-
if (p_i == m_sq_wqe_prop_last_signalled) {
1355-
return;
1356-
}
1357-
do {
1358-
mem_buf_desc_t *desc = p->buf;
1359-
if (desc && desc->tx.zc.ctx == ctx) {
1360-
desc->tx.zc.ctx = nullptr;
1361-
}
1362-
prev = p;
1363-
p = p->next;
1364-
} while (p && is_sq_wqe_prop_valid(p, prev));
1365-
}
1366-
}
1367-
13681349
uint32_t hw_queue_tx::is_ratelimit_change(struct xlio_rate_limit_t &rate_limit)
13691350
{
13701351
uint32_t rl_changes = 0;

src/core/dev/hw_queue_tx.h

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ struct sq_wqe_prop {
4242
mem_buf_desc_t *buf;
4343
/* Number of credits (usually number of WQEBBs). */
4444
unsigned credits;
45+
/* Size of the WQE in WQEBBs. */
46+
uint8_t wqebbs;
4547
/* Transport interface (TIS/TIR) current WQE holds reference to. */
4648
xlio_ti *ti;
4749
struct sq_wqe_prop *next;
@@ -101,8 +103,6 @@ class hw_queue_tx : public xlio_ti_owner {
101103
void put_tls_dek(std::unique_ptr<dpcp::tls_dek> &&dek_obj);
102104
#endif
103105

104-
void reset_inflight_zc_buffers_ctx(void *ctx);
105-
106106
void credits_return(unsigned credits) { m_sq_free_credits += credits; }
107107

108108
bool credits_get(unsigned credits)
@@ -188,15 +188,6 @@ class hw_queue_tx : public xlio_ti_owner {
188188
}
189189
}
190190

191-
bool is_sq_wqe_prop_valid(sq_wqe_prop *p, sq_wqe_prop *prev)
192-
{
193-
unsigned p_i = p - m_sq_wqe_idx_to_prop;
194-
unsigned prev_i = prev - m_sq_wqe_idx_to_prop;
195-
return (p_i != m_sq_wqe_prop_last_signalled) &&
196-
((m_tx_num_wr + p_i - m_sq_wqe_prop_last_signalled) % m_tx_num_wr <
197-
(m_tx_num_wr + prev_i - m_sq_wqe_prop_last_signalled) % m_tx_num_wr);
198-
}
199-
200191
#if defined(DEFINED_UTLS)
201192
inline void tls_fill_static_params_wqe(struct mlx5_wqe_tls_static_params_seg *params,
202193
const struct xlio_tls_info *info, uint32_t key_id,
@@ -229,13 +220,24 @@ class hw_queue_tx : public xlio_ti_owner {
229220
sq_wqe_prop *m_sq_wqe_idx_to_prop = nullptr;
230221
sq_wqe_prop *m_sq_wqe_prop_last = nullptr;
231222

223+
/**
224+
* @brief Keeps track of the last Send Queue WQE (Work Queue Element) to be completed.
225+
*
226+
* This pointer is used to mark the last WQE in the chain that needs to be completed
227+
* by a completion event (CQE). For example, if WQEs are posted in order | WQE1 | WQE2 | WQE3 |,
228+
* m_last_sq_wqe_prop_to_complete will initially point to WQE1. When a CQE is received for WQE3,
229+
* the completion logic will process and complete WQE3, WQE2, WQE1 (from right to left) until
230+
* reaching m_last_sq_wqe_prop_to_complete. After that, m_last_sq_wqe_prop_to_complete is
231+
* updated to point to the index where the next WQE (e.g., WQE4) should be tracked.
232+
*/
233+
sq_wqe_prop *m_last_sq_wqe_prop_to_complete = nullptr;
234+
232235
struct mlx5_eth_wqe (*m_sq_wqes)[] = nullptr;
233236
struct mlx5_eth_wqe *m_sq_wqe_hot = nullptr;
234237
uint8_t *m_sq_wqes_end = nullptr;
235238

236239
const uint32_t m_n_sysvar_tx_num_wr_to_signal;
237240
uint32_t m_tx_num_wr;
238-
unsigned m_sq_wqe_prop_last_signalled = 0U;
239241
unsigned m_sq_free_credits = 0U;
240242
uint32_t m_n_unsignaled_count = 0U;
241243
int m_sq_wqe_hot_index = 0;

src/core/dev/ring.h

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -191,12 +191,6 @@ class ring {
191191
NOT_IN_USE(first);
192192
}
193193

194-
virtual void reset_inflight_zc_buffers_ctx(ring_user_id_t id, void *ctx)
195-
{
196-
NOT_IN_USE(id);
197-
NOT_IN_USE(ctx);
198-
}
199-
200194
// TODO Add id argument for bonding
201195
virtual bool credits_get(unsigned credits)
202196
{

src/core/dev/ring_bond.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,6 @@ class ring_bond : public ring {
8080
virtual bool is_tso(void);
8181
void slave_create(int if_index);
8282

83-
void reset_inflight_zc_buffers_ctx(ring_user_id_t id, void *ctx)
84-
{
85-
m_xmit_rings[id]->reset_inflight_zc_buffers_ctx(id, ctx);
86-
}
8783
virtual uint64_t get_rx_cq_out_of_buffer_drop();
8884

8985
protected:

src/core/dev/ring_simple.h

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -223,13 +223,6 @@ class ring_simple : public ring_slave {
223223
m_hqtx->post_dump_wqe(tis, addr, len, lkey, is_first);
224224
}
225225

226-
void reset_inflight_zc_buffers_ctx(ring_user_id_t id, void *ctx) override
227-
{
228-
std::lock_guard<decltype(m_lock_ring_tx)> lock(m_lock_ring_tx);
229-
NOT_IN_USE(id);
230-
m_hqtx->reset_inflight_zc_buffers_ctx(ctx);
231-
}
232-
233226
bool credits_get(unsigned credits) override
234227
{
235228
std::lock_guard<decltype(m_lock_ring_tx)> lock(m_lock_ring_tx);

src/core/proto/dst_entry.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,6 @@ class dst_entry : public cache_observer, public tostr {
104104
uint8_t get_tos() const { return m_tos; }
105105
uint8_t get_ttl_hop_limit() const { return m_ttl_hop_limit; }
106106
void set_external_vlan_tag(uint16_t vlan_tag) { m_external_vlan_tag = vlan_tag; }
107-
void reset_inflight_zc_buffers_ctx(void *ctx)
108-
{
109-
m_p_ring->reset_inflight_zc_buffers_ctx(m_id, ctx);
110-
}
111107

112108
inline bool is_the_same_ifname(const std::string &ifname)
113109
{

tests/gtest/Makefile.am

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,8 @@ gtest_SOURCES = \
131131
xlio_ultra_api/xlio_socket_migrate.cc \
132132
xlio_ultra_api/xlio_socket_migrate_2.cc \
133133
xlio_ultra_api/xlio_socket_send_receive.cc \
134-
xlio_ultra_api/xlio_socket_send_receive_2.cc
134+
xlio_ultra_api/xlio_socket_send_receive_2.cc \
135+
xlio_ultra_api/xlio_socket_send_receive_full_sq.cc
135136

136137
noinst_HEADERS = \
137138
common/tap.h \
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
/*
2+
* SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
3+
* Copyright (c) 2021-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
4+
* SPDX-License-Identifier: GPL-2.0-only or BSD-2-Clause
5+
*/
6+
7+
#include "common/def.h"
8+
#include "common/log.h"
9+
#include "common/sys.h"
10+
#include "common/base.h"
11+
#include <infiniband/verbs.h>
12+
#include <pthread.h>
13+
#include <unistd.h>
14+
#include "core/xlio_base.h"
15+
16+
#if defined(EXTRA_API_ENABLED) && (EXTRA_API_ENABLED == 1)
17+
18+
static int connected_counter = 0;
19+
static int terminated_counter = 0;
20+
static int rx_cb_counter = 0;
21+
static unsigned int comp_cb_counter = 0U;
22+
static struct ibv_pd *pd = NULL;
23+
static struct ibv_mr *mr_buf;
24+
static char sndbuf[32];
25+
static std::vector<xlio_socket_t> accepted_sockets;
26+
static unsigned int init_msgs = 10; // Consequent 10 segments should take 4 wqebb
27+
28+
struct xlio_socket_send_attr send_attr = {
29+
.flags = 0,
30+
.mkey = 0U,
31+
.userdata_op = 0x1,
32+
};
33+
34+
class ultra_api_socket_send_receive_full_sq : public ultra_api_base {
35+
public:
36+
virtual void SetUp()
37+
{
38+
errno = EOK;
39+
// Reset static variables between test runs
40+
connected_counter = 0;
41+
terminated_counter = 0;
42+
rx_cb_counter = 0;
43+
comp_cb_counter = 0;
44+
pd = NULL;
45+
mr_buf = NULL;
46+
accepted_sockets.clear();
47+
memset(sndbuf, 'A', sizeof(sndbuf));
48+
};
49+
virtual void TearDown()
50+
{
51+
// Clean up memory registration if it exists (parent process only)
52+
if (mr_buf) {
53+
ibv_dereg_mr(mr_buf);
54+
mr_buf = NULL;
55+
}
56+
};
57+
void destroy_poll_group(xlio_poll_group_t group) { base_destroy_poll_group(group); }
58+
static void socket_event_cb(xlio_socket_t sock, uintptr_t userdata_sq, int event, int value)
59+
{
60+
UNREFERENCED_PARAMETER(userdata_sq);
61+
UNREFERENCED_PARAMETER(value);
62+
if (event == XLIO_SOCKET_EVENT_ESTABLISHED) {
63+
pd = xlio_api->xlio_socket_get_pd(sock);
64+
ASSERT_TRUE(pd != NULL);
65+
mr_buf = ibv_reg_mr(pd, sndbuf, sizeof(sndbuf), IBV_ACCESS_LOCAL_WRITE);
66+
ASSERT_TRUE(mr_buf != NULL);
67+
send_attr.mkey = mr_buf->lkey;
68+
69+
log_trace("Sending initial %u segments to form 4wqebb WQE\n", init_msgs);
70+
for (auto temp_msg = init_msgs; temp_msg; --temp_msg) {
71+
ASSERT_EQ(0, xlio_api->xlio_socket_send(sock, sndbuf, sizeof(sndbuf), &send_attr));
72+
}
73+
74+
xlio_api->xlio_socket_flush(sock);
75+
connected_counter++;
76+
} else if (event == XLIO_SOCKET_EVENT_CLOSED) {
77+
terminated_counter++;
78+
} else if (event == XLIO_SOCKET_EVENT_TERMINATED) {
79+
terminated_counter++;
80+
}
81+
}
82+
static void socket_comp_cb(xlio_socket_t sock, uintptr_t userdata_sq, uintptr_t userdata_op)
83+
{
84+
UNREFERENCED_PARAMETER(sock);
85+
UNREFERENCED_PARAMETER(userdata_sq);
86+
UNREFERENCED_PARAMETER(userdata_op);
87+
comp_cb_counter++;
88+
}
89+
90+
static void socket_rx_cb(xlio_socket_t sock, uintptr_t userdata_sq, void *data, size_t len,
91+
struct xlio_buf *buf)
92+
{
93+
UNREFERENCED_PARAMETER(sock);
94+
UNREFERENCED_PARAMETER(userdata_sq);
95+
UNREFERENCED_PARAMETER(data);
96+
UNREFERENCED_PARAMETER(len);
97+
rx_cb_counter++;
98+
xlio_api->xlio_socket_buf_free(sock, buf);
99+
}
100+
101+
static void socket_accept_cb(xlio_socket_t sock, xlio_socket_t parent_sock,
102+
uintptr_t parent_userdata)
103+
{
104+
UNREFERENCED_PARAMETER(sock);
105+
UNREFERENCED_PARAMETER(parent_sock);
106+
UNREFERENCED_PARAMETER(parent_userdata);
107+
int rc = xlio_api->xlio_socket_update(sock, 0, 0x1);
108+
ASSERT_EQ(rc, 0);
109+
accepted_sockets.push_back(sock);
110+
connected_counter++;
111+
}
112+
};
113+
114+
/**
115+
* @test ultra_api_socket_send_receive_full_sq.ti_1
116+
* @brief
117+
* Create TCP socket/connect/send(initiator)/receive(target)
118+
* @details
119+
*/
120+
TEST_F(ultra_api_socket_send_receive_full_sq, ti_1)
121+
{
122+
int rc;
123+
int pid = fork();
124+
ultra_api_base::SetUp();
125+
xlio_poll_group_t group;
126+
xlio_socket_t sock;
127+
128+
base_create_poll_group(&group, &socket_event_cb, &socket_comp_cb, &socket_rx_cb,
129+
&socket_accept_cb);
130+
xlio_socket_attr sattr = {
131+
.flags = 0,
132+
.domain = server_addr.addr.sa_family,
133+
.group = group,
134+
.userdata_sq = 0,
135+
};
136+
if (pid == 0) {
137+
base_create_socket(&sattr, &sock);
138+
139+
rc = xlio_api->xlio_socket_bind(sock, (struct sockaddr *)&server_addr, sizeof(server_addr));
140+
ASSERT_EQ(0, rc);
141+
142+
rc = xlio_api->xlio_socket_listen(sock);
143+
ASSERT_EQ(0, rc);
144+
145+
barrier_fork(pid, true);
146+
147+
while (connected_counter < 1 || rx_cb_counter < 1) {
148+
xlio_api->xlio_poll_group_poll(group);
149+
}
150+
151+
base_wait_for_delayed_acks(group);
152+
153+
barrier_fork(pid, false); // Wait for parent to receive last ack
154+
155+
base_destroy_socket(sock);
156+
base_cleanup_accepted_sockets(accepted_sockets);
157+
while (terminated_counter < 1) {
158+
xlio_api->xlio_poll_group_poll(group);
159+
}
160+
161+
if (mr_buf) {
162+
ibv_dereg_mr(mr_buf);
163+
mr_buf = NULL;
164+
}
165+
166+
destroy_poll_group(group);
167+
exit(testing::Test::HasFailure());
168+
} else {
169+
base_create_socket(&sattr, &sock);
170+
171+
rc = xlio_api->xlio_socket_bind(sock, (struct sockaddr *)&client_addr, sizeof(client_addr));
172+
ASSERT_EQ(0, rc);
173+
174+
barrier_fork(pid, true); // Wait for child to bind and listen
175+
176+
rc = xlio_api->xlio_socket_connect(sock, (struct sockaddr *)&server_addr,
177+
sizeof(server_addr));
178+
ASSERT_EQ(0, rc);
179+
180+
while (connected_counter < 1 || comp_cb_counter < init_msgs) {
181+
xlio_api->xlio_poll_group_poll(group);
182+
}
183+
184+
// At this point we should have a completion for the last WQE of 4 wqebb.
185+
// Fill SQ with WQEs of 2wqebb size to overwrite on the last signalled space with 2 WQEs.
186+
187+
unsigned int default_sq_size_wqebb = 32768;
188+
unsigned int num_wqe_to_send =
189+
default_sq_size_wqebb / 2; // Default Size of SQ (32768) / 2 (2wqebb per WQE)
190+
log_trace("Filling full SQ with %u WQEs of 2wqebb size\n", num_wqe_to_send);
191+
192+
while (num_wqe_to_send-- > 0) {
193+
ASSERT_EQ(0, xlio_api->xlio_socket_send(sock, sndbuf, sizeof(sndbuf), &send_attr));
194+
xlio_api->xlio_socket_flush(sock);
195+
}
196+
197+
log_trace("Waiting for completions\n");
198+
while (comp_cb_counter <
199+
4096) { // More than 2 completions is enough to overpass last signalled.
200+
xlio_api->xlio_poll_group_poll(group);
201+
}
202+
203+
log_trace("Completions received\n");
204+
205+
base_wait_for_delayed_acks(group);
206+
207+
barrier_fork(pid, false);
208+
209+
base_destroy_socket(sock);
210+
while (terminated_counter < 1) {
211+
xlio_api->xlio_poll_group_poll(group);
212+
}
213+
214+
destroy_poll_group(group);
215+
216+
wait_fork(pid);
217+
}
218+
}
219+
220+
#endif /* EXTRA_API_ENABLED */

0 commit comments

Comments
 (0)