From 0874a5ba77a1d049a0e9e7f9f249605c109a731c Mon Sep 17 00:00:00 2001
From: Andrew Kelley
Date: Fri, 29 Jun 2018 14:45:01 -0400
Subject: [PATCH] std.atomic.queue - document limitation and add MPSC queue

---
 CMakeLists.txt                           |   3 +-
 std/atomic/index.zig                     |   8 +-
 std/atomic/queue_mpmc.zig                | 214 +++++++++++++++++++++++
 std/atomic/{queue.zig => queue_mpsc.zig} |  62 ++++---
 4 files changed, 254 insertions(+), 33 deletions(-)
 create mode 100644 std/atomic/queue_mpmc.zig
 rename std/atomic/{queue.zig => queue_mpsc.zig} (70%)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 789da4a8a6..4838aeb797 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -431,7 +431,8 @@ set(ZIG_CPP_SOURCES
 set(ZIG_STD_FILES
     "array_list.zig"
     "atomic/index.zig"
-    "atomic/queue.zig"
+    "atomic/queue_mpmc.zig"
+    "atomic/queue_mpsc.zig"
     "atomic/stack.zig"
     "base64.zig"
     "buf_map.zig"
diff --git a/std/atomic/index.zig b/std/atomic/index.zig
index 9d556a6415..c0ea5be183 100644
--- a/std/atomic/index.zig
+++ b/std/atomic/index.zig
@@ -1,7 +1,9 @@
 pub const Stack = @import("stack.zig").Stack;
-pub const Queue = @import("queue.zig").Queue;
+pub const QueueMpsc = @import("queue_mpsc.zig").QueueMpsc;
+pub const QueueMpmc = @import("queue_mpmc.zig").QueueMpmc;
 
 test "std.atomic" {
-    _ = @import("stack.zig").Stack;
-    _ = @import("queue.zig").Queue;
+    _ = @import("stack.zig");
+    _ = @import("queue_mpsc.zig");
+    _ = @import("queue_mpmc.zig");
 }
diff --git a/std/atomic/queue_mpmc.zig b/std/atomic/queue_mpmc.zig
new file mode 100644
index 0000000000..7ffc9f9ccb
--- /dev/null
+++ b/std/atomic/queue_mpmc.zig
@@ -0,0 +1,214 @@
+const builtin = @import("builtin");
+const AtomicOrder = builtin.AtomicOrder;
+const AtomicRmwOp = builtin.AtomicRmwOp;
+
+/// Many producer, many consumer, non-allocating, thread-safe, lock-free
+/// This implementation has a crippling limitation - it hangs onto node
+/// memory for 1 extra get() and 1 extra put() operation - when get() returns a node, that
+/// node must not be freed until both the next get() and the next put() complete.
+pub fn QueueMpmc(comptime T: type) type {
+    return struct {
+        head: *Node,
+        tail: *Node,
+        root: Node,
+
+        pub const Self = this;
+
+        pub const Node = struct {
+            next: ?*Node,
+            data: T,
+        };
+
+        /// TODO: well defined copy elision: https://github.com/ziglang/zig/issues/287
+        pub fn init(self: *Self) void {
+            self.root.next = null;
+            self.head = &self.root;
+            self.tail = &self.root;
+        }
+
+        pub fn put(self: *Self, node: *Node) void {
+            node.next = null;
+
+            const tail = @atomicRmw(*Node, &self.tail, AtomicRmwOp.Xchg, node, AtomicOrder.SeqCst);
+            _ = @atomicRmw(?*Node, &tail.next, AtomicRmwOp.Xchg, node, AtomicOrder.SeqCst);
+        }
+
+        /// node must not be freed until both the next get() and the next put() complete
+        pub fn get(self: *Self) ?*Node {
+            var head = @atomicLoad(*Node, &self.head, AtomicOrder.SeqCst);
+            while (true) {
+                const node = head.next orelse return null;
+                head = @cmpxchgWeak(*Node, &self.head, head, node, AtomicOrder.SeqCst, AtomicOrder.SeqCst) orelse return node;
+            }
+        }
+
+        /// This is a debug function that is not thread-safe.
+        pub fn dump(self: *Self) void {
+            std.debug.warn("head: ");
+            dumpRecursive(self.head, 0);
+            std.debug.warn("tail: ");
+            dumpRecursive(self.tail, 0);
+        }
+
+        fn dumpRecursive(optional_node: ?*Node, indent: usize) void {
+            var stderr_file = std.io.getStdErr() catch return;
+            const stderr = &std.io.FileOutStream.init(&stderr_file).stream;
+            stderr.writeByteNTimes(' ', indent) catch return;
+            if (optional_node) |node| {
+                std.debug.warn("0x{x}={}\n", @ptrToInt(node), node.data);
+                dumpRecursive(node.next, indent + 1);
+            } else {
+                std.debug.warn("(null)\n");
+            }
+        }
+    };
+}
+
+const std = @import("std");
+const assert = std.debug.assert;
+
+const Context = struct {
+    allocator: *std.mem.Allocator,
+    queue: *QueueMpmc(i32),
+    put_sum: isize,
+    get_sum: isize,
+    get_count: usize,
+    puts_done: u8, // TODO make this a bool
+};
+
+// TODO add lazy evaluated build options and then put puts_per_thread behind
+// some option such as: "AggressiveMultithreadedFuzzTest". In the AppVeyor
+// CI we would use a less aggressive setting since at 1 core, while we still
+// want this test to pass, we need a smaller value since there is so much thrashing
+// we would also use a less aggressive setting when running in valgrind
+const puts_per_thread = 500;
+const put_thread_count = 3;
+
+test "std.atomic.queue_mpmc" {
+    var direct_allocator = std.heap.DirectAllocator.init();
+    defer direct_allocator.deinit();
+
+    var plenty_of_memory = try direct_allocator.allocator.alloc(u8, 300 * 1024);
+    defer direct_allocator.allocator.free(plenty_of_memory);
+
+    var fixed_buffer_allocator = std.heap.ThreadSafeFixedBufferAllocator.init(plenty_of_memory);
+    var a = &fixed_buffer_allocator.allocator;
+
+    var queue: QueueMpmc(i32) = undefined;
+    queue.init();
+    var context = Context{
+        .allocator = a,
+        .queue = &queue,
+        .put_sum = 0,
+        .get_sum = 0,
+        .puts_done = 0,
+        .get_count = 0,
+    };
+
+    var putters: [put_thread_count]*std.os.Thread = undefined;
+    for (putters) |*t| {
+        t.* = try std.os.spawnThread(&context, startPuts);
+    }
+    var getters: [put_thread_count]*std.os.Thread = undefined;
+    for (getters) |*t| {
+        t.* = try std.os.spawnThread(&context, startGets);
+    }
+
+    for (putters) |t|
+        t.wait();
+    _ = @atomicRmw(u8, &context.puts_done, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
+    for (getters) |t|
+        t.wait();
+
+    if (context.put_sum != context.get_sum) {
+        std.debug.panic("failure\nput_sum:{} != get_sum:{}", context.put_sum, context.get_sum);
+    }
+
+    if (context.get_count != puts_per_thread * put_thread_count) {
+        std.debug.panic(
+            "failure\nget_count:{} != puts_per_thread:{} * put_thread_count:{}",
+            context.get_count,
+            u32(puts_per_thread),
+            u32(put_thread_count),
+        );
+    }
+}
+
+fn startPuts(ctx: *Context) u8 {
+    var put_count: usize = puts_per_thread;
+    var r = std.rand.DefaultPrng.init(0xdeadbeef);
+    while (put_count != 0) : (put_count -= 1) {
+        std.os.time.sleep(0, 1); // let the os scheduler be our fuzz
+        const x = @bitCast(i32, r.random.scalar(u32));
+        const node = ctx.allocator.create(QueueMpmc(i32).Node{
+            .next = undefined,
+            .data = x,
+        }) catch unreachable;
+        ctx.queue.put(node);
+        _ = @atomicRmw(isize, &ctx.put_sum, builtin.AtomicRmwOp.Add, x, AtomicOrder.SeqCst);
+    }
+    return 0;
+}
+
+fn startGets(ctx: *Context) u8 {
+    while (true) {
+        const last = @atomicLoad(u8, &ctx.puts_done, builtin.AtomicOrder.SeqCst) == 1;
+
+        while (ctx.queue.get()) |node| {
+            std.os.time.sleep(0, 1); // let the os scheduler be our fuzz
+            _ = @atomicRmw(isize, &ctx.get_sum, builtin.AtomicRmwOp.Add, node.data, builtin.AtomicOrder.SeqCst);
+            _ = @atomicRmw(usize, &ctx.get_count, builtin.AtomicRmwOp.Add, 1, builtin.AtomicOrder.SeqCst);
+        }
+
+        if (last) return 0;
+    }
+}
+
+test "std.atomic.queue_mpmc single-threaded" {
+    var queue: QueueMpmc(i32) = undefined;
+    queue.init();
+
+    var node_0 = QueueMpmc(i32).Node{
+        .data = 0,
+        .next = undefined,
+    };
+    queue.put(&node_0);
+
+    var node_1 = QueueMpmc(i32).Node{
+        .data = 1,
+        .next = undefined,
+    };
+    queue.put(&node_1);
+
+    assert(queue.get().?.data == 0);
+
+    var node_2 = QueueMpmc(i32).Node{
+        .data = 2,
+        .next = undefined,
+    };
+    queue.put(&node_2);
+
+    var node_3 = QueueMpmc(i32).Node{
+        .data = 3,
+        .next = undefined,
+    };
+    queue.put(&node_3);
+
+    assert(queue.get().?.data == 1);
+
+    assert(queue.get().?.data == 2);
+
+    var node_4 = QueueMpmc(i32).Node{
+        .data = 4,
+        .next = undefined,
+    };
+    queue.put(&node_4);
+
+    assert(queue.get().?.data == 3);
+    // if we were to set node_3.next to null here, it would cause this test
+    // to fail. this demonstrates the limitation of hanging on to extra memory.
+
+    assert(queue.get().?.data == 4);
+
+    assert(queue.get() == null);
+}
diff --git a/std/atomic/queue.zig b/std/atomic/queue_mpsc.zig
similarity index 70%
rename from std/atomic/queue.zig
rename to std/atomic/queue_mpsc.zig
index 16dc9f6cc3..66eb4573df 100644
--- a/std/atomic/queue.zig
+++ b/std/atomic/queue_mpsc.zig
@@ -1,49 +1,54 @@
+const std = @import("std");
+const assert = std.debug.assert;
 const builtin = @import("builtin");
 const AtomicOrder = builtin.AtomicOrder;
 const AtomicRmwOp = builtin.AtomicRmwOp;
 
-/// Many reader, many writer, non-allocating, thread-safe, lock-free
-pub fn Queue(comptime T: type) type {
+/// Many producer, single consumer, non-allocating, thread-safe, lock-free
+pub fn QueueMpsc(comptime T: type) type {
     return struct {
-        head: *Node,
-        tail: *Node,
-        root: Node,
+        inboxes: [2]std.atomic.Stack(T),
+        outbox: std.atomic.Stack(T),
+        inbox_index: usize,
 
         pub const Self = this;
 
-        pub const Node = struct {
-            next: ?*Node,
-            data: T,
-        };
+        pub const Node = std.atomic.Stack(T).Node;
 
-        // TODO: well defined copy elision: https://github.com/ziglang/zig/issues/287
-        pub fn init(self: *Self) void {
-            self.root.next = null;
-            self.head = &self.root;
-            self.tail = &self.root;
+        pub fn init() Self {
+            return Self{
+                .inboxes = []std.atomic.Stack(T){
+                    std.atomic.Stack(T).init(),
+                    std.atomic.Stack(T).init(),
+                },
+                .outbox = std.atomic.Stack(T).init(),
+                .inbox_index = 0,
+            };
         }
 
         pub fn put(self: *Self, node: *Node) void {
-            node.next = null;
-
-            const tail = @atomicRmw(*Node, &self.tail, AtomicRmwOp.Xchg, node, AtomicOrder.SeqCst);
-            _ = @atomicRmw(?*Node, &tail.next, AtomicRmwOp.Xchg, node, AtomicOrder.SeqCst);
+            const inbox_index = @atomicLoad(usize, &self.inbox_index, AtomicOrder.SeqCst);
+            const inbox = &self.inboxes[inbox_index];
+            inbox.push(node);
         }
 
         pub fn get(self: *Self) ?*Node {
-            var head = @atomicLoad(*Node, &self.head, AtomicOrder.SeqCst);
-            while (true) {
-                const node = head.next orelse return null;
-                head = @cmpxchgWeak(*Node, &self.head, head, node, AtomicOrder.SeqCst, AtomicOrder.SeqCst) orelse return node;
+            if (self.outbox.pop()) |node| {
+                return node;
             }
+            const prev_inbox_index = @atomicRmw(usize, &self.inbox_index, AtomicRmwOp.Xor, 0x1, AtomicOrder.SeqCst);
+            const prev_inbox = &self.inboxes[prev_inbox_index];
+            while (prev_inbox.pop()) |node| {
+                self.outbox.push(node);
+            }
+            return self.outbox.pop();
         }
     };
 }
 
-const std = @import("std");
 const Context = struct {
     allocator: *std.mem.Allocator,
-    queue: *Queue(i32),
+    queue: *QueueMpsc(i32),
     put_sum: isize,
     get_sum: isize,
     get_count: usize,
@@ -58,7 +63,7 @@ const Context = struct {
 const puts_per_thread = 500;
 const put_thread_count = 3;
 
-test "std.atomic.queue" {
+test "std.atomic.queue_mpsc" {
     var direct_allocator = std.heap.DirectAllocator.init();
     defer direct_allocator.deinit();
 
@@ -68,8 +73,7 @@
     var fixed_buffer_allocator = std.heap.ThreadSafeFixedBufferAllocator.init(plenty_of_memory);
     var a = &fixed_buffer_allocator.allocator;
 
-    var queue: Queue(i32) = undefined;
-    queue.init();
+    var queue = QueueMpsc(i32).init();
     var context = Context{
        .allocator = a,
        .queue = &queue,
@@ -83,7 +87,7 @@
     for (putters) |*t| {
         t.* = try std.os.spawnThread(&context, startPuts);
     }
-    var getters: [put_thread_count]*std.os.Thread = undefined;
+    var getters: [1]*std.os.Thread = undefined;
     for (getters) |*t| {
         t.* = try std.os.spawnThread(&context, startGets);
     }
@@ -114,7 +118,7 @@ fn startPuts(ctx: *Context) u8 {
     while (put_count != 0) : (put_count -= 1) {
         std.os.time.sleep(0, 1); // let the os scheduler be our fuzz
         const x = @bitCast(i32, r.random.scalar(u32));
-        const node = ctx.allocator.create(Queue(i32).Node{
+        const node = ctx.allocator.create(QueueMpsc(i32).Node{
             .next = undefined,
             .data = x,
         }) catch unreachable;
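
Usage note (not part of the patch): the renamed QueueMpsc keeps the non-allocating, caller-owned-node interface, but init() now returns the queue by value and Node is an alias for std.atomic.Stack(T).Node. Below is a minimal single-threaded sketch, assuming the std.atomic.QueueMpsc export added in index.zig above; the test name and values are purely illustrative.

    const std = @import("std");
    const assert = std.debug.assert;
    const QueueMpsc = std.atomic.QueueMpsc;

    test "QueueMpsc usage sketch" {
        // init() returns the queue by value; no allocator is involved.
        var queue = QueueMpsc(i32).init();

        // Nodes are std.atomic.Stack(i32).Node values owned by the caller.
        var node_0 = QueueMpsc(i32).Node{
            .data = 0,
            .next = undefined,
        };
        queue.put(&node_0);

        var node_1 = QueueMpsc(i32).Node{
            .data = 1,
            .next = undefined,
        };
        queue.put(&node_1);

        // When the outbox is empty, get() flips inbox_index and drains the
        // previous inbox stack into the outbox stack; draining one LIFO stack
        // into another reverses it, so a batch comes back out in put() order.
        assert(queue.get().?.data == 0);
        assert(queue.get().?.data == 1);
        assert(queue.get() == null);
    }

Only one thread may call get(), while any number of threads may call put() concurrently, which is why the test hunk above changes the getters array to [1]*std.os.Thread.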