Skip to content

Commit 198bbbb

Browse files
erg authored and Cloudef committed
coro: update for new linked list changes
zig version 0.15.0-dev.286+ddcf6fcdf
1 parent 2803432 commit 198bbbb

File tree

4 files changed

+47
-36
lines changed

4 files changed

+47
-36
lines changed

src/coro/Frame.zig

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ const Scheduler = @import("Scheduler.zig");
44
const Link = @import("minilib").Link;
55
const log = std.log.scoped(.coro);
66

7-
pub const List = std.DoublyLinkedList(Link(@This(), "link", .double));
7+
pub const List = std.DoublyLinkedList;
88
pub const stack_alignment = Fiber.stack_alignment;
99
pub const Stack = Fiber.Stack;
1010

@@ -23,7 +23,7 @@ pub const Status = enum(u8) {
2323
}
2424
};
2525

26-
pub const WaitList = std.SinglyLinkedList(Link(@This(), "wait_link", .single));
26+
pub const WaitList = std.SinglyLinkedList;
2727

2828
fiber: *Fiber,
2929
stack: ?Fiber.Stack = null,
@@ -34,8 +34,8 @@ canceled: bool = false,
3434
cancelable: bool = true,
3535
status: Status = .active,
3636
yield_state: u8 = 0,
37-
link: List.Node = .{ .data = .{} },
38-
wait_link: WaitList.Node = .{ .data = .{} },
37+
link: List.Node = .{},
38+
wait_link: WaitList.Node = .{},
3939
completer: ?*@This() = null,
4040

