Skip to content

Commit 89f5037

Browse files
committed
aio: send/recv/sendmsg/recvmsg add flags
1 parent 28fb4e2 commit 89f5037

File tree

5 files changed

+71
-9
lines changed

5 files changed

+71
-9
lines changed

src/aio/Windows.zig

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ pub fn uringlator_start(self: *@This(), id: aio.Id, op_type: Operation) !void {
326326
const state = self.uringlator.ops.getOnePtr(.state, id);
327327
const ovl = self.uringlator.ops.getOnePtr(.ovl, id);
328328
self.iocp.associateSocket(id, state.recv.socket) catch |err| return self.uringlator.finish(self, id, err, .thread_unsafe);
329-
switch (wposix.recvEx(state.recv.socket, &win_state.wsabuf, 0, &ovl.overlapped) catch |err| return self.uringlator.finish(self, id, err, .thread_unsafe)) {
329+
switch (wposix.recvEx(state.recv.socket, &win_state.wsabuf, state.recv.flags.toInt(), &ovl.overlapped) catch |err| return self.uringlator.finish(self, id, err, .thread_unsafe)) {
330330
.pending => {},
331331
.transmitted => |bytes| {
332332
ovl.res = bytes;
@@ -339,7 +339,7 @@ pub fn uringlator_start(self: *@This(), id: aio.Id, op_type: Operation) !void {
339339
const state = self.uringlator.ops.getOnePtr(.state, id);
340340
const ovl = self.uringlator.ops.getOnePtr(.ovl, id);
341341
self.iocp.associateSocket(id, state.send.socket) catch |err| return self.uringlator.finish(self, id, err, .thread_unsafe);
342-
switch (wposix.sendEx(state.send.socket, &win_state.wsabuf, 0, &ovl.overlapped) catch |err| return self.uringlator.finish(self, id, err, .thread_unsafe)) {
342+
switch (wposix.sendEx(state.send.socket, &win_state.wsabuf, state.send.flags.toInt(), &ovl.overlapped) catch |err| return self.uringlator.finish(self, id, err, .thread_unsafe)) {
343343
.pending => {},
344344
.transmitted => |bytes| {
345345
ovl.res = bytes;
@@ -351,7 +351,7 @@ pub fn uringlator_start(self: *@This(), id: aio.Id, op_type: Operation) !void {
351351
const state = self.uringlator.ops.getOnePtr(.state, id);
352352
const ovl = self.uringlator.ops.getOnePtr(.ovl, id);
353353
self.iocp.associateSocket(id, state.recv_msg.socket) catch |err| return self.uringlator.finish(self, id, err, .thread_unsafe);
354-
switch (wposix.recvmsgEx(state.recv_msg.socket, state.recv_msg.out_msg, 0, &ovl.overlapped) catch |err| return self.uringlator.finish(self, id, err, .thread_unsafe)) {
354+
switch (wposix.recvmsgEx(state.recv_msg.socket, state.recv_msg.out_msg, state.recv_msg.flags.toInt(), &ovl.overlapped) catch |err| return self.uringlator.finish(self, id, err, .thread_unsafe)) {
355355
.pending => {},
356356
.transmitted => |bytes| {
357357
ovl.res = bytes;
@@ -363,7 +363,7 @@ pub fn uringlator_start(self: *@This(), id: aio.Id, op_type: Operation) !void {
363363
const state = self.uringlator.ops.getOnePtr(.state, id);
364364
const ovl = self.uringlator.ops.getOnePtr(.ovl, id);
365365
self.iocp.associateSocket(id, state.send_msg.socket) catch |err| return self.uringlator.finish(self, id, err, .thread_unsafe);
366-
switch (wposix.sendmsgEx(state.send_msg.socket, @constCast(state.send_msg.msg), 0, &ovl.overlapped) catch |err| return self.uringlator.finish(self, id, err, .thread_unsafe)) {
366+
switch (wposix.sendmsgEx(state.send_msg.socket, @constCast(state.send_msg.msg), state.send_msg.flags.toInt(), &ovl.overlapped) catch |err| return self.uringlator.finish(self, id, err, .thread_unsafe)) {
367367
.pending => {},
368368
.transmitted => |bytes| {
369369
ovl.res = bytes;

src/aio/ops.zig

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,28 @@ pub const Connect = struct {
152152
/// std.posix.recv
153153
pub const Recv = struct {
154154
pub const Error = std.posix.RecvFromError || SharedError;
155+
156+
pub const Flags = packed struct {
157+
cmsg_cloexec: bool = true,
158+
err_queue: bool = false,
159+
oob: bool = false,
160+
peek: bool = false,
161+
trunc: bool = false,
162+
163+
pub fn toInt(self: @This()) u32 {
164+
var flags: u32 = 0;
165+
if (self.cmsg_cloexec and @hasDecl(posix.MSG, "CMSG_CLOEXEC")) flags |= posix.MSG.CMSG_CLOEXEC;
166+
if (self.err_queue and @hasDecl(posix.MSG, "ERRQUEUE")) flags |= posix.MSG.ERRQUEUE;
167+
if (self.oob and @hasDecl(posix.MSG, "OOB")) flags |= posix.MSG.OOB;
168+
if (self.peek and @hasDecl(posix.MSG, "PEEK")) flags |= posix.MSG.PEEK;
169+
if (self.trunc and @hasDecl(posix.MSG, "TRUNC")) flags |= posix.MSG.TRUNC;
170+
return flags;
171+
}
172+
};
173+
155174
socket: std.posix.socket_t,
156175
buffer: []u8,
176+
flags: Flags = .{},
157177
out_read: *usize,
158178
out_id: ?*Id = null,
159179
out_error: ?*Error = null,
@@ -163,8 +183,32 @@ pub const Recv = struct {
163183
/// std.posix.send
164184
pub const Send = struct {
165185
pub const Error = std.posix.SendError || SharedError;
186+
187+
pub const Flags = packed struct {
188+
confirm: bool = false,
189+
dont_route: bool = false,
190+
eor: bool = false,
191+
more: bool = false,
192+
oob: bool = false,
193+
fast_open: bool = false,
194+
zero_copy: bool = false,
195+
196+
pub fn toInt(self: @This()) u32 {
197+
var flags: u32 = 0;
198+
if (self.confirm and @hasDecl(posix.MSG, "CONFIRM")) flags |= posix.MSG.CONFIRM;
199+
if (self.dont_route and @hasDecl(posix.MSG, "DONTROUTE")) flags |= posix.MSG.DONTROUTE;
200+
if (self.eor and @hasDecl(posix.MSG, "EOR")) flags |= posix.MSG.EOR;
201+
if (self.more and @hasDecl(posix.MSG, "MORE")) flags |= posix.MSG.MORE;
202+
if (self.oob and @hasDecl(posix.MSG, "OOB")) flags |= posix.MSG.OOB;
203+
if (self.fast_open and @hasDecl(posix.MSG, "FASTOPEN")) flags |= posix.MSG.FASTOPEN;
204+
if (self.zero_copy and @hasDecl(posix.MSG, "ZEROCOPY")) flags |= posix.MSG.ZEROCOPY;
205+
return flags;
206+
}
207+
};
208+
166209
socket: std.posix.socket_t,
167210
buffer: []const u8,
211+
flags: Flags = .{},
168212
out_written: ?*usize = null,
169213
out_id: ?*Id = null,
170214
out_error: ?*Error = null,
@@ -182,6 +226,7 @@ pub const RecvMsg = struct {
182226
} || SharedError;
183227
socket: std.posix.socket_t,
184228
out_msg: *posix.msghdr,
229+
flags: Recv.Flags = .{},
185230
out_read: *usize,
186231
out_id: ?*Id = null,
187232
out_error: ?*Error = null,
@@ -193,6 +238,7 @@ pub const SendMsg = struct {
193238
pub const Error = std.posix.SendMsgError || SharedError;
194239
socket: std.posix.socket_t,
195240
msg: *const posix.msghdr_const,
241+
flags: Send.Flags = .{},
196242
out_written: ?*usize = null,
197243
out_id: ?*Id = null,
198244
out_error: ?*Error = null,

src/aio/posix/posix.zig

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -243,14 +243,14 @@ pub fn perform(comptime op_type: Operation, op: Operation.map.getAssertContains(
243243
},
244244
.accept => op.out_socket.* = try accept(op.socket, op.out_addr, op.inout_addrlen, 0),
245245
.connect => _ = try connect(op.socket, op.addr, op.addrlen),
246-
.recv => op.out_read.* = try recv(op.socket, op.buffer, MSG.DONTWAIT),
246+
.recv => op.out_read.* = try recv(op.socket, op.buffer, MSG.DONTWAIT | op.flags.toInt()),
247247
.send => {
248-
const written = try send(op.socket, op.buffer, MSG.DONTWAIT | MSG.NOSIGNAL);
248+
const written = try send(op.socket, op.buffer, MSG.DONTWAIT | MSG.NOSIGNAL | op.flags.toInt());
249249
if (op.out_written) |w| w.* = written;
250250
},
251-
.recv_msg => op.out_read.* = try recvmsg(op.socket, op.out_msg, MSG.DONTWAIT),
251+
.recv_msg => op.out_read.* = try recvmsg(op.socket, op.out_msg, MSG.DONTWAIT | op.flags.toInt()),
252252
.send_msg => {
253-
const written = try sendmsg(op.socket, op.msg, MSG.DONTWAIT | MSG.NOSIGNAL);
253+
const written = try sendmsg(op.socket, op.msg, MSG.DONTWAIT | MSG.NOSIGNAL | op.flags.toInt());
254254
if (op.out_written) |w| w.* = written;
255255
},
256256
.shutdown => try shutdown(op.socket, op.how),
@@ -309,7 +309,8 @@ pub const MSG = switch (builtin.target.os.tag) {
309309
pub const DONTWAIT = 0x00080;
310310
pub const NOSIGNAL = 0x20000;
311311
},
312-
.windows, .wasi => struct {
312+
.windows => windows.MSG,
313+
.wasi => struct {
313314
pub const DONTWAIT = 0x0;
314315
pub const NOSIGNAL = 0x0;
315316
},

src/aio/posix/windows.zig

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,17 @@ pub fn recvEx(sockfd: std.posix.socket_t, buf: [*]win_sock.WSABUF, flags: u32, o
262262
return .{ .transmitted = @intCast(read) };
263263
}
264264

265+
pub const MSG = struct {
266+
pub const DONTWAIT = 0x0;
267+
pub const NOSIGNAL = 0x0;
268+
pub const PEEK: u32 = @bitCast(win_sock.MSG_PEEK);
269+
pub const PARTIAL: u32 = @bitCast(win_sock.MSG_PARTIAL);
270+
pub const DONTROUTE: u32 = @bitCast(win_sock.MSG_DONTROUTE);
271+
pub const OOB: u32 = @bitCast(win_sock.MSG_OOB);
272+
pub const PUSH_IMMEDIATE: u32 = @bitCast(win_sock.MSG_PUSH_IMMEDIATE);
273+
pub const WAITALL: u32 = @bitCast(win_sock.WAITALL);
274+
};
275+
265276
pub const iovec = extern struct {
266277
len: u32,
267278
base: [*]u8,

src/aio/uringlator.zig

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,18 +47,22 @@ const UringlatorOperation = struct {
4747
recv: struct {
4848
socket: std.posix.socket_t,
4949
buffer: []u8,
50+
flags: aio.Recv.Flags,
5051
},
5152
send: struct {
5253
socket: std.posix.socket_t,
5354
buffer: []const u8,
55+
flags: aio.Send.Flags,
5456
},
5557
recv_msg: struct {
5658
socket: std.posix.socket_t,
5759
out_msg: *posix.msghdr,
60+
flags: aio.Recv.Flags,
5861
},
5962
send_msg: struct {
6063
socket: std.posix.socket_t,
6164
msg: *const posix.msghdr_const,
65+
flags: aio.Send.Flags,
6266
},
6367
shutdown: struct {
6468
socket: std.posix.socket_t,

0 commit comments

Comments
 (0)