Skip to content

Commit 14f0075

Browse files
committed
issue: 1792164 Add MSG_ZEROCOPY processing
These changes make workable MSG_ZEROCOPY send flow including notification mechanizm. It is needed to notify the process when it is safe to reuse a previously passed buffer. It queues completion notifications on the socket error queue. But copy avoidance internally is not done. So all data is copied in internal buffers as without MSG_ZEROCOPY. Full zcopy support will be implemented later. Signed-off-by: Igor Ivanov <[email protected]>
1 parent 7aa343d commit 14f0075

File tree

6 files changed

+105
-4
lines changed

6 files changed

+105
-4
lines changed

src/vma/dev/buffer_pool.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@
4141

4242
inline static void free_lwip_pbuf(struct pbuf_custom *pbuf_custom)
4343
{
44+
mem_buf_desc_t* p_desc = (mem_buf_desc_t *)pbuf_custom;
45+
46+
if (p_desc->m_flags & mem_buf_desc_t::ZCOPY) {
47+
p_desc->tx.zc.callback(p_desc);
48+
}
49+
pbuf_custom->pbuf.type = 0;
4450
pbuf_custom->pbuf.flags = 0;
4551
pbuf_custom->pbuf.ref = 0;
4652
}

src/vma/lwip/pbuf.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,9 @@ pbuf_header(struct pbuf *p, s16_t header_size_increment)
200200
return 1;
201201
/* AlexV: we need to check that the header EXPANTION is legal for PBUF_REF & PBUF_ROM pbufs! */
202202
p->payload = (u8_t *)p->payload - header_size_increment;
203+
} else if (type == PBUF_ZEROCOPY) {
204+
/* temporary do the same as for PBUF_RAM until zcopy support is not ready */
205+
p->payload = (u8_t *)p->payload - header_size_increment;
203206
} else {
204207
/* Unknown type */
205208
LWIP_ASSERT("bad pbuf type", 0);

src/vma/lwip/tcp_out.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ tcp_write(struct tcp_pcb *pcb, const void *arg, u32_t len, u8_t apiflags)
440440
struct iovec piov[piov_max_size];
441441
int piov_cur_index = 0;
442442
int piov_cur_len = 0;
443-
pbuf_type type = PBUF_RAM;
443+
pbuf_type type = (apiflags & TCP_WRITE_ZEROCOPY ? PBUF_ZEROCOPY : PBUF_RAM);
444444

445445
int byte_queued = pcb->snd_nxt - pcb->lastack;
446446
if ( len < pcb->mss && !(apiflags & TCP_WRITE_DUMMY))

src/vma/proto/dst_entry_tcp.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,7 @@ void dst_entry_tcp::put_buffer(mem_buf_desc_t * p_desc)
463463

464464
if (p_desc->lwip_pbuf.pbuf.ref == 0) {
465465
p_desc->p_next_desc = NULL;
466-
g_buffer_pool_tx->put_buffers_thread_safe(p_desc);
466+
buffer_pool::free_tx_lwip_pbuf_custom(&p_desc->lwip_pbuf.pbuf);
467467
}
468468
}
469469
}

src/vma/sock/sockinfo_tcp.cpp

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -824,6 +824,10 @@ ssize_t sockinfo_tcp::tx(vma_tx_call_attr_t &tx_arg)
824824
}
825825
#endif
826826

827+
if ((__flags & MSG_ZEROCOPY) && (m_b_zc)) {
828+
apiflags |= VMA_TX_PACKET_ZEROCOPY;
829+
}
830+
827831
for (int i = 0; i < sz_iov; i++) {
828832
si_tcp_logfunc("iov:%d base=%p len=%d", i, p_iov[i].iov_base, p_iov[i].iov_len);
829833

@@ -948,6 +952,14 @@ ssize_t sockinfo_tcp::tx(vma_tx_call_attr_t &tx_arg)
948952
m_p_socket_stats->n_tx_ready_byte_count += total_tx;
949953
}
950954

955+
/* Each send call with MSG_ZEROCOPY that successfully sends
956+
* data increments the counter.
957+
* The counter is not incremented on failure or if called with length zero.
958+
*/
959+
if ((apiflags & VMA_TX_PACKET_ZEROCOPY) && (total_tx > 0)) {
960+
atomic_fetch_and_inc(&m_zckey);
961+
}
962+
951963
unlock_tcp_con();
952964

953965
#ifdef VMA_TIME_MEASURE
@@ -4540,9 +4552,11 @@ struct pbuf * sockinfo_tcp::tcp_tx_pbuf_alloc(void* p_conn, pbuf_type type)
45404552
dst_entry_tcp *p_dst = (dst_entry_tcp *)(p_si_tcp->m_p_connected_dst_entry);
45414553
mem_buf_desc_t* p_desc = NULL;
45424554

4543-
NOT_IN_USE(type);
45444555
if (likely(p_dst)) {
45454556
p_desc = p_dst->get_buffer();
4557+
if (p_desc && (type == PBUF_ZEROCOPY)) {
4558+
p_desc = p_si_tcp->tcp_tx_zc_alloc(p_desc);
4559+
}
45464560
}
45474561
return (struct pbuf *)p_desc;
45484562
}
@@ -4565,11 +4579,86 @@ void sockinfo_tcp::tcp_tx_pbuf_free(void* p_conn, struct pbuf *p_buff)
45654579