4141
pub fn current() ?*@This() {

src/coro/Scheduler.zig

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,11 @@ pub fn deinit(self: *@This()) void {
2727
var next = self.completed.first;
2828
while (next) |node| {
2929
next = node.next;
30-
node.data.cast().deinit();
30+
const frame: *Frame = @fieldParentPtr("link", node);
31+
frame.deinit();
3132
}
3233
while (self.running.popFirst()) |node| {
33-
var frame = node.data.cast();
34+
const frame: *Frame = @fieldParentPtr("link", node);
3435
frame.status = .completed;
3536
self.completed.append(&frame.link);
3637
frame.deinit();
@@ -83,7 +84,7 @@ pub fn tick(self: *@This(), mode: aio.CompletionMode) aio.Error!usize {
8384
var next: ?*Frame.List.Node = first;
8485
while (next) |node| {
8586
next = node.next;
86-
var frame = node.data.cast();
87+
const frame: *Frame = @fieldParentPtr("link", node);
8788
if (frame.detached) {
8889
std.debug.assert(frame.completer == null);
8990
frame.deinit();
@@ -93,7 +94,7 @@ pub fn tick(self: *@This(), mode: aio.CompletionMode) aio.Error!usize {
9394
}
9495
}
9596
_ = try self.io.complete(mode, self);
96-
return self.running.len;
97+
return self.running.len();
9798
}
9899

99100
pub const CompleteMode = Frame.CompleteMode;
@@ -104,7 +105,8 @@ pub fn run(self: *@This(), mode: CompleteMode) aio.Error!void {
104105
// start canceling tasks starting from the most recent one
105106
while (self.running.last) |node| {
106107
if (self.state == .tear_down) return error.Unexpected;
107-
node.data.cast().complete(.cancel, void);
108+
const frame: *Frame = @fieldParentPtr("link", node);
109+
frame.complete(.cancel, void);
108110
}
109111
} else {
110112
while (self.state != .tear_down) {

src/coro/sync.zig

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ fn wakeupWaiters(list: *Frame.WaitList, status: anytype) void {
88
var next = list.first;
99
while (next) |node| {
1010
next = node.next;
11-
node.data.cast().wakeup(status);
11+
const frame: *Frame = @fieldParentPtr("wait_link", node);
12+
frame.wakeup(status);
1213
}
1314
}
1415

@@ -465,14 +466,19 @@ test "RwLock.Cancel" {
465466
/// Only one consumer receives each sent data, regardless of the number of consumers.
466467
pub fn Queue(comptime T: type) type {
467468
return struct {
468-
const QueueList = std.DoublyLinkedList(T);
469-
const MemoryPool = std.heap.MemoryPool(QueueList.Node);
469+
const QueueList = std.DoublyLinkedList;
470+
const MemoryPool = std.heap.MemoryPool(QueueNode);
470471

471472
pool: MemoryPool,
472473
queue: QueueList = .{},
473474
mutex: Mutex,
474475
semaphore: aio.EventSource,
475476

477+
const QueueNode = struct {
478+
data: T,
479+
node: QueueList.Node = .{},
480+
};
481+
476482
pub fn init(allocator: std.mem.Allocator, preheat_size: usize) !@This() {
477483
return .{
478484
.pool = try MemoryPool.initPreheated(allocator, preheat_size),
@@ -495,12 +501,11 @@ pub fn Queue(comptime T: type) type {
495501
try self.mutex.lock();
496502
defer self.mutex.unlock();
497503

498-
const node = try self.pool.create();
499-
errdefer self.pool.destroy(node);
500-
501-
node.data = data;
504+
const queue_node = try self.pool.create();
505+
errdefer self.pool.destroy(queue_node);
502506

503-
self.queue.prepend(node);
507+
queue_node.* = .{ .data = data };
508+
self.queue.append(&queue_node.node);
504509

505510
self.semaphore.notify();
506511
}
@@ -513,9 +518,10 @@ pub fn Queue(comptime T: type) type {
513518
error.WouldBlock => return null,
514519
};
515520

516-
if (self.queue.pop()) |node| {
517-
const data = node.data;
518-
self.pool.destroy(node);
521+
if (self.queue.popFirst()) |node| {
522+
const queue_node: *QueueNode = @fieldParentPtr("node", node);
523+
const data = queue_node.data;
524+
self.pool.destroy(queue_node);
519525
return data;
520526
}
521527
}
@@ -528,9 +534,10 @@ pub fn Queue(comptime T: type) type {
528534
try self.mutex.lock();
529535
defer self.mutex.unlock();
530536

531-
if (self.queue.pop()) |node| {
532-
const data = node.data;
533-
self.pool.destroy(node);
537+
if (self.queue.popFirst()) |node| {
538+
const queue_node: *QueueNode = @fieldParentPtr("node", node);
539+
const data = queue_node.data;
540+
self.pool.destroy(queue_node);
534541
return data;
535542
}
536543

@@ -543,8 +550,9 @@ pub fn Queue(comptime T: type) type {
543550

544551
while (true) self.semaphore.waitNonBlocking() catch break;
545552

546-
while (self.queue.pop()) |node| {
547-
self.pool.destroy(node);
553+
while (self.queue.popFirst()) |node| {
554+
const queue_node: *QueueNode = @fieldParentPtr("node", node);
555+
self.pool.destroy(queue_node);
548556
}
549557
}
550558
};
@@ -603,7 +611,7 @@ test "Queue" {
603611

604612
// check if it has returned to its initial state
605613
try std.testing.expectEqual(null, queue.tryRecv());
606-
try std.testing.expectEqual(0, queue.queue.len);
614+
try std.testing.expectEqual(0, queue.queue.len());
607615

608616
var threads: [2]std.Thread = undefined;
609617

@@ -616,5 +624,5 @@ test "Queue" {
616624

617625
// check if it has returned to its initial state
618626
try std.testing.expectEqual(null, queue.tryRecv());
619-
try std.testing.expectEqual(0, queue.queue.len);
627+
try std.testing.expectEqual(0, queue.queue.len());
620628
}

src/minilib/dynamic_thread_pool.zig

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ const DefaultImpl = struct {
1515
mutex: std.Thread.Mutex = .{},
1616
cond: std.Thread.Condition = .{},
1717
threads: []DynamicThread = &.{},
18-
run_queue: RunQueue = .{},
18+
run_queue: std.SinglyLinkedList = .{},
1919
idling_threads: u32 = 0,
2020
active_threads: u32 = 0,
2121
timeout: u64,
@@ -24,8 +24,10 @@ const DefaultImpl = struct {
2424
name: ?[]const u8,
2525
stack_size: usize,
2626

27-
const RunQueue = std.SinglyLinkedList(Runnable);
28-
const Runnable = struct { runFn: RunProto };
27+
const Runnable = struct {
28+
runFn: RunProto,
29+
node: std.SinglyLinkedList.Node = .{},
30+
};
2931
const RunProto = *const fn (*@This(), *Runnable) void;
3032

3133
pub const Options = struct {
@@ -97,11 +99,10 @@ const DefaultImpl = struct {
9799
const ThreadPool = @This();
98100
const Closure = struct {
99101
arguments: Args,
100-
run_node: RunQueue.Node = .{ .data = .{ .runFn = runFn } },
102+
runnable: Runnable = .{ .runFn = runFn },
101103

102104
fn runFn(pool: *ThreadPool, runnable: *Runnable) void {
103-
const run_node: *RunQueue.Node = @fieldParentPtr("data", runnable);
104-
const closure: *@This() = @alignCast(@fieldParentPtr("run_node", run_node));
105+
const closure: *@This() = @alignCast(@fieldParentPtr("runnable", runnable));
105106
@call(.auto, func, closure.arguments);
106107
// The thread pool's allocator is protected by the mutex.
107108
pool.mutex.lock();
@@ -136,7 +137,7 @@ const DefaultImpl = struct {
136137
// Closures are often same size, so they can be bucketed and reused
137138
const closure = try self.arena.allocator().create(Closure);
138139
closure.* = .{ .arguments = args };
139-
self.run_queue.prepend(&closure.run_node);
140+
self.run_queue.prepend(&closure.runnable.node);
140141
}
141142

142143
// Notify waiting threads outside the lock to try and keep the critical section small.
@@ -192,8 +193,8 @@ const DefaultImpl = struct {
192193
if (self.run_queue.popFirst()) |run_node| {
193194
self.mutex.unlock();
194195
defer self.mutex.lock();
195-
const runFn = run_node.data.runFn;
196-
runFn(self, &run_node.data);
196+
const runnable: *Runnable = @fieldParentPtr("node", run_node);
197+
runnable.runFn(self, runnable);
197198
timer.reset();
198199
} else break;
199200
}

0 commit comments

Comments (0)