2019-03-02 21:46:04 +00:00
const std = @import("../std.zig");
2018-07-10 03:22:44 +01:00
const builtin = @import("builtin");
const assert = std.debug.assert;
2019-02-08 23:18:47 +00:00
const testing = std.testing;
2018-07-10 03:22:44 +01:00
const Loop = std.event.Loop;
2018-08-02 22:04:17 +01:00
/// Many-producer, many-consumer, thread-safe channel whose buffer size is chosen at runtime.
2018-07-10 03:22:44 +01:00
/// When the buffer is empty, consumers suspend and are resumed by producers.
/// When the buffer is full, producers suspend and are resumed by consumers.
pub fn Channel(comptime T: type) type {
2018-11-13 13:08:37 +00:00
return struct {
2018-07-10 03:22:44 +01:00
// Event loop; `create` takes the channel's memory from `loop.allocator`.
loop: *Loop,
2018-07-12 00:38:01 +01:00
// Queue of getter nodes (built by `get` / `getOrNull`); `dispatch` pops from it.
getters: std.atomic.Queue(GetNode),
2018-08-02 22:04:17 +01:00
// Queue of pointers to `getters` nodes that belong to `getOrNull` calls.
// `dispatch` drains it at the end of every pass.
or_null_queue: std.atomic.Queue(*std.atomic.Queue(GetNode).Node),
2018-07-12 00:38:01 +01:00
// Queue of putter nodes (built by `put`); `dispatch` pops from it.
putters: std.atomic.Queue(PutNode),
2018-07-10 03:22:44 +01:00
// Getter / putter counters; every access in this file goes through @atomicRmw.
get_count: usize,
put_count: usize,
// Flags exchanged atomically by `dispatch`; the code shown only ever stores 0 or 1.
dispatch_lock: u8, // TODO make this a bool
need_dispatch: u8, // TODO make this a bool
// simple fixed size ring buffer
// buffer_nodes: storage of `capacity` elements allocated by `create`.
// buffer_index: slot `dispatch` writes the next buffered item to.
// buffer_len: number of items currently held in the buffer.
buffer_nodes: []T,
buffer_index: usize,
buffer_len: usize,
2018-09-13 21:34:33 +01:00
const SelfChannel = @This();
2018-11-13 13:08:37 +00:00
// One pending receive request.
const GetNode = struct {
2018-07-10 03:22:44 +01:00
// Tick node initialised from the requesting frame (`@frame()`).
tick_node: *Loop.NextTickNode,
2018-08-02 22:04:17 +01:00
data: Data,
2018-11-13 13:08:37 +00:00
// Distinguishes a request made by `get` (Normal) from one made by `getOrNull` (OrNull).
const Data = union(enum) {
2018-08-02 22:04:17 +01:00
Normal: Normal,
OrNull: OrNull,
2018-11-13 13:08:37 +00:00
const Normal = struct {
2018-08-02 22:04:17 +01:00
// Where `dispatch` stores the received value.
ptr: *T,
2018-11-13 13:08:37 +00:00
const OrNull = struct {
2018-08-02 22:04:17 +01:00
// Where `dispatch` stores the received value; `getOrNull` initialises the target to null.
ptr: *?T,
// This request's entry in `or_null_queue`; `dispatch` removes it when the request is served.
or_null: *std.atomic.Queue(*std.atomic.Queue(GetNode).Node).Node,
2018-07-10 03:22:44 +01:00
2018-11-13 13:08:37 +00:00
// One pending send request.
const PutNode = struct {
2018-07-10 03:22:44 +01:00
// The value being sent.
data: T,
// Tick node initialised from the sending frame (`@frame()`).
tick_node: *Loop.NextTickNode,
/// Allocates a channel and its `capacity`-element buffer from `loop.allocator`.
/// Call `destroy` when done.
///
/// Returns any error reported by the allocator. If allocating the channel struct
/// fails, the buffer that was allocated first is freed again before returning.
pub fn create(loop: *Loop, capacity: usize) !*SelfChannel {
    const buffer_nodes = try loop.allocator.alloc(T, capacity);
    errdefer loop.allocator.free(buffer_nodes);

    const self = try loop.allocator.create(SelfChannel);
    // Nothing after this point can fail, so no errdefer is needed for `self`
    // (one registered here could never run).
    self.* = SelfChannel{
        .loop = loop,
        .buffer_len = 0,
        .buffer_nodes = buffer_nodes,
        .buffer_index = 0,
        .dispatch_lock = 0,
        .need_dispatch = 0,
        .getters = std.atomic.Queue(GetNode).init(),
        .putters = std.atomic.Queue(PutNode).init(),
        .or_null_queue = std.atomic.Queue(*std.atomic.Queue(GetNode).Node).init(),
        .get_count = 0,
        .put_count = 0,
    };
    return self;
}
/// Must be called only when all calls to put and get have suspended and no more calls occur.
/// Resumes every frame still queued in `getters` and then every frame still queued in `putters`.
// NOTE(review): no release of `buffer_nodes` or of the channel itself is visible in this
// listing — confirm that the memory taken by `create` is given back to `loop.allocator`.
pub fn destroy(self: *SelfChannel) void {
while (self.getters.get()) |get_node| {
2019-08-12 00:53:10 +01:00
// tick_node.data is the frame captured with @frame() when the node was built.
resume get_node.data.tick_node.data;
2018-07-10 03:22:44 +01:00
while (self.putters.get()) |put_node| {
2019-08-12 00:53:10 +01:00
resume put_node.data.tick_node.data;
2018-07-10 03:22:44 +01:00
2019-08-12 00:53:10 +01:00
/// Puts a data item in the channel. The function returns when the value has been added to the
2018-07-10 03:22:44 +01:00
/// buffer, or in the case of a zero size buffer, when the item has been retrieved by a getter.
2019-08-12 00:53:10 +01:00
/// It also returns when the channel is destroyed.
pub fn put(self: *SelfChannel, data: T) void {
2019-08-08 21:41:38 +01:00
// Tick node wrapping this call's own frame.
var my_tick_node = Loop.NextTickNode.init(@frame());
2018-11-13 13:08:37 +00:00
// Queue node pairing the value with the tick node above; it lives in this frame.
var queue_node = std.atomic.Queue(PutNode).Node.init(PutNode{
2018-08-02 22:04:17 +01:00
.tick_node = &my_tick_node,
.data = data,
// TODO test canceling a put()
// Error-path cleanup: take back this call's put_count contribution and unlink the node.
// NOTE(review): `put` returns plain `void`; it is unclear from here when this errdefer can run — confirm.
errdefer {
2019-08-12 03:35:12 +01:00
_ = @atomicRmw(usize, &self.put_count, .Sub, 1, .SeqCst);
2018-08-02 22:04:17 +01:00
// remove() returning false means the node was no longer in `putters`.
const need_dispatch = !self.putters.remove(&queue_node);
if (need_dispatch) {
// oops we made the put_count incorrect for a period of time. fix by dispatching.
2019-08-12 03:35:12 +01:00
_ = @atomicRmw(usize, &self.put_count, .Add, 1, .SeqCst);
2018-08-02 22:04:17 +01:00
2018-07-29 09:12:33 +01:00
suspend {
2018-07-10 03:22:44 +01:00
// NOTE(review): only the put_count increment is visible inside this suspend block; the
// enqueue of `queue_node` onto `putters` and the call to `dispatch` are not shown in this
// listing — confirm against the full file.
2019-08-12 03:35:12 +01:00
_ = @atomicRmw(usize, &self.put_count, .Add, 1, .SeqCst);
2018-07-10 03:22:44 +01:00
2018-07-25 02:28:54 +01:00
2018-07-10 03:22:44 +01:00
2019-08-12 00:53:10 +01:00
/// Await this function to get an item from the channel. If the buffer is empty, the frame will
2018-07-10 03:22:44 +01:00
/// complete when the next item is put in the channel.
pub async fn get(self: *SelfChannel) T {
2019-08-12 03:35:12 +01:00
// TODO https://github.com/ziglang/zig/issues/2765
2018-07-10 03:22:44 +01:00
// Filled in through the pointer stored in the queue node below before this function returns it.
var result: T = undefined;
2019-08-08 21:41:38 +01:00
// Tick node wrapping this call's own frame.
var my_tick_node = Loop.NextTickNode.init(@frame());
2018-11-13 13:08:37 +00:00
// Getter node of the `Normal` kind: `dispatch` writes the value to `&result`.
var queue_node = std.atomic.Queue(GetNode).Node.init(GetNode{
2018-08-02 22:04:17 +01:00
.tick_node = &my_tick_node,
2018-11-13 13:08:37 +00:00
.data = GetNode.Data{
.Normal = GetNode.Normal{ .ptr = &result },
2018-08-02 22:04:17 +01:00
// TODO test canceling a get()
// Error-path cleanup: take back this call's get_count contribution and unlink the node.
// NOTE(review): `get` returns plain `T`; it is unclear from here when this errdefer can run — confirm.
errdefer {
2019-08-12 03:35:12 +01:00
_ = @atomicRmw(usize, &self.get_count, .Sub, 1, .SeqCst);
2018-08-02 22:04:17 +01:00
// remove() returning false means the node was no longer in `getters`.
const need_dispatch = !self.getters.remove(&queue_node);
if (need_dispatch) {
// oops we made the get_count incorrect for a period of time. fix by dispatching.
2019-08-12 03:35:12 +01:00
_ = @atomicRmw(usize, &self.get_count, .Add, 1, .SeqCst);
2018-08-02 22:04:17 +01:00
2018-08-02 22:29:31 +01:00
suspend {
2018-07-10 03:22:44 +01:00
// NOTE(review): only the get_count increment is visible inside this suspend block; the
// enqueue of `queue_node` onto `getters` and the call to `dispatch` are not shown in this
// listing — confirm against the full file.
2019-08-12 03:35:12 +01:00
_ = @atomicRmw(usize, &self.get_count, .Add, 1, .SeqCst);
2018-07-10 03:22:44 +01:00
2018-07-25 02:28:54 +01:00
2018-07-10 03:22:44 +01:00
return result;
2018-08-02 22:04:17 +01:00
//pub async fn select(comptime EnumUnion: type, channels: ...) EnumUnion {
// assert(@memberCount(EnumUnion) == channels.len); // enum union and channels mismatch
// assert(channels.len != 0); // enum unions cannot have 0 fields
// if (channels.len == 1) {
// const result = await (async channels[0].get() catch unreachable);
// return @unionInit(EnumUnion, @memberName(EnumUnion, 0), result);
// }
/// Await this function to get an item from the channel. If the buffer is empty and there are no
/// puts waiting, this returns null.
/// Await is necessary for locking purposes. The function will be resumed after checking the channel
/// for data and will not wait for data to be available.
pub async fn getOrNull(self: *SelfChannel) ?T {
// TODO integrate this function with named return values
// so we can get rid of this extra result copy
// Stays null unless `dispatch` writes a value through the pointer stored below.
var result: ?T = null;
2019-08-08 21:41:38 +01:00
// Tick node wrapping this call's own frame.
var my_tick_node = Loop.NextTickNode.init(@frame());
2018-08-02 22:04:17 +01:00
// Entry for `or_null_queue`; its payload is set once `queue_node` exists (the two nodes point at each other).
var or_null_node = std.atomic.Queue(*std.atomic.Queue(GetNode).Node).Node.init(undefined);
2018-11-13 13:08:37 +00:00
// Getter node of the `OrNull` kind.
var queue_node = std.atomic.Queue(GetNode).Node.init(GetNode{
2018-08-02 22:04:17 +01:00
.tick_node = &my_tick_node,
2018-11-13 13:08:37 +00:00
.data = GetNode.Data{
.OrNull = GetNode.OrNull{
2018-08-02 22:04:17 +01:00
.ptr = &result,
.or_null = &or_null_node,
// Complete the back-reference from the or-null entry to the getter node.
or_null_node.data = &queue_node;
// TODO test canceling getOrNull
// Error-path cleanup: unlink both nodes and take back this call's get_count contribution.
// NOTE(review): `getOrNull` returns plain `?T`; it is unclear from here when this errdefer can run — confirm.
errdefer {
_ = self.or_null_queue.remove(&or_null_node);
2019-08-12 03:35:12 +01:00
_ = @atomicRmw(usize, &self.get_count, .Sub, 1, .SeqCst);
2018-08-02 22:04:17 +01:00
// remove() returning false means the node was no longer in `getters`.
const need_dispatch = !self.getters.remove(&queue_node);
if (need_dispatch) {
// oops we made the get_count incorrect for a period of time. fix by dispatching.
2019-08-12 03:35:12 +01:00
_ = @atomicRmw(usize, &self.get_count, .Add, 1, .SeqCst);
2018-08-02 22:04:17 +01:00
2018-08-02 22:29:31 +01:00
suspend {
2018-08-02 22:04:17 +01:00
// NOTE(review): only the get_count increment is visible inside this suspend block; the
// enqueues onto `getters` / `or_null_queue` and the call to `dispatch` are not shown in
// this listing — confirm against the full file.
2019-08-12 03:35:12 +01:00
_ = @atomicRmw(usize, &self.get_count, .Add, 1, .SeqCst);
2018-08-02 22:04:17 +01:00
return result;
2018-08-01 21:26:37 +01:00
2018-07-25 02:28:54 +01:00
// Moves values between the ring buffer, the queued getters and the queued putters.
// At most one caller at a time runs the transfer loop: a caller that finds `dispatch_lock`
// already taken returns immediately, having left `need_dispatch` set so that the lock holder
// performs another pass on its behalf.
// NOTE(review): no statement that reschedules or resumes the served getter/putter frames is
// visible in this listing — confirm against the full file.
fn dispatch(self: *SelfChannel) void {
2018-07-10 03:22:44 +01:00
// set the "need dispatch" flag
2019-08-12 03:35:12 +01:00
_ = @atomicRmw(u8, &self.need_dispatch, .Xchg, 1, .SeqCst);
2018-07-10 03:22:44 +01:00
lock: while (true) {
// set the lock flag
2019-08-12 03:35:12 +01:00
const prev_lock = @atomicRmw(u8, &self.dispatch_lock, .Xchg, 1, .SeqCst);
2018-07-10 03:22:44 +01:00
// Somebody else holds the lock; they will observe the need_dispatch flag set above.
if (prev_lock != 0) return;
// clear the need_dispatch flag since we're about to do it
2019-08-12 03:35:12 +01:00
_ = @atomicRmw(u8, &self.need_dispatch, .Xchg, 0, .SeqCst);
2018-07-10 03:22:44 +01:00
while (true) {
one_dispatch: {
// later we correct these extra subtractions
// @atomicRmw yields the value held before the operation, so these locals are the
// counts as they were before this probing decrement.
2019-08-12 03:35:12 +01:00
var get_count = @atomicRmw(usize, &self.get_count, .Sub, 1, .SeqCst);
var put_count = @atomicRmw(usize, &self.put_count, .Sub, 1, .SeqCst);
2018-07-10 03:22:44 +01:00
// transfer self.buffer to self.getters
while (self.buffer_len != 0) {
if (get_count == 0) break :one_dispatch;
const get_node = &self.getters.get().?.data;
2018-08-02 22:04:17 +01:00
// The oldest buffered item sits `buffer_len` slots behind `buffer_index`.
// NOTE(review): the index is used without reduction modulo `buffer_nodes.len`, and
// `buffer_index` only ever grows in the code shown — this looks like it can index past
// the end once `buffer_index` reaches `buffer_nodes.len`; confirm.
switch (get_node.data) {
GetNode.Data.Normal => |info| {
info.ptr.* = self.buffer_nodes[self.buffer_index -% self.buffer_len];
GetNode.Data.OrNull => |info| {
// This getOrNull request is being served, so drop its or-null entry first.
_ = self.or_null_queue.remove(info.or_null);
info.ptr.* = self.buffer_nodes[self.buffer_index -% self.buffer_len];
2018-07-10 03:22:44 +01:00
self.buffer_len -= 1;
2019-08-12 03:35:12 +01:00
get_count = @atomicRmw(usize, &self.get_count, .Sub, 1, .SeqCst);
2018-07-10 03:22:44 +01:00
// direct transfer self.putters to self.getters
while (get_count != 0 and put_count != 0) {
const get_node = &self.getters.get().?.data;
const put_node = &self.putters.get().?.data;
2018-08-02 22:04:17 +01:00
switch (get_node.data) {
GetNode.Data.Normal => |info| {
info.ptr.* = put_node.data;
GetNode.Data.OrNull => |info| {
_ = self.or_null_queue.remove(info.or_null);
info.ptr.* = put_node.data;
2018-07-10 03:22:44 +01:00
2019-08-12 03:35:12 +01:00
get_count = @atomicRmw(usize, &self.get_count, .Sub, 1, .SeqCst);
put_count = @atomicRmw(usize, &self.put_count, .Sub, 1, .SeqCst);
2018-07-10 03:22:44 +01:00
// transfer self.putters to self.buffer
while (self.buffer_len != self.buffer_nodes.len and put_count != 0) {
const put_node = &self.putters.get().?.data;
// NOTE(review): same unwrapped index as above — confirm bounds for capacity > 0.
self.buffer_nodes[self.buffer_index] = put_node.data;
self.buffer_index +%= 1;
self.buffer_len += 1;
2019-08-12 03:35:12 +01:00
put_count = @atomicRmw(usize, &self.put_count, .Sub, 1, .SeqCst);
2018-07-10 03:22:44 +01:00
// undo the extra subtractions
2019-08-12 03:35:12 +01:00
_ = @atomicRmw(usize, &self.get_count, .Add, 1, .SeqCst);
_ = @atomicRmw(usize, &self.put_count, .Add, 1, .SeqCst);
2018-07-10 03:22:44 +01:00
2018-08-02 22:04:17 +01:00
// All the "get or null" functions should resume now.
// Entries still queued here were not served above; pull their getter nodes out of
// `getters` and lower get_count by the number actually removed.
var remove_count: usize = 0;
while (self.or_null_queue.get()) |or_null_node| {
remove_count += @boolToInt(self.getters.remove(or_null_node.data));
if (remove_count != 0) {
2019-08-12 03:35:12 +01:00
_ = @atomicRmw(usize, &self.get_count, .Sub, remove_count, .SeqCst);
2018-08-02 22:04:17 +01:00
2018-07-10 03:22:44 +01:00
// clear need-dispatch flag
2019-08-12 03:35:12 +01:00
const need_dispatch = @atomicRmw(u8, &self.need_dispatch, .Xchg, 0, .SeqCst);
2018-07-10 03:22:44 +01:00
// Another caller asked for a pass while we held the lock: go around again without unlocking.
if (need_dispatch != 0) continue;
2019-08-12 03:35:12 +01:00
const my_lock = @atomicRmw(u8, &self.dispatch_lock, .Xchg, 0, .SeqCst);
2018-07-10 03:22:44 +01:00
assert(my_lock != 0);
// we have to check again now that we unlocked
// A request may have arrived between the flag check and the unlock; if so, retake the lock.
2019-08-12 03:35:12 +01:00
if (@atomicLoad(u8, &self.need_dispatch, .SeqCst) != 0) continue :lock;
2018-07-10 03:22:44 +01:00
// Exercises a zero-capacity Channel(i32) with one getter frame and one putter frame on a
// single-threaded event loop.
test "std.event.Channel" {
2019-02-01 22:49:29 +00:00
// https://github.com/ziglang/zig/issues/1908
if (builtin.single_threaded) return error.SkipZigTest;
2019-09-19 18:45:54 +01:00
// https://github.com/ziglang/zig/issues/3251
2019-10-24 06:06:03 +01:00
if (builtin.os == .freebsd) return error.SkipZigTest;
2019-02-01 22:49:29 +00:00
2018-07-10 03:22:44 +01:00
var loop: Loop = undefined;
// TODO make a multi threaded test
2019-08-13 17:44:30 +01:00
try loop.initSingleThreaded(std.heap.direct_allocator);
2018-07-10 03:22:44 +01:00
defer loop.deinit();
// Capacity 0: there are no buffer slots, so values can only go directly from a putter to a getter.
const channel = try Channel(i32).create(&loop, 0);
defer channel.destroy();
2019-08-08 21:41:38 +01:00
// Start both sides as async frames.
// NOTE(review): no statement that runs the loop or awaits these frames is visible in this
// listing — confirm against the full file.
const handle = async testChannelGetter(&loop, channel);
const putter = async testChannelPutter(channel);
2018-07-10 03:22:44 +01:00
// Getter side of the test: expects 1234 and 4567 from two `get` calls, then null from
// `getOrNull`, then 4444 from a second `getOrNull` issued after starting `testPut(channel, 4444)`.
async fn testChannelGetter(loop: *Loop, channel: *Channel(i32)) void {
2019-08-12 03:35:12 +01:00
const value1 = channel.get();
2019-02-08 23:18:47 +00:00
testing.expect(value1 == 1234);
2018-07-10 03:22:44 +01:00
2019-08-12 03:35:12 +01:00
const value2 = channel.get();
2019-02-08 23:18:47 +00:00
testing.expect(value2 == 4567);
2018-08-02 22:04:17 +01:00
2019-08-12 03:35:12 +01:00
// No put has been started for a third value at this point, so null is expected.
const value3 = channel.getOrNull();
2019-02-08 23:18:47 +00:00
testing.expect(value3 == null);
2018-08-02 22:04:17 +01:00
2019-08-16 16:27:29 +01:00
// Start a put first so that the next getOrNull has something to receive.
var last_put = async testPut(channel, 4444);
2019-08-08 21:41:38 +01:00
const value4 = channel.getOrNull();
2019-02-08 23:18:47 +00:00
testing.expect(value4.? == 4444);
2018-08-02 22:04:17 +01:00
await last_put;
2018-07-10 03:22:44 +01:00
// Putter side of the test.
// NOTE(review): the body is not visible in this listing; `testChannelGetter` expects to
// receive 1234 and then 4567, so presumably those values are put here — confirm.
async fn testChannelPutter(channel: *Channel(i32)) void {
2019-08-08 21:41:38 +01:00
2018-07-10 03:22:44 +01:00
2018-08-02 22:04:17 +01:00
// Helper started by `testChannelGetter` with the value 4444.
// NOTE(review): the body is not visible in this listing; presumably it puts `value` on
// `channel` — confirm.
async fn testPut(channel: *Channel(i32), value: i32) void {
2019-08-08 21:41:38 +01:00
2018-08-02 22:04:17 +01:00