45664580
if (p_desc->lwip_pbuf.pbuf.ref == 0) {
45674581
p_desc->p_next_desc = NULL;
4568-
g_buffer_pool_tx->put_buffers_thread_safe(p_desc);
4582+
buffer_pool::free_tx_lwip_pbuf_custom(p_buff);
45694583
}
45704584
}
45714585
}
45724586

4587+
mem_buf_desc_t* sockinfo_tcp::tcp_tx_zc_alloc(mem_buf_desc_t* p_desc)
4588+
{
4589+
p_desc->m_flags |= mem_buf_desc_t::ZCOPY;
4590+
p_desc->tx.zc.id = atomic_read(&m_zckey);
4591+
p_desc->tx.zc.count = 1;
4592+
p_desc->tx.zc.len = p_desc->lwip_pbuf.pbuf.len;
4593+
p_desc->tx.zc.ctx = (void *)this;
4594+
p_desc->tx.zc.callback = tcp_tx_zc_callback;
4595+
4596+
return p_desc;
4597+
}
4598+
4599+
void sockinfo_tcp::tcp_tx_zc_callback(mem_buf_desc_t* p_desc)
4600+
{
4601+
uint32_t lo, hi;
4602+
uint16_t count;
4603+
uint32_t prev_lo, prev_hi;
4604+
mem_buf_desc_t* err_queue = NULL;
4605+
sockinfo_tcp* sock = NULL;
4606+
4607+
if (!p_desc || !p_desc->tx.zc.ctx) {
4608+
return;
4609+
}
4610+
4611+
sock = (sockinfo_tcp *)p_desc->tx.zc.ctx;
4612+
4613+
if (sock->m_state != SOCKINFO_OPENED) {
4614+
return;
4615+
}
4616+
4617+
count = p_desc->tx.zc.count;
4618+
lo = p_desc->tx.zc.id;
4619+
hi = lo + count - 1;
4620+
memset(&p_desc->ee, 0, sizeof(p_desc->ee));
4621+
p_desc->ee.ee_errno = 0;
4622+
p_desc->ee.ee_origin = SO_EE_ORIGIN_ZEROCOPY;
4623+
p_desc->ee.ee_data = hi;
4624+
p_desc->ee.ee_info = lo;
4625+
// p_desc->ee.ee_code |= SO_EE_CODE_ZEROCOPY_COPIED;
4626+
4627+
/* Update last error queue element in case it has the same type */
4628+
err_queue = sock->m_error_queue.back();
4629+
if (err_queue &&
4630+
(err_queue->ee.ee_origin == p_desc->ee.ee_origin) &&
4631+
(err_queue->ee.ee_code == p_desc->ee.ee_code)) {
4632+
uint64_t sum_count = 0;
4633+
4634+
prev_hi = err_queue->ee.ee_data;
4635+
prev_lo = err_queue->ee.ee_info;
4636+
sum_count = prev_hi - prev_lo + 1ULL + count;
4637+
4638+
if (lo == prev_lo) {
4639+
err_queue->ee.ee_data = hi;
4640+
} else if ((sum_count >= (1ULL << 32)) || (lo != prev_hi + 1)) {
4641+
err_queue = NULL;
4642+
} else {
4643+
err_queue->ee.ee_data += count;
4644+
}
4645+
}
4646+
4647+
/* Add information into error queue element */
4648+
if (!err_queue) {
4649+
err_queue = p_desc->clone();
4650+
sock->m_error_queue.push_back(err_queue);
4651+
}
4652+
4653+
/* Clean up */
4654+
p_desc->m_flags &= ~mem_buf_desc_t::ZCOPY;
4655+
memset(&p_desc->tx.zc, 0, sizeof(p_desc->tx.zc));
4656+
4657+
/* Signal events on socket */
4658+
NOTIFY_ON_EVENTS(sock, EPOLLERR);
4659+
sock->do_wakeup();
4660+
}
4661+
45734662
struct tcp_seg * sockinfo_tcp::tcp_seg_alloc(void* p_conn)
45744663
{
45754664
sockinfo_tcp *p_si_tcp = (sockinfo_tcp *)(((struct tcp_pcb*)p_conn)->my_container);

src/vma/sock/sockinfo_tcp.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,9 @@ class sockinfo_tcp : public sockinfo, public timer_handler
185185
static struct tcp_seg * tcp_seg_alloc(void* p_conn);
186186
static void tcp_seg_free(void* p_conn, struct tcp_seg * seg);
187187

188+
mem_buf_desc_t* tcp_tx_zc_alloc(mem_buf_desc_t* p_desc);
189+
static void tcp_tx_zc_callback(mem_buf_desc_t* p_desc);
190+
188191
bool inline is_readable(uint64_t *p_poll_sn, fd_array_t *p_fd_array = NULL);
189192
bool inline is_writeable();
190193
bool inline is_errorable(int *errors);

0 commit comments

Comments
 (0)