From ef208fee3cf7eb25ae08e1a896eba58b91e56d50 Mon Sep 17 00:00:00 2001 From: kprotty Date: Sat, 23 Nov 2019 14:04:31 -0600 Subject: [PATCH] Definition fixups & ResetEvent test cases --- lib/std/c.zig | 2 +- lib/std/reset_event.zig | 410 ++++++++++++++++++++++++---------------- 2 files changed, 249 insertions(+), 163 deletions(-) diff --git a/lib/std/c.zig b/lib/std/c.zig index 9e43359852..9e70ff988d 100644 --- a/lib/std/c.zig +++ b/lib/std/c.zig @@ -220,7 +220,7 @@ pub extern "c" fn pthread_mutex_destroy(mutex: *pthread_mutex_t) c_int; pub const PTHREAD_COND_INITIALIZER = pthread_cond_t{}; pub extern "c" fn pthread_cond_wait(noalias cond: *pthread_cond_t, noalias mutex: *pthread_mutex_t) c_int; -pub extern "c" fn pthread_cond_timedwait(noalias: cond: *pthread_cond_t, noalias: mutex: *pthread_mutex_t, noalias abstime: *const timespec) c_int; +pub extern "c" fn pthread_cond_timedwait(noalias cond: *pthread_cond_t, noalias mutex: *pthread_mutex_t, noalias abstime: *const timespec) c_int; pub extern "c" fn pthread_cond_signal(cond: *pthread_cond_t) c_int; pub extern "c" fn pthread_cond_destroy(cond: *pthread_cond_t) c_int; diff --git a/lib/std/reset_event.zig b/lib/std/reset_event.zig index 35eab6ac13..59399d5e78 100644 --- a/lib/std/reset_event.zig +++ b/lib/std/reset_event.zig @@ -4,9 +4,10 @@ const testing = std.testing; const assert = std.debug.assert; const Backoff = std.SpinLock.Backoff; const c = std.c; +const os = std.os; const time = std.time; -const linux = std.os.linux; -const windows = std.os.windows; +const linux = os.linux; +const windows = os.windows; /// A resource object which supports blocking until signaled. /// Once finished, the `deinit()` method should be called for correctness. @@ -23,15 +24,15 @@ pub const ResetEvent = struct { } /// Returns whether or not the event is currenetly set - pub fn isSet(self: *const ResetEvent) bool { + pub fn isSet(self: *ResetEvent) bool { return self.os_event.isSet(); } /// Sets the event if not already set and /// wakes up AT LEAST one thread waiting the event. /// Returns whether or not a thread was woken up. - pub fn set(self: *ResetEvent) bool { - return self.os_event.set(); + pub fn set(self: *ResetEvent, auto_reset: bool) bool { + return self.os_event.set(auto_reset); } /// Resets the event to its original, unset state. @@ -73,15 +74,15 @@ const DebugEvent = struct { self.* = undefined; } - pub fn isSet(self: *const DebugEvent) bool { + pub fn isSet(self: *DebugEvent) bool { if (!std.debug.runtime_safety) return true; return self.is_set; } - pub fn set(self: *DebugEvent) bool { + pub fn set(self: *DebugEvent, auto_reset: bool) bool { if (std.debug.runtime_safety) - self.is_set = true; + self.is_set = !auto_reset; return false; } @@ -100,102 +101,87 @@ const DebugEvent = struct { } }; -fn EventState(comptime TagType: type) type { - return enum(TagType) { - Empty, - Waiting, - Signaled, +fn AtomicEvent(comptime FutexImpl: type) type { + return struct { + state: u32, + + const IS_SET: u32 = 1 << 0; + const WAIT_MASK = ~IS_SET; + + pub const Self = @This(); + pub const Futex = FutexImpl; + + pub fn init() Self { + return Self{ .state = 0 }; + } + + pub fn deinit(self: *Self) void { + self.* = undefined; + } + + pub fn isSet(self: *const Self) bool { + const state = @atomicLoad(u32, &self.state, .Acquire); + return (state & IS_SET) != 0; + } + + pub fn reset(self: *Self) bool { + const old_state = @atomicRmw(u32, &self.state, .Xchg, 0, .Monotonic); + return (old_state & IS_SET) != 0; + } + + pub fn set(self: *Self, auto_reset: bool) bool { + const new_state = if (auto_reset) 0 else IS_SET; + const old_state = @atomicRmw(u32, &self.state, .Xchg, new_state, .Release); + if ((old_state & WAIT_MASK) == 0) { + return false; + } + + Futex.wake(&self.state); + return true; + } + + pub fn wait(self: *Self, timeout: ?u64) ResetEvent.WaitError!bool { + var dummy_value: u32 = undefined; + const wait_token = @truncate(u32, @ptrToInt(&dummy_value)); + + var state = @atomicLoad(u32, &self.state, .Monotonic); + while (true) { + if ((state & IS_SET) != 0) + return false; + state = @cmpxchgWeak(u32, &self.state, state, wait_token, .Acquire, .Monotonic) orelse break; + } + + try Futex.wait(&self.state, wait_token, timeout); + return true; + } }; } -const SpinEvent = struct { - state: State, +const SpinEvent = AtomicEvent(struct { + fn wake(ptr: *const u32) void {} - const State = EventState(u8); - - pub fn init() SpinEvent { - return SpinEvent{ .state = .Empty }; - } - - pub fn deinit(self: *SpinEvent) void { - self.* = undefined; - } - - pub fn isSet(self: *const SpinEvent) bool { - return @atomicLoad(State, &self.state, .Acquire) == .Signaled; - } - - pub fn set(self: *SpinEvent) bool { - return @atomicRmw(State, &self.state, .Xchg, .Signaled, .Release) == .Waiting; - } - - pub fn reset(self: *SpinEvent) bool { - return @atomicRmw(State, &self.state, .Xchg, .Empty, .Monotonic) == .Signaled; - } - - pub fn wait(self: *SpinEvent, timeout: ?u64) ResetEvent.WaitError!bool { - var state = @atomicLoad(State, &self.state, .Monotonic); - while (true) { - switch (state) { - .Empty => state = @cmpxchgWeak(State, &self.state, state, .Waiting, .Acquire, .Monotonic) orelse break, - .Waiting => break, - .Signaled => return false, - } - } - - // TODO: handle case for time.Timer.start() fails + fn wait(ptr: *const u32, expected: u32, timeout: ?u64) ResetEvent.WaitError!void { + // TODO: handle platforms where time.Timer.start() fails var spin = Backoff.init(); var timer = if (timeout == null) null else time.Timer.start() catch unreachable; - while (@atomicLoad(State, &self.state, .Monotonic) == .Waiting) { + while (@atomicLoad(u32, ptr, .Acquire) == expected) { spin.yield(); if (timeout) |timeout_ns| { if (timer.?.read() > timeout_ns) return ResetEvent.WaitError.TimedOut; } } - return true; } -}; +}); -const LinuxEvent = struct { - state: State, - - const State = EventState(i32); - - pub fn init() LinuxEvent { - return LinuxEvent{ .state = .Empty }; - } - - pub fn deinit(self: *LinuxEvent) void { - self.* = undefined; - } - - pub fn isSet(self: *const LinuxEvent) bool { - return @atomicLoad(State, &self.state, .Acquire) == .Signaled; - } - - pub fn set(self: *LinuxEvent) bool { - if (@atomicRmw(State, &self.state, .Xchg, .Signaled, .Release) != .Waiting) - return false; - const rc = linux.futex_wake(@ptrCast(*const i32, &self.state), linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, 1); +const LinuxEvent = AtomicEvent(struct { + fn wake(ptr: *const u32) void { + const key = @ptrCast(*const i32, ptr); + const rc = linux.futex_wake(key, linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, 1); assert(linux.getErrno(rc) == 0); - return true; } - pub fn reset(self: *LinuxEvent) bool { - return @atomicRmw(State, &self.state, .Xchg, .Empty, .Monotonic) == .Signaled; - } - - pub fn wait(self: *LinuxEvent, timeout: ?u64) ResetEvent.WaitError!bool { - var state = @atomicLoad(State, &self.state, .Monotonic); - while (true) { - switch (state) { - .Empty => state = @cmpxchgWeak(State, &self.state, .Empty, .Waiting, .Acquire, .Monotonic) orelse break, - .Waiting => break, - .Signaled => return false, - } - } - + fn wait(ptr: *const u32, expected: u32, timeout: ?u64) ResetEvent.WaitError!void { var ts: linux.timespec = undefined; var ts_ptr: ?*linux.timespec = null; if (timeout) |timeout_ns| { @@ -204,28 +190,94 @@ const LinuxEvent = struct { ts.tv_nsec = @intCast(isize, timeout_ns % time.ns_per_s); } - while (@atomicLoad(State, &self.state, .Monotonic) == .Waiting) { - const rc = linux.futex_wait(@ptrCast(*const i32, &self.state), linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, @enumToInt(State.Waiting), ts_ptr); + const key = @ptrCast(*const i32, ptr); + const key_expect = @bitCast(i32, expected); + while (@atomicLoad(i32, key, .Acquire) == key_expect) { + const rc = linux.futex_wait(key, linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, key_expect, ts_ptr); switch (linux.getErrno(rc)) { - 0, linux.EINTR => continue, - linux.EAGAIN => break, + 0, linux.EAGAIN => break, + linux.EINTR => continue, linux.ETIMEDOUT => return ResetEvent.WaitError.TimedOut, else => unreachable, } } } -}; +}); + +const WindowsEvent = AtomicEvent(struct { + fn wake(ptr: *const u32) void { + if (getEventHandle()) |handle| { + const key = @ptrCast(*const c_void, ptr); + const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null); + assert(rc == 0); + } + } + + fn wait(ptr: *const u32, expected: u32, timeout: ?u64) ResetEvent.WaitError!void { + // fallback to spinlock if NT Keyed Events arent available + const handle = getEventHandle() orelse { + return SpinEvent.Futex.wait(ptr, expected, timeout); + }; + + var timeout_ptr: ?*windows.LARGE_INTEGER = null; + var timeout_value: windows.LARGE_INTEGER = undefined; + if (timeout) |timeout_ns| { + timeout_ptr = &timeout_value; + timeout_value = @intCast(windows.LARGE_INTEGER, @divFloor(timeout_ns, time.millisecond)); + } + + const key = @ptrCast(*const c_void, ptr); + while (@atomicLoad(u32, ptr, .Acquire) == expected) { + const rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, timeout_ptr); + assert(rc == 0); + } + } + + var keyed_state = State.Uninitialized; + var keyed_handle: ?windows.HANDLE = null; + + const State = enum(u8) { + Uninitialized, + Intializing, + Initialized, + }; + + fn getEventHandle() ?windows.HANDLE { + var spin = Backoff.init(); + var state = @atomicLoad(State, &keyed_state, .Monotonic); + + while (true) { + switch (state) { + .Initialized => { + return keyed_handle; + }, + .Intializing => { + spin.yield(); + state = @atomicLoad(State, &keyed_state, .Acquire); + }, + .Uninitialized => state = @cmpxchgWeak(State, &keyed_state, state, .Intializing, .Acquire, .Monotonic) orelse { + var handle: windows.HANDLE = undefined; + const access_mask = windows.GENERIC_READ | windows.GENERIC_WRITE; + if (windows.ntdll.NtCreateKeyedEvent(&handle, access_mask, null, 0) == 0) + keyed_handle = handle; + @atomicStore(State, &keyed_state, .Initialized, .Release); + return keyed_handle; + }, + } + } + } +}); const PosixEvent = struct { - state: State, + state: u32, cond: c.pthread_cond_t, mutex: c.pthread_mutex_t, - const State = EventState(u8); + const IS_SET: u32 = 1; pub fn init() PosixEvent { return PosixEvent{ - .state = .Empty, + .state = .0, .cond = c.PTHREAD_COND_INITIALIZER, .mutex = c.PTHREAD_MUTEX_INITIALIZER, }; @@ -234,107 +286,141 @@ const PosixEvent = struct { pub fn deinit(self: *PosixEvent) void { // On dragonfly, the destroy functions return EINVAL if they were initialized statically. const retm = c.pthread_mutex_destroy(&self.mutex); - assert(retm == 0 or retm == (if (builtin.os == .dragonfly) std.os.EINVAL else 0)); + assert(retm == 0 or retm == (if (builtin.os == .dragonfly) os.EINVAL else 0)); const retc = c.pthread_cond_destroy(&self.cond); - assert(retc == 0 or retc == (if (builtin.os == .dragonfly) std.os.EINVAL else 0)); - self.* = undefined; + assert(retc == 0 or retc == (if (builtin.os == .dragonfly) os.EINVAL else 0)); } - pub fn isSet(self: *const PosixEvent) bool { + pub fn isSet(self: *PosixEvent) bool { assert(c.pthread_mutex_lock(&self.mutex) == 0); defer assert(c.pthread_mutex_unlock(&self.mutex) == 0); - return self.state == .Signaled; - } - - pub fn set(self: *PosixEvent) bool { - assert(c.pthread_mutex_lock(&self.mutex) == 0); - defer assert(c.pthread_mutex_unlock(&self.mutex) == 0); - - const woken = self.state == .Waiting; - self.state = .Signaled; - return woken; + return self.state == IS_SET; } pub fn reset(self: *PosixEvent) bool { assert(c.pthread_mutex_lock(&self.mutex) == 0); defer assert(c.pthread_mutex_unlock(&self.mutex) == 0); - const was_set = self.state == .Signaled; - self.state = .Empty; + const was_set = self.state == IS_SET; + self.state = 0; return was_set; } + pub fn set(self: *PosixEvent, auto_reset: bool) bool { + assert(c.pthread_mutex_lock(&self.mutex) == 0); + defer assert(c.pthread_mutex_unlock(&self.mutex) == 0); + + const had_waiter = self.state > IS_SET; + self.state = if (auto_reset) 0 else IS_SET; + if (had_waiter) { + assert(c.pthread_cond_signal(&self.cond) == 0); + } + return had_waiter; + } + pub fn wait(self: *PosixEvent, timeout: ?u64) ResetEvent.WaitError!bool { assert(c.pthread_mutex_lock(&self.mutex) == 0); defer assert(c.pthread_mutex_unlock(&self.mutex) == 0); - if (self.state == .Signaled) + if (self.state == IS_SET) return false; - var ts: std.os.timespec = undefined; + var ts: os.timespec = undefined; var ts_ptr = &ts; if (timeout) |timeout_ns| { - var tv: std.os.timeval = undefined; + var tv: os.timeval = undefined; assert(c.gettimeofday(&tv, null) == 0); - ts.tv_sec = @intCast(isize, tv.tv_sec + (timeout_ns / time.ns_per_s)); - ts.tv_nsec = @intCast(isize, (tv.tv_usec * time.microsecond) + (timeout_ns % time.ns_per_s)); + ts.tv_sec = tv.tv_sec + @intCast(isize, timeout_ns / time.ns_per_s); + ts.tv_nsec = (tv.tv_usec * time.microsecond) + @intCast(isize, timeout_ns % time.ns_per_s); } - self.state = .Waiting; - while (self.state == .Waiting) { + var dummy_value: u32 = undefined; + var wait_token = @truncate(u32, @ptrToInt(&dummy_value)); + self.state = wait_token; + + while (self.state == wait_token) { const rc = switch (timeout == null) { true => c.pthread_cond_wait(&self.cond, &self.mutex), else => c.pthread_cond_timedwait(&self.cond, &self.mutex, ts_ptr), }; - assert(rc == 0); - } - } -}; - -const WindowsEvent = struct { - state: State, - - const State = EventState(u32); - - pub fn init() WindowsEvent { - return WindowsEvent{ .state = .Empty }; - } - - pub fn deinit(self: *WindowsEvent) void { - self.* = undefined; - } - - pub fn isSet(self: *const WindowsEvent) bool { - return @atomicLoad(State, &self.state, .Acquire) == .Signaled; - } - - pub fn set(self: *WindowsEvent) bool { - if (@atomicRmw(State, &self.state, .Xchg, .Signaled, .Release) != .Waiting) - return false; - - if (getEventHandle()) |handle| { - const key = @ptrCast(*const c_void, &self.state); - const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null); - assert(rc == 0); + // TODO: rc appears to be the positive error code making os.errno() always return 0 on linux + switch (std.math.max(@as(c_int, os.errno(rc)), rc)) { + 0 => {}, + os.ETIMEDOUT => return ResetEvent.WaitError.TimedOut, + os.EINVAL => unreachable, + os.EPERM => unreachable, + else => unreachable, + } } return true; } +}; - pub fn reset(self: *WindowsEvent) bool { - return @atomicRmw(State, &self.state, .Xchg, .Empty, .Monotonic) == .Signaled; - } +test "std.ResetEvent" { + // TODO + if (builtin.single_threaded) + return error.SkipZigTest; - pub fn wait(self: *WindowsEvent, timeout: ?u64) ResetEvent.WaitError!bool { - var state = @atomicLoad(State, &self.state, .Monotonic); - while (true) { - switch (state) { - .Empty => state = @cmpxchgWeak(State, &self.state, .Empty, .Waiting, .Acquire, .Monotonic) orelse break, - .Waiting => break, - .Signaled => return false, - } + var event = ResetEvent.init(); + defer event.deinit(); + + // test event setting + testing.expect(event.isSet() == false); + testing.expect(event.set(false) == false); + testing.expect(event.isSet() == true); + + // test event resetting + testing.expect(event.reset() == true); + testing.expect(event.isSet() == false); + testing.expect(event.reset() == false); + + // test waiting timeout + const delay = 100 * time.millisecond; + var timer = time.Timer.start() catch unreachable; + testing.expectError(ResetEvent.WaitError.TimedOut, event.wait(delay)); + const elapsed = timer.read(); + testing.expect(elapsed >= delay and elapsed < delay * 2); + + // test cross thread signaling + const Context = struct { + event: ResetEvent, + value: u128, + + fn receiver(self: *@This()) void { + // wait for the sender to notify us with updated value + assert(self.value == 0); + assert((self.event.wait(1 * time.second) catch unreachable) == true); + assert(self.value == 1); + + // wait for sender to sleep, then notify it of new value + time.sleep(50 * time.millisecond); + self.value = 2; + assert(self.event.set(false) == true); } - const timeout_ms = if (timeout @intCast(windows.LARGE_INTEGER, ) - } -}; + fn sender(self: *@This()) !void { + // wait for the receiver() to start wait()'ing + time.sleep(50 * time.millisecond); + + // update value to 1 and notify the receiver() + assert(self.value == 0); + self.value = 1; + assert(self.event.set(true) == true); + + // wait for the receiver to update the value & notify us + assert((try self.event.wait(1 * time.second)) == true); + assert(self.value == 2); + } + }; + + _ = event.reset(); + var context = Context{ + .event = event, + .value = 0, + }; + + var receiver = try std.Thread.spawn(&context, Context.receiver); + defer receiver.wait(); + try context.sender(); +} \ No newline at end of file