Skip to content

Commit 0ed80ba

Browse files
Merge pull request #83 from kevinGC/rx-zerocopy
TCP RX zerocopy
2 parents 4560588 + 46dda29 commit 0ed80ba

File tree

16 files changed

+169
-29
lines changed

16 files changed

+169
-29
lines changed

check_all_options.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ void check_options_tcp(struct options *opts, struct callbacks *cb)
6868
"Source ports need to be in the range of 1..0xFFFF. "
6969
"Best larger than 1024.");
7070
}
71+
CHECK(cb, !(opts->rx_zerocopy && opts->skip_rx_copy),
72+
"TCP RX zerocopy is not supported in conjunction RX copy skip.");
7173
}
7274

7375
void check_options_udp(struct options *opts, struct callbacks *cb)

common.c

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,18 @@
1414
* limitations under the License.
1515
*/
1616

17-
#include <ctype.h>
1817
#include <fcntl.h>
1918
#include <netinet/tcp.h>
19+
#include <stddef.h>
20+
#include <stdio.h>
21+
#include <stdlib.h>
22+
#include <string.h>
23+
#include <sys/mman.h>
2024

2125
#include "common.h"
22-
#include "hexdump.h"
23-
#include "parse.h"
26+
#include "lib.h"
27+
#include "logging.h"
28+
#include "or_die.h"
2429

2530
#define kilo (1000)
2631
#define kibi (1024)
@@ -187,7 +192,7 @@ void set_reuseaddr(int fd, int on, struct callbacks *cb)
187192
PLOG_ERROR(cb, "setsockopt(SO_REUSEADDR)");
188193
}
189194

190-
void set_zerocopy(int fd, int on, struct callbacks *cb)
195+
void set_tx_zerocopy(int fd, int on, struct callbacks *cb)
191196
{
192197
#ifndef SO_ZEROCOPY
193198
#define SO_ZEROCOPY 60

common.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ struct addrinfo **parse_local_hosts(const struct options *opts, int n,
152152
void set_reuseport(int fd, struct callbacks *cb);
153153
void set_nonblocking(int fd, struct callbacks *cb);
154154
void set_reuseaddr(int fd, int on, struct callbacks *cb);
155-
void set_zerocopy(int fd, int on, struct callbacks *cb);
155+
void set_tx_zerocopy(int fd, int on, struct callbacks *cb);
156156
void set_freebind(int fd, struct callbacks *cb);
157157
void set_debug(int fd, int onoff, struct callbacks *cb);
158158
void set_mark(int fd, int mark, struct callbacks *cb);

define_all_flags.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ struct flags_parser *add_flags_stream(struct flags_parser *fp)
106106
DEFINE_FLAG(fp, int, test_length, 10, 'l', "Test length in seconds");
107107
DEFINE_FLAG(fp, bool, edge_trigger, false, 'E', "Edge-triggered epoll");
108108
DEFINE_FLAG(fp, bool, reuseaddr, false, 'R', "Use SO_REUSEADDR on sockets");
109-
DEFINE_FLAG(fp, bool, zerocopy, false, 'Z', "Set MSG_ZEROCOPY when sending");
109+
DEFINE_FLAG_NAMED(fp, bool, tx_zerocopy, false, "zerocopy", 'Z', "Set MSG_ZEROCOPY when sending");
110110
DEFINE_FLAG(fp, const struct rate_conversion *, throughput_opt, neper_units_mb_pointer_hack, 0, "Units to display for throughput");
111111
DEFINE_FLAG_PARSER(fp, throughput_opt, parse_unit);
112112
DEFINE_FLAG_PRINTER(fp, throughput_opt, print_unit);
@@ -143,6 +143,7 @@ struct flags_parser *add_flags_tcp_stream(struct flags_parser *fp)
143143
DEFINE_FLAG(fp, unsigned long, delay, 0, 'D', "Nanosecond delay between each send()/write()");
144144
DEFINE_FLAG(fp, int, buffer_size, 16384, 'B', "Number of bytes that each read/write uses as the buffer");
145145
DEFINE_FLAG(fp, bool, skip_rx_copy, false, 0, "Skip kernel->user payload copy on receives");
146+
DEFINE_FLAG(fp, bool, rx_zerocopy, false, 'z', "Use TCP RX zerocopy");
146147
DEFINE_FLAG(fp, bool, enable_read, false, 'r', "Read from flows? enabled by default for the server");
147148
DEFINE_FLAG(fp, bool, enable_write, false, 'w', "Write to flows? Enabled by default for the client");
148149
DEFINE_FLAG(fp, bool, split_bidir , false, 0, "Bidirectional using separate tx/rx sockets");

flags.h

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,30 @@ void flags_parser_run(struct flags_parser *fp, int argc, char **argv);
3939
void flags_parser_dump(struct flags_parser *fp);
4040
void flags_parser_destroy(struct flags_parser *fp);
4141

42-
#define DEFINE_FLAG(FP, TYPE, VAR, DEFAULT_VALUE, SHORT_NAME, USAGE) \
43-
do { \
44-
TYPE default_value = DEFAULT_VALUE; \
45-
struct options *opts = flags_parser_opts(FP); \
46-
opts->VAR = default_value; \
47-
flags_parser_add(FP, SHORT_NAME, #VAR, \
48-
USAGE " (default " #DEFAULT_VALUE ")", #TYPE, \
49-
&opts->VAR); \
50-
} while (0)
42+
/* You probably want DEFINE_FLAG or DEFINE_FLAG_NAMED instead of this. This
43+
* macro takes extra arguments -- separating TYPE and TYPESTR -- because the
44+
* preprocessor will convert a TYPE of "bool" too "_Bool" *except* in the
45+
* specific case where the macro contains #TYPE. This was not fun to debug.
46+
*/
47+
#define DEFINE_FLAG_NAMED_TYPED(FP, TYPE, TYPESTR, VAR, DEFAULT_VALUE, NAME, \
48+
SHORT_NAME, USE) \
49+
do { \
50+
TYPE default_value = DEFAULT_VALUE; \
51+
struct options *opts = flags_parser_opts(FP); \
52+
opts->VAR = default_value; \
53+
flags_parser_add(FP, SHORT_NAME, NAME, \
54+
USE " (default " #DEFAULT_VALUE ")", TYPESTR, \
55+
&opts->VAR); \
56+
} while (0);
57+
58+
/* DEFINE_FLAG_NAMED is like DEFINE_FLAG, but uses a custom flag name. */
59+
#define DEFINE_FLAG_NAMED(FP, TYPE, VAR, DEFAULT_VALUE, NAME, SHORT_NAME, USE) \
60+
DEFINE_FLAG_NAMED_TYPED(FP, TYPE, #TYPE, VAR, DEFAULT_VALUE, NAME, \
61+
SHORT_NAME, USE)
62+
63+
#define DEFINE_FLAG(FP, TYPE, VAR, DEFAULT_VALUE, SHORT_NAME, USAGE) \
64+
DEFINE_FLAG_NAMED_TYPED(FP, TYPE, #TYPE, VAR, DEFAULT_VALUE, #VAR, \
65+
SHORT_NAME, USAGE)
5166

5267
#define DEFINE_FLAG_PARSER(FP, VAR, PARSER) do { \
5368
struct options *opts = flags_parser_opts(FP); \

flow.c

Lines changed: 77 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,19 @@
1414
* limitations under the License.
1515
*/
1616

17+
#include <linux/tcp.h>
1718
#include <stdint.h>
19+
#include <stdlib.h>
20+
#include <sys/mman.h>
1821
#include <time.h>
22+
#include <unistd.h>
1923

2024
#include "common.h"
2125
#include "flow.h"
26+
#include "logging.h"
2227
#include "socket.h"
23-
#include "thread.h"
2428
#include "stats.h"
29+
#include "thread.h"
2530

2631
/*
2732
* We define the flow struct locally to this file to force outside users to go
@@ -41,6 +46,10 @@ struct flow {
4146
uint32_t f_events; /* pending epoll events */
4247

4348
struct neper_stat *f_stat;
49+
50+
/* TCP RX zerocopy state. */
51+
void *f_rx_zerocopy_buffer;
52+
size_t f_rx_zerocopy_buffer_sz;
4453
};
4554

