Skip to content

Commit b900fd7

Browse files
committed
aio: add readv/writev ops
1 parent 5e6d1c9 commit b900fd7

File tree

8 files changed

+275
-35
lines changed

8 files changed

+275
-35
lines changed

src/aio.zig

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -368,8 +368,7 @@ test "Write" {
368368
defer f.close();
369369
try single(.write, .{ .file = f, .buffer = "foobar", .offset = 0, .out_written = &len });
370370
try std.testing.expectEqual("foobar".len, len);
371-
try f.seekTo(0); // required for windows
372-
const read = try f.readAll(&buf);
371+
const read = try f.preadAll(&buf, 0);
373372
try std.testing.expectEqualSlices(u8, "foobar", buf[0..read]);
374373
}
375374
{
@@ -382,6 +381,75 @@ test "Write" {
382381
}
383382
}
384383

384+
test "Writev" {
385+
var tmp = std.testing.tmpDir(.{});
386+
defer tmp.cleanup();
387+
var buf: [64]u8 = undefined;
388+
var len: usize = undefined;
389+
{
390+
var f = try tmp.dir.createFile("test", .{ .read = true });
391+
defer f.close();
392+
try single(.writev, .{ .file = f, .iov = &.{
393+
.{ .base = "foo", .len = "foo".len },
394+
.{ .base = "bar", .len = "bar".len },
395+
}, .offset = 0, .out_written = &len });
396+
if (len < "foobar".len) {
397+
try std.testing.expectEqual("foo".len, len);
398+
const read = try f.preadAll(&buf, 0);
399+
try std.testing.expectEqualSlices(u8, "foo", buf[0..read]);
400+
} else {
401+
try std.testing.expectEqual("foobar".len, len);
402+
const read = try f.preadAll(&buf, 0);
403+
try std.testing.expectEqualSlices(u8, "foobar", buf[0..read]);
404+
}
405+
}
406+
{
407+
var f = try tmp.dir.openFile("test", .{});
408+
defer f.close();
409+
try std.testing.expectError(
410+
error.NotOpenForWriting,
411+
single(.writev, .{ .file = f, .iov = &.{
412+
.{ .base = "foo", .len = "foo".len },
413+
.{ .base = "bar", .len = "bar".len },
414+
}, .offset = 0, .out_written = &len }),
415+
);
416+
}
417+
}
418+
419+
test "Readv" {
420+
var tmp = std.testing.tmpDir(.{});
421+
defer tmp.cleanup();
422+
var buf: [64]u8 = undefined;
423+
var len: usize = undefined;
424+
{
425+
var f = try tmp.dir.createFile("test", .{ .read = true });
426+
defer f.close();
427+
try f.writeAll("foobar");
428+
try single(.readv, .{ .file = f, .iov = &.{
429+
.{ .base = buf[0..].ptr, .len = "foo".len },
430+
.{ .base = buf["foo".len..].ptr, .len = "bar".len },
431+
}, .offset = 0, .out_read = &len });
432+
if (len < "foobar".len) {
433+
try std.testing.expectEqual("foo".len, len);
434+
try std.testing.expectEqualSlices(u8, "foo", buf[0..len]);
435+
} else {
436+
try std.testing.expectEqual("foobar".len, len);
437+
try std.testing.expectEqualSlices(u8, "foobar", buf[0..len]);
438+
}
439+
}
440+
{
441+
var f = try tmp.dir.createFile("test", .{});
442+
defer f.close();
443+
try std.testing.expectError(
444+
error.NotOpenForReading,
445+
single(.readv, .{ .file = f, .iov = &.{
446+
.{ .base = buf[0..].ptr, .len = "foo".len },
447+
.{ .base = buf["foo".len..].ptr, .len = "bar".len },
448+
}, .offset = 0, .out_read = &len }),
449+
);
450+
}
451+
}
452+
385453
test "OpenAt" {
386454
var tmp = std.testing.tmpDir(.{});
387455
defer tmp.cleanup();

src/aio/IoUring.zig

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ pub fn isSupported(op_types: []const Operation) bool {
9292
.poll, .child_exit => .POLL_ADD, // 5.1 (child_exit uses waitid if available 6.5)
9393
.read_tty, .read, .wait_event_source => .READ, // 5.6
9494
.write, .notify_event_source => .WRITE, // 5.6
95+
.readv => .READV, // 5.6
96+
.writev => .WRITEV, // 5.6
9597
.accept => .ACCEPT, // 5.5
9698
.connect => .CONNECT, // 5.5
9799
.recv => .RECV, // 5.6
@@ -224,14 +226,14 @@ pub fn complete(self: *@This(), mode: aio.CompletionMode, handler: anytype) aio.
224226
op.out_socket = self.ops.getOne(.out_result, id).cast(*std.posix.socket_t);
225227
break :blk uring_handle_completion(tag, op, undefined, cqe);
226228
},
227-
inline .read, .read_tty, .recv, .recv_msg => |tag| blk: {
229+
inline .read, .readv, .read_tty, .recv, .recv_msg => |tag| blk: {
228230
var op: Operation.map.getAssertContains(tag) = undefined;
229231
op.out_id = self.ops.getOne(.out_id, id);
230232
op.out_error = @ptrCast(self.ops.getOne(.out_error, id));
231233
op.out_read = self.ops.getOne(.out_result, id).cast(*usize);
232234
break :blk uring_handle_completion(tag, op, undefined, cqe);
233235
},
234-
inline .write, .send, .send_msg, .splice => |tag| blk: {
236+
inline .write, .writev, .send, .send_msg, .splice => |tag| blk: {
235237
var op: Operation.map.getAssertContains(tag) = undefined;
236238
op.out_id = self.ops.getOne(.out_id, id);
237239
op.out_error = @ptrCast(self.ops.getOne(.out_error, id));
@@ -453,6 +455,13 @@ fn uring_queue(io: *std.os.linux.IoUring, comptime op_type: Operation, op: Opera
453455
.read_tty => try io.read(user_data, op.tty.handle, .{ .buffer = op.buffer }, 0),
454456
.read => try io.read(user_data, op.file.handle, .{ .buffer = op.buffer }, op.offset),
455457
.write => try io.write(user_data, op.file.handle, op.buffer, op.offset),
458+
.readv => blk: {
459+
const sqe = try io.get_sqe();
460+
sqe.prep_readv(op.file.handle, op.iov, op.offset);
461+
sqe.user_data = user_data;
462+
break :blk sqe;
463+
},
464+
.writev => try io.writev(user_data, op.file.handle, op.iov, op.offset),
456465
.accept => try io.accept(user_data, op.socket, op.out_addr, op.inout_addrlen, 0),
457466
.connect => try io.connect(user_data, op.socket, op.addr, op.addrlen),
458467
.bind => blk: {
@@ -596,7 +605,7 @@ fn uring_handle_completion(comptime op_type: Operation, op: Operation.map.getAss
596605
.CANCELED => error.Canceled,
597606
else => std.posix.unexpectedErrno(err),
598607
},
599-
.read_tty, .read => switch (err) {
608+
.read_tty, .read, .readv => switch (err) {
600609
.SUCCESS, .INTR, .INVAL, .FAULT, .AGAIN, .ISDIR => unreachable,
601610
.CANCELED => error.Canceled,
602611
.BADF => error.NotOpenForReading,
@@ -611,7 +620,7 @@ fn uring_handle_completion(comptime op_type: Operation, op: Operation.map.getAss
611620
.OPNOTSUPP => error.OperationNotSupported,
612621
else => std.posix.unexpectedErrno(err),
613622
},
614-
.write => switch (err) {
623+
.write, .writev => switch (err) {
615624
.SUCCESS, .INTR, .INVAL, .FAULT, .AGAIN, .DESTADDRREQ => unreachable,
616625
.CANCELED => error.Canceled,
617626
.DQUOT => error.DiskQuota,
@@ -977,8 +986,8 @@ fn uring_handle_completion(comptime op_type: Operation, op: Operation.map.getAss
977986
.poll => {},
978987
.accept => op.out_socket.* = cqe.res,
979988
.connect, .bind, .listen => {},
980-
.read_tty, .read, .recv, .recv_msg => op.out_read.* = @intCast(cqe.res),
981-
.write, .send, .send_msg, .splice => if (op.out_written) |w| {
989+
.read_tty, .read, .readv, .recv, .recv_msg => op.out_read.* = @intCast(cqe.res),
990+
.write, .writev, .send, .send_msg, .splice => if (op.out_written) |w| {
982991
w.* = @intCast(cqe.res);
983992
},
984993
.shutdown => {},

src/aio/Posix.zig

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -318,12 +318,12 @@ fn openReadiness(comptime op_type: Operation, op: Operation.map.getAssertContain
318318
.nop => .{},
319319
.fsync => .{},
320320
.poll => .{ .fd = op.fd, .events = op.events },
321-
.write => .{ .fd = op.file.handle, .events = .{ .out = true } },
321+
.write, .writev => .{ .fd = op.file.handle, .events = .{ .out = true } },
322322
.read_tty => switch (builtin.target.os.tag) {
323323
.macos, .ios, .watchos, .visionos, .tvos => .{},
324324
else => .{ .fd = op.tty.handle, .events = .{ .in = true } },
325325
},
326-
.read => .{ .fd = op.file.handle, .events = .{ .in = true } },
326+
.read, .readv => .{ .fd = op.file.handle, .events = .{ .in = true } },
327327
.accept, .recv, .recv_msg => .{ .fd = op.socket, .events = .{ .in = true } },
328328
.socket, .connect, .bind, .listen, .shutdown => .{},
329329
.send, .send_msg => .{ .fd = op.socket, .events = .{ .out = true } },
@@ -470,18 +470,26 @@ pub fn uringlator_start(self: *@This(), id: aio.Id, op_type: Operation) !void {
470470

471471
if (comptime builtin.target.os.tag == .wasi) {
472472
switch (op_type) {
473-
.read => {
473+
inline .read, .readv => |tag| {
474474
var stat: std.os.wasi.fdstat_t = undefined;
475475
const state = self.uringlator.ops.getOnePtr(.state, id);
476-
std.debug.assert(std.os.wasi.fd_fdstat_get(state.read.file.handle, &stat) == .SUCCESS);
476+
switch (tag) {
477+
.read => if (std.os.wasi.fd_fdstat_get(state.read.file.handle, &stat) != .SUCCESS) unreachable, // should not happen
478+
.readv => if (std.os.wasi.fd_fdstat_get(state.readv.file.handle, &stat) != .SUCCESS) unreachable, // should not happen
479+
else => unreachable,
480+
}
477481
if (!stat.fs_rights_base.FD_READ) {
478482
return self.uringlator.finish(self, id, error.NotOpenForReading, .thread_unsafe);
479483
}
480484
},
481-
.write => {
485+
inline .write, .writev => |tag| {
482486
var stat: std.os.wasi.fdstat_t = undefined;
483487
const state = self.uringlator.ops.getOnePtr(.state, id);
484-
std.debug.assert(std.os.wasi.fd_fdstat_get(state.write.file.handle, &stat) == .SUCCESS);
488+
switch (tag) {
489+
.write => if (std.os.wasi.fd_fdstat_get(state.write.file.handle, &stat) != .SUCCESS) unreachable, // should not happen
490+
.writev => if (std.os.wasi.fd_fdstat_get(state.writev.file.handle, &stat) != .SUCCESS) unreachable, // should not happen
491+
else => unreachable,
492+
}
485493
if (!stat.fs_rights_base.FD_WRITE) {
486494
return self.uringlator.finish(self, id, error.NotOpenForWriting, .thread_unsafe);
487495
}

src/aio/Windows.zig

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,26 @@ pub fn uringlator_start(self: *@This(), id: aio.Id, op_type: Operation) !void {
288288
self.uringlator.finish(self, id, error.Success, .thread_unsafe);
289289
}
290290
},
291+
.readv => {
292+
const state = self.uringlator.ops.getOnePtr(.state, id);
293+
const ovl = self.uringlator.ops.getOnePtr(.ovl, id);
294+
if (state.readv.iov.len == 0) {
295+
ovl.res = 0;
296+
return self.uringlator.finish(self, id, error.Success, .thread_unsafe);
297+
}
298+
const flags = try getHandleAccessInfo(state.readv.file.handle);
299+
if (flags.FILE_READ_DATA != 1) return self.uringlator.finish(self, id, error.NotOpenForReading, .thread_unsafe);
300+
const h = fs.ReOpenFile(state.readv.file.handle, flags, .{ .READ = 1, .WRITE = 1 }, fs.FILE_FLAG_OVERLAPPED);
301+
_ = wtry(h != null and h.? != INVALID_HANDLE) catch |err| return self.uringlator.finish(self, id, err, .thread_unsafe);
302+
self.iocp.associateHandle(id, h.?) catch |err| return self.uringlator.finish(self, id, err, .thread_unsafe);
303+
ovl.* = .{ .overlapped = ovlOff(state.readv.offset), .owned = .{ .handle = h.? } };
304+
var read: u32 = undefined;
305+
const ret = wtry(fs.ReadFile(h.?, state.readv.iov[0].base, @intCast(state.readv.iov[0].len), &read, &ovl.overlapped)) catch |err| return self.uringlator.finish(self, id, err, .thread_unsafe);
306+
if (ret != 0) {
307+
ovl.res = read;
308+
self.uringlator.finish(self, id, error.Success, .thread_unsafe);
309+
}
310+
},
291311
.write => {
292312
const state = self.uringlator.ops.getOnePtr(.state, id);
293313
const ovl = self.uringlator.ops.getOnePtr(.ovl, id);
@@ -304,6 +324,26 @@ pub fn uringlator_start(self: *@This(), id: aio.Id, op_type: Operation) !void {
304324
self.uringlator.finish(self, id, error.Success, .thread_unsafe);
305325
}
306326
},
327+
.writev => {
328+
const state = self.uringlator.ops.getOnePtr(.state, id);
329+
const ovl = self.uringlator.ops.getOnePtr(.ovl, id);
330+
if (state.writev.iov.len == 0) {
331+
ovl.res = 0;
332+
return self.uringlator.finish(self, id, error.Success, .thread_unsafe);
333+
}
334+
const flags = try getHandleAccessInfo(state.writev.file.handle);
335+
if (flags.FILE_WRITE_DATA != 1) return self.uringlator.finish(self, id, error.NotOpenForWriting, .thread_unsafe);
336+
const h = fs.ReOpenFile(state.writev.file.handle, flags, .{ .READ = 1, .WRITE = 1 }, fs.FILE_FLAG_OVERLAPPED);
337+
_ = wtry(h != null and h.? != INVALID_HANDLE) catch |err| return self.uringlator.finish(self, id, err, .thread_unsafe);
338+
self.iocp.associateHandle(id, h.?) catch |err| return self.uringlator.finish(self, id, err, .thread_unsafe);
339+
ovl.* = .{ .overlapped = ovlOff(state.writev.offset), .owned = .{ .handle = h.? } };
340+
var written: u32 = undefined;
341+
const ret = wtry(fs.WriteFile(h.?, state.writev.iov[0].base, @intCast(state.writev.iov[0].len), &written, &ovl.overlapped)) catch |err| return self.uringlator.finish(self, id, err, .thread_unsafe);
342+
if (ret != 0) {
343+
ovl.res = written;
344+
self.uringlator.finish(self, id, error.Success, .thread_unsafe);
345+
}
346+
},
307347
.accept => {
308348
const out_socket = self.uringlator.ops.getOne(.out_result, id).cast(*std.posix.socket_t);
309349
const win_state = self.uringlator.ops.getOnePtr(.win_state, id);
@@ -427,7 +467,7 @@ pub fn uringlator_start(self: *@This(), id: aio.Id, op_type: Operation) !void {
427467

428468
pub fn uringlator_cancel(self: *@This(), id: aio.Id, op_type: Operation, err: Operation.Error) bool {
429469
switch (op_type) {
430-
.read, .write => {
470+
.read, .write, .readv, .writev => {
431471
const ovl = self.uringlator.ops.getOnePtr(.ovl, id);
432472
if (io.CancelIoEx(ovl.owned.handle, &ovl.overlapped) != 0) {
433473
self.uringlator.finish(self, id, err, .thread_unsafe);
@@ -486,11 +526,11 @@ pub fn uringlator_complete(self: *@This(), id: aio.Id, op_type: Operation, failu
486526
if (state.accept.out_addr) |ad| @memcpy(std.mem.asBytes(ad)[0..@intCast(addrlen)], std.mem.asBytes(addr_ptr)[0..@intCast(addrlen)]);
487527
}
488528
},
489-
.read, .recv, .recv_msg => {
529+
.read, .readv, .recv, .recv_msg => {
490530
const out_read = self.uringlator.ops.getOne(.out_result, id).cast(*usize);
491531
out_read.* = ovl.res;
492532
},
493-
.write, .send, .send_msg => {
533+
.write, .writev, .send, .send_msg => {
494534
const out_written = self.uringlator.ops.getOne(.out_result, id).cast(?*usize);
495535
if (out_written) |w| w.* = ovl.res;
496536
},

src/aio/ops.zig

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,30 @@ pub const Write = struct {
127127
userdata: usize = 0,
128128
};
129129

130+
/// std.fs.File.readv
131+
pub const Readv = struct {
132+
pub const Error = std.posix.PReadError || SharedError;
133+
file: std.fs.File,
134+
iov: []const posix.iovec,
135+
offset: u64 = OFFSET_CURRENT_POS,
136+
out_read: *usize,
137+
out_id: ?*Id = null,
138+
out_error: ?*Error = null,
139+
userdata: usize = 0,
140+
};
141+
142+
/// std.fs.File.writev
143+
pub const Writev = struct {
144+
pub const Error = std.posix.PWriteError || SharedError;
145+
file: std.fs.File,
146+
iov: []const posix.iovec_const,
147+
offset: u64 = OFFSET_CURRENT_POS,
148+
out_written: ?*usize = null,
149+
out_id: ?*Id = null,
150+
out_error: ?*Error = null,
151+
userdata: usize = 0,
152+
};
153+
130154
/// std.posix.accept
131155
pub const Accept = struct {
132156
pub const Error = std.posix.AcceptError || SharedError;
@@ -502,6 +526,8 @@ pub const Operation = enum {
502526
read_tty,
503527
read,
504528
write,
529+
readv,
530+
writev,
505531
accept,
506532
connect,
507533
bind,
@@ -536,6 +562,8 @@ pub const Operation = enum {
536562
.read_tty = ReadTty,
537563
.read = Read,
538564
.write = Write,
565+
.readv = Readv,
566+
.writev = Writev,
539567
.accept = Accept,
540568
.connect = Connect,
541569
.bind = Bind,
@@ -565,10 +593,10 @@ pub const Operation = enum {
565593
});
566594

567595
pub const required: []const @This() = &.{
568-
.nop, .fsync, .poll, .read_tty, .read, .write, .accept, .connect,
569-
.bind, .listen, .recv, .send, .recv_msg, .send_msg, .shutdown, .open_at,
570-
.close_file, .close_dir, .timeout, .link_timeout, .cancel, .rename_at, .mkdir_at, .symlink_at,
571-
.child_exit, .socket, .close_socket, .notify_event_source, .wait_event_source, .close_event_source,
596+
.nop, .fsync, .poll, .read_tty, .read, .write, .readv, .writev, .accept, .connect,
597+
.bind, .listen, .recv, .send, .recv_msg, .send_msg, .shutdown, .open_at, .close_file, .close_dir,
598+
.timeout, .link_timeout, .cancel, .rename_at, .mkdir_at, .symlink_at, .child_exit, .socket, .close_socket, .notify_event_source,
599+
.wait_event_source, .close_event_source,
572600
};
573601

574602
pub const Error = blk: {
@@ -606,8 +634,8 @@ pub const Operation = enum {
606634
.close_event_source,
607635
.notify_event_source,
608636
=> undefined,
609-
.read, .read_tty, .recv, .recv_msg => @ptrCast(op.out_read),
610-
.write, .send, .send_msg, .splice => @ptrCast(op.out_written),
637+
.read, .readv, .read_tty, .recv, .recv_msg => @ptrCast(op.out_read),
638+
.write, .writev, .send, .send_msg, .splice => @ptrCast(op.out_written),
611639
.socket, .accept => @ptrCast(op.out_socket),
612640
.open_at => @ptrCast(op.out_file),
613641
.child_exit => @ptrCast(op.out_term),
@@ -638,8 +666,8 @@ pub const Operation = enum {
638666
.close_event_source,
639667
.notify_event_source,
640668
=> undefined,
641-
.read, .read_tty, .recv, .recv_msg => op.out_read = self.cast(*usize),
642-
.write, .send, .send_msg, .splice => op.out_written = self.cast(?*usize),
669+
.read, .readv, .read_tty, .recv, .recv_msg => op.out_read = self.cast(*usize),
670+
.write, .writev, .send, .send_msg, .splice => op.out_written = self.cast(?*usize),
643671
.socket, .accept => op.out_socket = self.cast(*std.posix.socket_t),
644672
.open_at => op.out_file = self.cast(*std.fs.File),
645673
.child_exit => op.out_term = self.cast(?*std.process.Child.Term),

0 commit comments

Comments
 (0)