4655
int flow_fd(const struct flow *f)
@@ -120,7 +129,24 @@ void flow_reconnect(struct flow *f, flow_handler fh, uint32_t events)
120129
flow_ctl(f, EPOLL_CTL_ADD, fh, events, true);
121130
}
122131

123-
void flow_create(const struct flow_create_args *args)
132+
void flow_init_rx_zerocopy(struct flow *f, int buffer_size, struct callbacks *cb)
133+
{
134+
// Use RCVLOWAT to reduce syscall overhead.
135+
int rcvlowat = buffer_size;
136+
if (setsockopt(f->f_fd, SOL_SOCKET, SO_RCVLOWAT, &rcvlowat,
137+
sizeof(rcvlowat)) == -1)
138+
PLOG_FATAL(cb, "setsockopt(SO_RCVLOWAT)");
139+
140+
// Zerocopy requires mmap'd pages. Each flow has its own pages.
141+
f->f_rx_zerocopy_buffer = mmap(NULL, buffer_size, PROT_READ,
142+
MAP_SHARED, f->f_fd, 0);
143+
if (f->f_rx_zerocopy_buffer == (void *)-1)
144+
PLOG_FATAL(cb, "failed to map RX zerocopy buffer");
145+
146+
f->f_rx_zerocopy_buffer_sz = buffer_size;
147+
}
148+
149+
struct flow *flow_create(const struct flow_create_args *args)
124150
{
125151
struct thread *t = args->thread;
126152
struct flow *f = calloc_or_die(1, sizeof(struct flow), t->cb);
@@ -164,6 +190,7 @@ void flow_create(const struct flow_create_args *args)
164190
events &= (f->f_id & 1) ? EPOLLOUT : EPOLLIN;
165191

166192
flow_ctl(f, EPOLL_CTL_ADD, args->handler, events, true);
193+
return f;
167194
}
168195

169196
/* Returns true if the deadline for the flow has expired.
@@ -292,10 +319,58 @@ void flow_delete(struct flow *f)
292319
*/
293320
if (f->f_mbuf != f->f_thread->f_mbuf)
294321
free(f->f_mbuf);
322+
323+
/* Cleanup TCP RX zerocopy. */
324+
if (f->f_rx_zerocopy_buffer)
325+
munmap(f->f_rx_zerocopy_buffer, f->f_rx_zerocopy_buffer_sz);
326+
295327
free(f);
296328
}
297329

298330
void flow_update_next_event(struct flow *f, uint64_t duration)
299331
{
300332
f->f_next_event += duration;
301333
}
334+
335+
ssize_t flow_recv_zerocopy(struct flow *f, void *copybuf, size_t copybuf_len) {
336+
struct tcp_zerocopy_receive zc = {0};
337+
socklen_t zc_len = sizeof(zc);
338+
int result;
339+
340+
/* Setup both the mmap address and extra buffer for bytes that aren't
341+
* zerocopy-able.
342+
*/
343+
zc.address = (__u64)f->f_rx_zerocopy_buffer;
344+
zc.length = copybuf_len; /* Same size used as zerocopy buffer. */
345+
346+
/* The kernel will effectively use copybuf_len as a hint as to what the
347+
* cutoff point between zerocopy and recv is. So passing a large copybuf
348+
* causes less zerocopy. Thus we pass just under a page to maximize
349+
* zerocopying.
350+
*/
351+
zc.copybuf_address = (__u64)copybuf;
352+
zc.copybuf_len = copybuf_len < 4096 ? copybuf_len : 4095;
353+
354+
result = getsockopt(f->f_fd, IPPROTO_TCP, TCP_ZEROCOPY_RECEIVE, &zc,
355+
&zc_len);
356+
if (result == -1)
357+
return result;
358+
359+
/* Handle overflow data, i.e. bytes that couldn't be zerocopied. */
360+
if (zc.recv_skip_hint) {
361+
int read_len = zc.recv_skip_hint < copybuf_len ?
362+
zc.recv_skip_hint : copybuf_len;
363+
result = read(f->f_fd, copybuf, read_len);
364+
if (result < 0)
365+
PLOG_FATAL(f->f_thread->cb, "failed to read extra "
366+
"bytes");
367+
}
368+
369+
/* Handle zerocopy data. */
370+
if (zc.length) {
371+
flow_thread(f)->io_stats.rx_zc_bytes += zc.length;
372+
madvise(f->f_rx_zerocopy_buffer, zc.length, MADV_DONTNEED);
373+
}
374+
375+
return zc.recv_skip_hint + zc.length + zc.copybuf_len;
376+
}

flow.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,15 @@ struct flow_create_args {
5656
struct neper_stat *(*stat)(struct flow *); /* stats callback */
5757
};
5858

59-
void flow_create(const struct flow_create_args *);
59+
struct flow *flow_create(const struct flow_create_args *);
6060
void flow_delete(struct flow *);
6161
/* Adds duration to f's next event time. */
6262
void flow_update_next_event(struct flow *f, uint64_t duration);
6363

64+
/* Initialize RX zerocopy state for a flow. */
65+
void flow_init_rx_zerocopy(struct flow *f, int buffer_size,
66+
struct callbacks *cb);
67+
/* Perform a zerocopy receive. */
68+
ssize_t flow_recv_zerocopy(struct flow *f, void *copybuf, size_t copybuf_len);
69+
6470
#endif

lib.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ struct options {
9292
bool freebind;
9393
bool tcp_fastopen;
9494
bool skip_rx_copy;
95-
bool zerocopy;
95+
bool tx_zerocopy;
96+
bool rx_zerocopy;
9697
bool time_wait;
9798
double interval;
9899
long long max_pacing_rate;

psp_stream_main.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ int main(int argc, char **argv)
6161

6262
if (opts.skip_rx_copy)
6363
opts.recv_flags = MSG_TRUNC;
64-
if (opts.zerocopy)
64+
if (opts.tx_zerocopy)
6565
opts.send_flags = MSG_ZEROCOPY;
6666

6767
flags_parser_dump(fp);

rr.c

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,11 @@ static ssize_t rr_fn_recv(struct flow *f, char *buf, size_t len)
101101
struct thread *t = flow_thread(f);
102102
ssize_t ret;
103103

104-
ret = recv(flow_fd(f), buf, len, 0);
104+
if (t->opts->rx_zerocopy) {
105+
ret = flow_recv_zerocopy(f, buf, len);
106+
} else {
107+
ret = recv(flow_fd(f), buf, len, 0);
108+
}
105109
t->io_stats.rx_ops++;
106110
t->io_stats.rx_bytes += ret > 0 ? ret : 0;
107111
return ret;

0 commit comments

Comments
 (0)