diff --git a/lib/std/mutex.zig b/lib/std/mutex.zig
index 71188054f3..26f8e29dac 100644
--- a/lib/std/mutex.zig
+++ b/lib/std/mutex.zig
@@ -1,12 +1,13 @@
 const std = @import("std.zig");
 const builtin = @import("builtin");
+const os = std.os;
 const testing = std.testing;
+const SpinLock = std.SpinLock;
 const ResetEvent = std.ResetEvent;
 
 /// Lock may be held only once. If the same thread
 /// tries to acquire the same mutex twice, it deadlocks.
-/// This type supports static initialization and is based off of Webkit's WTF Lock (via rust parking_lot)
-/// https://github.com/Amanieu/parking_lot/blob/master/core/src/word_lock.rs
+/// This type supports static initialization and is at most `@sizeOf(usize)` in size.
 /// When an application is built in single threaded release mode, all the functions are
 /// no-ops. In single threaded debug mode, there is deadlock detection.
 pub const Mutex = if (builtin.single_threaded)
@@ -24,35 +25,114 @@ pub const Mutex = if (builtin.single_threaded)
                 }
             }
         };
+
         pub fn init() Mutex {
             return Mutex{ .lock = lock_init };
         }
-        pub fn deinit(self: *Mutex) void {}
 
-        pub fn acquire(self: *Mutex) Held {
-            if (std.debug.runtime_safety and self.lock) {
-                @panic("deadlock detected");
+        pub fn deinit(self: *Mutex) void {
+            self.* = undefined;
+        }
+
+        pub fn tryAcquire(self: *Mutex) ?Held {
+            if (std.debug.runtime_safety) {
+                if (self.lock) return null;
+                self.lock = true;
             }
             return Held{ .mutex = self };
         }
+
+        pub fn acquire(self: *Mutex) Held {
+            return self.tryAcquire() orelse @panic("deadlock detected");
+        }
     }
-else
+else if (builtin.os == .windows)
+    // https://locklessinc.com/articles/keyed_events/
+    extern union {
+        locked: u8,
+        waiters: u32,
+
+        const WAKE = 1 << 8;
+        const WAIT = 1 << 9;
+
+        pub fn init() Mutex {
+            return Mutex{ .waiters = 0 };
+        }
+
+        pub fn deinit(self: *Mutex) void {
+            self.* = undefined;
+        }
+
+        pub fn tryAcquire(self: *Mutex) ?Held {
+            if (@atomicRmw(u8, &self.locked, .Xchg, 1, .Acquire) != 0)
+                return null;
+            return Held{ .mutex = self };
+        }
+
+        pub fn acquire(self: *Mutex) Held {
+            return self.tryAcquire() orelse self.acquireSlow();
+        }
+
+        fn acquireSlow(self: *Mutex) Held {
+            @setCold(true);
+            while (true) : (SpinLock.loopHint(1)) {
+                const waiters = @atomicLoad(u32, &self.waiters, .Monotonic);
+
+                // try and take lock if unlocked
+                if ((waiters & 1) == 0) {
+                    if (@atomicRmw(u8, &self.locked, .Xchg, 1, .Acquire) == 0)
+                        return Held{ .mutex = self };
+
+                // otherwise, try and update the waiting count.
+                // then unset the WAKE bit so that another unlocker can wake up a thread.
+                } else if (@cmpxchgWeak(u32, &self.waiters, waiters, (waiters + WAIT) | 1, .Monotonic, .Monotonic) == null) {
+                    ResetEvent.OsEvent.Futex.wait(@ptrCast(*i32, &self.waiters), undefined, null) catch unreachable;
+                    _ = @atomicRmw(u32, &self.waiters, .Sub, WAKE, .Monotonic);
+                }
+            }
+        }
+
+        pub const Held = struct {
+            mutex: *Mutex,
+
+            pub fn release(self: Held) void {
+                // unlock without a rmw/cmpxchg instruction
+                @atomicStore(u8, @ptrCast(*u8, &self.mutex.locked), 0, .Release);
+
+                while (true) : (SpinLock.loopHint(1)) {
+                    const waiters = @atomicLoad(u32, &self.mutex.waiters, .Monotonic);
+
+                    // no one is waiting
+                    if (waiters < WAIT) return;
+                    // someone grabbed the lock and will do the wake instead
+                    if (waiters & 1 != 0) return;
+                    // someone else is currently waking up
+                    if (waiters & WAKE != 0) return;
+
+                    // try to decrease the waiter count & set the WAKE bit meaning a thread is waking up
+                    if (@cmpxchgWeak(u32, &self.mutex.waiters, waiters, waiters - WAIT + WAKE, .Release, .Monotonic) == null)
+                        return ResetEvent.OsEvent.Futex.wake(@ptrCast(*i32, &self.mutex.waiters));
+                }
+            }
+        };
+    }
+else if (builtin.link_libc or builtin.os == .linux)
+    // stack-based version of https://github.com/Amanieu/parking_lot/blob/master/core/src/word_lock.rs
     struct {
         state: usize,
 
+        /// number of times to spin trying to acquire the lock.
+        /// https://webkit.org/blog/6161/locking-in-webkit/
+        const SPIN_COUNT = 40;
+
         const MUTEX_LOCK: usize = 1 << 0;
         const QUEUE_LOCK: usize = 1 << 1;
         const QUEUE_MASK: usize = ~(MUTEX_LOCK | QUEUE_LOCK);
-        const QueueNode = std.atomic.Stack(ResetEvent).Node;
 
-        /// number of iterations to spin yielding the cpu
-        const SPIN_CPU = 4;
-
-        /// number of iterations to spin in the cpu yield loop
-        const SPIN_CPU_COUNT = 30;
-
-        /// number of iterations to spin yielding the thread
-        const SPIN_THREAD = 1;
+        const Node = struct {
+            next: ?*Node,
+            event: ResetEvent,
+        };
 
         pub fn init() Mutex {
             return Mutex{ .state = 0 };
         }
@@ -62,98 +142,112 @@ else
             self.* = undefined;
         }
 
+        pub fn tryAcquire(self: *Mutex) ?Held {
+            if (@cmpxchgWeak(usize, &self.state, 0, MUTEX_LOCK, .Acquire, .Monotonic) != null)
+                return null;
+            return Held{ .mutex = self };
+        }
+
+        pub fn acquire(self: *Mutex) Held {
+            return self.tryAcquire() orelse {
+                self.acquireSlow();
+                return Held{ .mutex = self };
+            };
+        }
+
+        fn acquireSlow(self: *Mutex) void {
+            // inlining the fast path and hiding *Slow()
+            // calls behind a @setCold(true) appears to
+            // improve performance in release builds.
+            @setCold(true);
+            while (true) {
+
+                // try and spin for a bit to acquire the mutex if there's currently no queue
+                var spin_count: u32 = SPIN_COUNT;
+                var state = @atomicLoad(usize, &self.state, .Monotonic);
+                while (spin_count != 0) : (spin_count -= 1) {
+                    if (state & MUTEX_LOCK == 0) {
+                        _ = @cmpxchgWeak(usize, &self.state, state, state | MUTEX_LOCK, .Acquire, .Monotonic) orelse return;
+                    } else if (state & QUEUE_MASK == 0) {
+                        break;
+                    }
+                    SpinLock.yield();
+                    state = @atomicLoad(usize, &self.state, .Monotonic);
+                }
+
+                // create the ResetEvent node on the stack
+                // (faster than threadlocal on platforms like OSX)
+                var node: Node = undefined;
+                node.event = ResetEvent.init();
+                defer node.event.deinit();
+
+                // we've spun too long, try and add our node to the LIFO queue.
+                // if the mutex becomes available in the process, try and grab it instead.
+                while (true) {
+                    if (state & MUTEX_LOCK == 0) {
+                        _ = @cmpxchgWeak(usize, &self.state, state, state | MUTEX_LOCK, .Acquire, .Monotonic) orelse return;
+                    } else {
+                        node.next = @intToPtr(?*Node, state & QUEUE_MASK);
+                        const new_state = @ptrToInt(&node) | (state & ~QUEUE_MASK);
+                        _ = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Monotonic) orelse {
+                            node.event.wait();
+                            break;
+                        };
+                    }
+                    SpinLock.yield();
+                    state = @atomicLoad(usize, &self.state, .Monotonic);
+                }
+            }
+        }
+
         pub const Held = struct {
             mutex: *Mutex,
 
             pub fn release(self: Held) void {
-                // since MUTEX_LOCK is the first bit, we can use (.Sub) instead of (.And, ~MUTEX_LOCK).
-                // this is because .Sub may be implemented more efficiently than the latter
-                // (e.g. `lock xadd` vs `cmpxchg` loop on x86)
+                // first, remove the lock bit so another possibly parallel acquire() can succeed.
+                // use .Sub since it can usually be compiled down more efficiently
+                // (`lock sub` on x86) vs .And ~MUTEX_LOCK (`lock cmpxchg` loop on x86)
                 const state = @atomicRmw(usize, &self.mutex.state, .Sub, MUTEX_LOCK, .Release);
-                if ((state & QUEUE_MASK) != 0 and (state & QUEUE_LOCK) == 0) {
-                    self.mutex.releaseSlow(state);
-                }
+
+                // if the LIFO queue isn't locked and it has a node, try and wake up the node.
+                if ((state & QUEUE_LOCK) == 0 and (state & QUEUE_MASK) != 0)
+                    self.mutex.releaseSlow();
             }
         };
 
-        pub fn acquire(self: *Mutex) Held {
-            // fast path close to SpinLock fast path
-            if (@cmpxchgWeak(usize, &self.state, 0, MUTEX_LOCK, .Acquire, .Monotonic)) |current_state| {
-                self.acquireSlow(current_state);
-            }
-            return Held{ .mutex = self };
-        }
-
-        fn acquireSlow(self: *Mutex, current_state: usize) void {
-            var spin: usize = 0;
-            var state = current_state;
-            while (true) {
-
-                // try and acquire the lock if unlocked
-                if ((state & MUTEX_LOCK) == 0) {
-                    state = @cmpxchgWeak(usize, &self.state, state, state | MUTEX_LOCK, .Acquire, .Monotonic) orelse return;
-                    continue;
-                }
-
-                // spin only if the waiting queue isn't empty and when it hasn't spun too much already
-                if ((state & QUEUE_MASK) == 0 and spin < SPIN_CPU + SPIN_THREAD) {
-                    if (spin < SPIN_CPU) {
-                        std.SpinLock.yield(SPIN_CPU_COUNT);
-                    } else {
-                        std.os.sched_yield() catch std.time.sleep(0);
-                    }
-                    state = @atomicLoad(usize, &self.state, .Monotonic);
-                    continue;
-                }
-
-                // thread should block, try and add this event to the waiting queue
-                var node = QueueNode{
-                    .next = @intToPtr(?*QueueNode, state & QUEUE_MASK),
-                    .data = ResetEvent.init(),
-                };
-                defer node.data.deinit();
-                const new_state = @ptrToInt(&node) | (state & ~QUEUE_MASK);
-                state = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Monotonic) orelse {
-                    // node is in the queue, wait until a `held.release()` wakes us up.
-                    _ = node.data.wait(null) catch unreachable;
-                    spin = 0;
-                    state = @atomicLoad(usize, &self.state, .Monotonic);
-                    continue;
-                };
-            }
-        }
-
-        fn releaseSlow(self: *Mutex, current_state: usize) void {
-            // grab the QUEUE_LOCK in order to signal a waiting queue node's event.
-            var state = current_state;
-            while (true) {
-                if ((state & QUEUE_LOCK) != 0 or (state & QUEUE_MASK) == 0)
+        fn releaseSlow(self: *Mutex) void {
+            @setCold(true);
+
+            // try and lock the LIFO queue to pop a node off,
+            // stopping altogether if it's already locked or the queue is empty
+            var state = @atomicLoad(usize, &self.state, .Monotonic);
+            while (true) : (SpinLock.loopHint(1)) {
+                if (state & QUEUE_LOCK != 0 or state & QUEUE_MASK == 0)
                     return;
                 state = @cmpxchgWeak(usize, &self.state, state, state | QUEUE_LOCK, .Acquire, .Monotonic) orelse break;
             }
 
-            while (true) {
-                // barrier needed to observe incoming state changes
-                defer @fence(.Acquire);
-
-                // the mutex is currently locked. try to unset the QUEUE_LOCK and let the locker wake up the next node.
-                // avoids waking up multiple sleeping threads which try to acquire the lock again which increases contention.
+            // acquired the QUEUE_LOCK, try and pop a node to wake it.
+            // if the mutex is locked, then unset QUEUE_LOCK and let
+            // the thread who holds the mutex do the wake-up on unlock()
+            while (true) : (SpinLock.loopHint(1)) {
                 if ((state & MUTEX_LOCK) != 0) {
-                    state = @cmpxchgWeak(usize, &self.state, state, state & ~QUEUE_LOCK, .Release, .Monotonic) orelse return;
-                    continue;
+                    state = @cmpxchgWeak(usize, &self.state, state, state & ~QUEUE_LOCK, .Release, .Acquire) orelse return;
+                } else {
+                    const node = @intToPtr(*Node, state & QUEUE_MASK);
+                    const new_state = @ptrToInt(node.next);
+                    state = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Acquire) orelse {
+                        node.event.set();
+                        return;
+                    };
                 }
-
-                // try to pop the top node on the waiting queue stack to wake it up
-                // while at the same time unsetting the QUEUE_LOCK.
-                const node = @intToPtr(*QueueNode, state & QUEUE_MASK);
-                const new_state = @ptrToInt(node.next) | (state & MUTEX_LOCK);
-                state = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Monotonic) orelse {
-                    _ = node.data.set(false);
-                    return;
-                };
             }
         }
-    };
+    }
+
+// for platforms without a known OS blocking
+// primitive, default to SpinLock for correctness
+else SpinLock;
 
 const TestContext = struct {
     mutex: *Mutex,
diff --git a/lib/std/reset_event.zig b/lib/std/reset_event.zig
index 52235d4327..30e90641a2 100644
--- a/lib/std/reset_event.zig
+++ b/lib/std/reset_event.zig
@@ -1,8 +1,8 @@
 const std = @import("std.zig");
 const builtin = @import("builtin");
 const testing = std.testing;
+const SpinLock = std.SpinLock;
 const assert = std.debug.assert;
-const Backoff = std.SpinLock.Backoff;
 const c = std.c;
 const os = std.os;
 const time = std.time;
@@ -14,13 +14,20 @@ const windows = os.windows;
 pub const ResetEvent = struct {
     os_event: OsEvent,
 
+    pub const OsEvent =
+        if (builtin.single_threaded)
+            DebugEvent
+        else if (builtin.link_libc and builtin.os != .windows and builtin.os != .linux)
+            PosixEvent
+        else
+            AtomicEvent;
+
     pub fn init() ResetEvent {
         return ResetEvent{ .os_event = OsEvent.init() };
     }
 
     pub fn deinit(self: *ResetEvent) void {
         self.os_event.deinit();
-        self.* = undefined;
     }
 
     /// Returns whether or not the event is currenetly set
@@ -29,308 +36,116 @@ pub const ResetEvent = struct {
         return self.os_event.isSet();
     }
 
     /// Sets the event if not already set and
-    /// wakes up AT LEAST one thread waiting the event.
-    /// Returns whether or not a thread was woken up.
-    pub fn set(self: *ResetEvent, auto_reset: bool) bool {
-        return self.os_event.set(auto_reset);
+    /// wakes up at least one thread waiting on the event.
+    pub fn set(self: *ResetEvent) void {
+        return self.os_event.set();
     }
 
     /// Resets the event to its original, unset state.
-    /// Returns whether or not the event was currently set before un-setting.
-    pub fn reset(self: *ResetEvent) bool {
+    pub fn reset(self: *ResetEvent) void {
         return self.os_event.reset();
     }
 
-    const WaitError = error{
-        /// The thread blocked longer than the maximum time specified.
-        TimedOut,
-    };
+    /// Wait for the event to be set by blocking the current thread.
+    pub fn wait(self: *ResetEvent) void {
+        return self.os_event.wait(null) catch unreachable;
+    }
 
     /// Wait for the event to be set by blocking the current thread.
-    /// Optionally provided timeout in nanoseconds which throws an
-    /// `error.TimedOut` if the thread blocked AT LEAST longer than specified.
-    /// Returns whether or not the thread blocked from the event being unset at the time of calling.
-    pub fn wait(self: *ResetEvent, timeout_ns: ?u64) WaitError!bool {
+    /// A timeout in nanoseconds can be provided as a hint for how
+    /// long the thread should block on the unset event before throwing error.TimedOut.
+    pub fn timedWait(self: *ResetEvent, timeout_ns: u64) !void {
         return self.os_event.wait(timeout_ns);
     }
 };
 
-const OsEvent = if (builtin.single_threaded) DebugEvent else switch (builtin.os) {
-    .windows => WindowsEvent,
-    .linux => if (builtin.link_libc) PosixEvent else LinuxEvent,
-    else => if (builtin.link_libc) PosixEvent else SpinEvent,
-};
-
 const DebugEvent = struct {
-    is_set: @TypeOf(set_init),
+    is_set: bool,
 
-    const set_init = if (std.debug.runtime_safety) false else {};
-
-    pub fn init() DebugEvent {
-        return DebugEvent{ .is_set = set_init };
+    fn init() DebugEvent {
+        return DebugEvent{ .is_set = false };
     }
 
-    pub fn deinit(self: *DebugEvent) void {
+    fn deinit(self: *DebugEvent) void {
        self.* = undefined;
    }
 
-    pub fn isSet(self: *DebugEvent) bool {
-        if (!std.debug.runtime_safety)
-            return true;
+    fn isSet(self: *DebugEvent) bool {
        return self.is_set;
    }
 
-    pub fn set(self: *DebugEvent, auto_reset: bool) bool {
-        if (std.debug.runtime_safety)
-            self.is_set = !auto_reset;
-        return false;
-    }
-
-    pub fn reset(self: *DebugEvent) bool {
-        if (!std.debug.runtime_safety)
-            return false;
-        const was_set = self.is_set;
+    fn reset(self: *DebugEvent) void {
         self.is_set = false;
-        return was_set;
     }
 
-    pub fn wait(self: *DebugEvent, timeout: ?u64) ResetEvent.WaitError!bool {
-        if (std.debug.runtime_safety and !self.is_set)
-            @panic("deadlock detected");
-        return ResetEvent.WaitError.TimedOut;
+    fn set(self: *DebugEvent) void {
+        self.is_set = true;
+    }
+
+    fn wait(self: *DebugEvent, timeout: ?u64) !void {
+        if (self.is_set)
+            return;
+        if (timeout != null)
+            return error.TimedOut;
+        @panic("deadlock detected");
     }
 };
 
-fn AtomicEvent(comptime FutexImpl: type) type {
-    return struct {
-        state: u32,
-
-        const IS_SET: u32 = 1 << 0;
-        const WAIT_MASK = ~IS_SET;
-
-        pub const Self = @This();
-        pub const Futex = FutexImpl;
-
-        pub fn init() Self {
-            return Self{ .state = 0 };
-        }
-
-        pub fn deinit(self: *Self) void {
-            self.* = undefined;
-        }
-
-        pub fn isSet(self: *const Self) bool {
-            const state = @atomicLoad(u32, &self.state, .Acquire);
-            return (state & IS_SET) != 0;
-        }
-
-        pub fn reset(self: *Self) bool {
-            const old_state = @atomicRmw(u32, &self.state, .Xchg, 0, .Monotonic);
-            return (old_state & IS_SET) != 0;
-        }
-
-        pub fn set(self: *Self, auto_reset: bool) bool {
-            const new_state = if (auto_reset) 0 else IS_SET;
-            const old_state = @atomicRmw(u32, &self.state, .Xchg, new_state, .Release);
-            if ((old_state & WAIT_MASK) == 0) {
-                return false;
-            }
-
-            Futex.wake(&self.state);
-            return true;
-        }
-
-        pub fn wait(self: *Self, timeout: ?u64) ResetEvent.WaitError!bool {
-            var dummy_value: u32 = undefined;
-            const wait_token = @truncate(u32, @ptrToInt(&dummy_value));
-
-            var state = @atomicLoad(u32, &self.state, .Monotonic);
-            while (true) {
-                if ((state & IS_SET) != 0)
-                    return false;
-                state = @cmpxchgWeak(u32, &self.state, state, wait_token, .Acquire, .Monotonic) orelse break;
-            }
-
-            try Futex.wait(&self.state, wait_token, timeout);
-            return true;
-        }
-    };
-}
-
-const SpinEvent = AtomicEvent(struct {
-    fn wake(ptr: *const u32) void {}
-
-    fn wait(ptr: *const u32, expected: u32, timeout: ?u64) ResetEvent.WaitError!void {
-        // TODO: handle platforms where time.Timer.start() fails
-        var spin = Backoff.init();
-        var timer = if (timeout == null) null else time.Timer.start() catch unreachable;
-        while (@atomicLoad(u32, ptr, .Acquire) == expected) {
-            spin.yield();
-            if (timeout) |timeout_ns| {
-                if (timer.?.read() > timeout_ns)
-                    return ResetEvent.WaitError.TimedOut;
-            }
-        }
-    }
-});
-
-const LinuxEvent = AtomicEvent(struct {
-    fn wake(ptr: *const u32) void {
-        const key = @ptrCast(*const i32, ptr);
-        const rc = linux.futex_wake(key, linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, 1);
-        assert(linux.getErrno(rc) == 0);
-    }
-
-    fn wait(ptr: *const u32, expected: u32, timeout: ?u64) ResetEvent.WaitError!void {
-        var ts: linux.timespec = undefined;
-        var ts_ptr: ?*linux.timespec = null;
-        if (timeout) |timeout_ns| {
-            ts_ptr = &ts;
-            ts.tv_sec = @intCast(isize, timeout_ns / time.ns_per_s);
-            ts.tv_nsec = @intCast(isize, timeout_ns % time.ns_per_s);
-        }
-
-        const key = @ptrCast(*const i32, ptr);
-        const key_expect = @bitCast(i32, expected);
-        while (@atomicLoad(i32, key, .Acquire) == key_expect) {
-            const rc = linux.futex_wait(key, linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, key_expect, ts_ptr);
-            switch (linux.getErrno(rc)) {
-                0, linux.EAGAIN => break,
-                linux.EINTR => continue,
-                linux.ETIMEDOUT => return ResetEvent.WaitError.TimedOut,
-                else => unreachable,
-            }
-        }
-    }
-});
-
-const WindowsEvent = AtomicEvent(struct {
-    fn wake(ptr: *const u32) void {
-        if (getEventHandle()) |handle| {
-            const key = @ptrCast(*const c_void, ptr);
-            const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null);
-            assert(rc == 0);
-        }
-    }
-
-    fn wait(ptr: *const u32, expected: u32, timeout: ?u64) ResetEvent.WaitError!void {
-        // fallback to spinlock if NT Keyed Events arent available
-        const handle = getEventHandle() orelse {
-            return SpinEvent.Futex.wait(ptr, expected, timeout);
-        };
-
-        // NT uses timeouts in units of 100ns with negative value being relative
-        var timeout_ptr: ?*windows.LARGE_INTEGER = null;
-        var timeout_value: windows.LARGE_INTEGER = undefined;
-        if (timeout) |timeout_ns| {
-            timeout_ptr = &timeout_value;
-            timeout_value = -@intCast(windows.LARGE_INTEGER, timeout_ns / 100);
-        }
-
-        // NtWaitForKeyedEvent doesnt have spurious wake-ups
-        if (@atomicLoad(u32, ptr, .Acquire) == expected) {
-            const key = @ptrCast(*const c_void, ptr);
-            const rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, timeout_ptr);
-            switch (rc) {
-                0 => {},
-                windows.WAIT_TIMEOUT => return ResetEvent.WaitError.TimedOut,
-                else => unreachable,
-            }
-        }
-    }
-
-    var keyed_state = State.Uninitialized;
-    var keyed_handle: ?windows.HANDLE = null;
-
-    const State = enum(u8) {
-        Uninitialized,
-        Intializing,
-        Initialized,
-    };
-
-    fn getEventHandle() ?windows.HANDLE {
-        var spin = Backoff.init();
-        var state = @atomicLoad(State, &keyed_state, .Monotonic);
-
-        while (true) {
-            switch (state) {
-                .Initialized => {
-                    return keyed_handle;
-                },
-                .Intializing => {
-                    spin.yield();
-                    state = @atomicLoad(State, &keyed_state, .Acquire);
-                },
-                .Uninitialized => state = @cmpxchgWeak(State, &keyed_state, state, .Intializing, .Acquire, .Monotonic) orelse {
-                    var handle: windows.HANDLE = undefined;
-                    const access_mask = windows.GENERIC_READ | windows.GENERIC_WRITE;
-                    if (windows.ntdll.NtCreateKeyedEvent(&handle, access_mask, null, 0) == 0)
-                        keyed_handle = handle;
-                    @atomicStore(State, &keyed_state, .Initialized, .Release);
-                    return keyed_handle;
-                },
-            }
-        }
-    }
-});
-
 const PosixEvent = struct {
-    state: u32,
+    is_set: bool,
     cond: c.pthread_cond_t,
     mutex: c.pthread_mutex_t,
 
-    const IS_SET: u32 = 1;
-
-    pub fn init() PosixEvent {
+    fn init() PosixEvent {
         return PosixEvent{
-            .state = 0,
+            .is_set = false,
             .cond = c.PTHREAD_COND_INITIALIZER,
             .mutex = c.PTHREAD_MUTEX_INITIALIZER,
         };
     }
 
-    pub fn deinit(self: *PosixEvent) void {
-        // On dragonfly, the destroy functions return EINVAL if they were initialized statically.
+    fn deinit(self: *PosixEvent) void {
+        // on dragonfly, *destroy() functions can return EINVAL
+        // for statically initialized pthread structures
+        const err = if (builtin.os == .dragonfly) os.EINVAL else 0;
+
         const retm = c.pthread_mutex_destroy(&self.mutex);
-        assert(retm == 0 or retm == (if (builtin.os == .dragonfly) os.EINVAL else 0));
+        assert(retm == 0 or retm == err);
         const retc = c.pthread_cond_destroy(&self.cond);
-        assert(retc == 0 or retc == (if (builtin.os == .dragonfly) os.EINVAL else 0));
+        assert(retc == 0 or retc == err);
     }
 
-    pub fn isSet(self: *PosixEvent) bool {
+    fn isSet(self: *PosixEvent) bool {
         assert(c.pthread_mutex_lock(&self.mutex) == 0);
         defer assert(c.pthread_mutex_unlock(&self.mutex) == 0);
 
-        return self.state == IS_SET;
+        return self.is_set;
     }
 
-    pub fn reset(self: *PosixEvent) bool {
+    fn reset(self: *PosixEvent) void {
         assert(c.pthread_mutex_lock(&self.mutex) == 0);
         defer assert(c.pthread_mutex_unlock(&self.mutex) == 0);
 
-        const was_set = self.state == IS_SET;
-        self.state = 0;
-        return was_set;
+        self.is_set = false;
     }
 
-    pub fn set(self: *PosixEvent, auto_reset: bool) bool {
+    fn set(self: *PosixEvent) void {
         assert(c.pthread_mutex_lock(&self.mutex) == 0);
         defer assert(c.pthread_mutex_unlock(&self.mutex) == 0);
 
-        const had_waiter = self.state > IS_SET;
-        self.state = if (auto_reset) 0 else IS_SET;
-        if (had_waiter) {
+        if (!self.is_set) {
+            self.is_set = true;
             assert(c.pthread_cond_signal(&self.cond) == 0);
         }
-        return had_waiter;
     }
 
-    pub fn wait(self: *PosixEvent, timeout: ?u64) ResetEvent.WaitError!bool {
+    fn wait(self: *PosixEvent, timeout: ?u64) !void {
         assert(c.pthread_mutex_lock(&self.mutex) == 0);
         defer assert(c.pthread_mutex_unlock(&self.mutex) == 0);
 
-        if (self.state == IS_SET)
-            return false;
+        // quick guard before possibly calling time syscalls below
+        if (self.is_set)
+            return;
 
         var ts: os.timespec = undefined;
         if (timeout) |timeout_ns| {
@@ -349,85 +164,248 @@ const PosixEvent = struct {
             ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(timeout_abs, time.second));
         }
 
-        var dummy_value: u32 = undefined;
-        var wait_token = @truncate(u32, @ptrToInt(&dummy_value));
-        self.state = wait_token;
-
-        while (self.state == wait_token) {
+        while (!self.is_set) {
             const rc = switch (timeout == null) {
                 true => c.pthread_cond_wait(&self.cond, &self.mutex),
                 else => c.pthread_cond_timedwait(&self.cond, &self.mutex, &ts),
             };
-            // TODO: rc appears to be the positive error code making os.errno() always return 0 on linux
-            switch (std.math.max(@as(c_int, os.errno(rc)), rc)) {
+            switch (rc) {
                 0 => {},
-                os.ETIMEDOUT => return ResetEvent.WaitError.TimedOut,
+                os.ETIMEDOUT => return error.TimedOut,
                 os.EINVAL => unreachable,
                 os.EPERM => unreachable,
                 else => unreachable,
             }
         }
-        return true;
     }
 };
 
-test "std.ResetEvent" {
-    // TODO
-    if (builtin.single_threaded)
-        return error.SkipZigTest;
+const AtomicEvent = struct {
+    state: State,
+
+    const State = enum(i32) {
+        Empty,
+        Waiting,
+        Signaled,
+    };
+
+    fn init() AtomicEvent {
+        return AtomicEvent{ .state = .Empty };
+    }
+
+    fn deinit(self: *AtomicEvent) void {
+        self.* = undefined;
+    }
+
+    fn isSet(self: *AtomicEvent) bool {
+        return @atomicLoad(State, &self.state, .Acquire) == .Signaled;
+    }
+
+    fn reset(self: *AtomicEvent) void {
+        @atomicStore(State, &self.state, .Empty, .Monotonic);
+    }
+
+    fn set(self: *AtomicEvent) void {
+        if (@atomicRmw(State, &self.state, .Xchg, .Signaled, .Release) == .Waiting)
+            Futex.wake(@ptrCast(*i32, &self.state));
+    }
+
+    fn wait(self: *AtomicEvent, timeout: ?u64) !void {
+        var state = @atomicLoad(State, &self.state, .Monotonic);
+        while (state == .Empty) {
+            state = @cmpxchgWeak(State, &self.state, .Empty, .Waiting, .Acquire, .Monotonic) orelse
+                return Futex.wait(@ptrCast(*i32, &self.state), @enumToInt(State.Waiting), timeout);
+        }
+    }
+
+    pub const Futex = switch (builtin.os) {
+        .windows => WindowsFutex,
+        .linux => LinuxFutex,
+        else => SpinFutex,
+    };
+
+    const SpinFutex = struct {
+        fn wake(ptr: *i32) void {}
+
+        fn wait(ptr: *i32, expected: i32, timeout: ?u64) !void {
+            // TODO: handle platforms where a monotonic timer isn't available
+            var timer: time.Timer = undefined;
+            if (timeout != null)
+                timer = time.Timer.start() catch unreachable;
+
+            while (@atomicLoad(i32, ptr, .Acquire) == expected) {
+                SpinLock.yield();
+                if (timeout) |timeout_ns| {
+                    if (timer.read() >= timeout_ns)
+                        return error.TimedOut;
+                }
+            }
+        }
+    };
+
+    const LinuxFutex = struct {
+        fn wake(ptr: *i32) void {
+            const rc = linux.futex_wake(ptr, linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, 1);
+            assert(linux.getErrno(rc) == 0);
+        }
+
+        fn wait(ptr: *i32, expected: i32, timeout: ?u64) !void {
+            var ts: linux.timespec = undefined;
+            var ts_ptr: ?*linux.timespec = null;
+            if (timeout) |timeout_ns| {
+                ts_ptr = &ts;
+                ts.tv_sec = @intCast(isize, timeout_ns / time.ns_per_s);
+                ts.tv_nsec = @intCast(isize, timeout_ns % time.ns_per_s);
+            }
+
+            while (@atomicLoad(i32, ptr, .Acquire) == expected) {
+                const rc = linux.futex_wait(ptr, linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, expected, ts_ptr);
+                switch (linux.getErrno(rc)) {
+                    0 => continue,
+                    os.ETIMEDOUT => return error.TimedOut,
+                    os.EINTR => continue,
+                    os.EAGAIN => return,
+                    else => unreachable,
+                }
+            }
+        }
+    };
+
+    const WindowsFutex = struct {
+        pub fn wake(ptr: *i32) void {
+            const handle = getEventHandle() orelse return SpinFutex.wake(ptr);
+            const key = @ptrCast(*const c_void, ptr);
+            const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null);
+            assert(rc == 0);
+        }
+
+        pub fn wait(ptr: *i32, expected: i32, timeout: ?u64) !void {
+            const handle = getEventHandle() orelse return SpinFutex.wait(ptr, expected, timeout);
+
+            // NT uses timeouts in units of 100ns with negative value being relative
+            var timeout_ptr: ?*windows.LARGE_INTEGER = null;
+            var timeout_value: windows.LARGE_INTEGER = undefined;
+            if (timeout) |timeout_ns| {
+                timeout_ptr = &timeout_value;
+                timeout_value = -@intCast(windows.LARGE_INTEGER, timeout_ns / 100);
+            }
+
+            // NtWaitForKeyedEvent doesn't have spurious wake-ups
+            const key = @ptrCast(*const c_void, ptr);
+            const rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, timeout_ptr);
+            switch (rc) {
+                windows.WAIT_TIMEOUT => return error.TimedOut,
+                windows.WAIT_OBJECT_0 => {},
+                else => unreachable,
+            }
+        }
+
+        var event_handle: usize = EMPTY;
+        const EMPTY = ~@as(usize, 0);
+        const LOADING = EMPTY - 1;
+
+        pub fn getEventHandle() ?windows.HANDLE {
+            var handle = @atomicLoad(usize, &event_handle, .Monotonic);
+            while (true) {
+                switch (handle) {
+                    EMPTY => handle = @cmpxchgWeak(usize, &event_handle, EMPTY, LOADING, .Acquire, .Monotonic) orelse {
+                        const handle_ptr = @ptrCast(*windows.HANDLE, &handle);
+                        const access_mask = windows.GENERIC_READ | windows.GENERIC_WRITE;
+                        if (windows.ntdll.NtCreateKeyedEvent(handle_ptr, access_mask, null, 0) != 0)
+                            handle = 0;
+                        @atomicStore(usize, &event_handle, handle, .Monotonic);
+                        return @intToPtr(?windows.HANDLE, handle);
+                    },
+                    LOADING => {
+                        SpinLock.yield();
+                        handle = @atomicLoad(usize, &event_handle, .Monotonic);
+                    },
+                    else => {
+                        return @intToPtr(?windows.HANDLE, handle);
+                    },
+                }
+            }
+        }
+    };
+};
 
+test "std.ResetEvent" {
     var event = ResetEvent.init();
     defer event.deinit();
 
     // test event setting
     testing.expect(event.isSet() == false);
-    testing.expect(event.set(false) == false);
+    event.set();
     testing.expect(event.isSet() == true);
 
     // test event resetting
-    testing.expect(event.reset() == true);
+    event.reset();
     testing.expect(event.isSet() == false);
-    testing.expect(event.reset() == false);
-    // test cross thread signaling
+
+    // test event waiting (non-blocking)
+    event.set();
+    event.wait();
+    try event.timedWait(1);
+
+    // test cross-thread signaling
+    if (builtin.single_threaded)
+        return;
+
     const Context = struct {
-        event: ResetEvent,
+        const Self = @This();
+
         value: u128,
+        in: ResetEvent,
+        out: ResetEvent,
 
-        fn receiver(self: *@This()) void {
-            // wait for the sender to notify us with updated value
-            assert(self.value == 0);
-            assert((self.event.wait(1 * time.second) catch unreachable) == true);
-            assert(self.value == 1);
-
-            // wait for sender to sleep, then notify it of new value
-            time.sleep(50 * time.millisecond);
-            self.value = 2;
-            assert(self.event.set(false) == true);
+        fn init() Self {
+            return Self{
+                .value = 0,
+                .in = ResetEvent.init(),
+                .out = ResetEvent.init(),
+            };
         }
 
-        fn sender(self: *@This()) !void {
-            // wait for the receiver() to start wait()'ing
-            time.sleep(50 * time.millisecond);
+        fn deinit(self: *Self) void {
+            self.in.deinit();
+            self.out.deinit();
+            self.* = undefined;
+        }
 
-            // update value to 1 and notify the receiver()
-            assert(self.value == 0);
+        fn sender(self: *Self) void {
+            // update value and signal input
+            testing.expect(self.value == 0);
             self.value = 1;
-            assert(self.event.set(true) == true);
+            self.in.set();
 
-            // wait for the receiver to update the value & notify us
-            assert((try self.event.wait(1 * time.second)) == true);
-            assert(self.value == 2);
+            // wait for receiver to update value and signal output
+            self.out.wait();
+            testing.expect(self.value == 2);
+
+            // update value and signal final input
+            self.value = 3;
+            self.in.set();
+        }
+
+        fn receiver(self: *Self) void {
+            // wait for sender to update value and signal input
+            self.in.wait();
+            assert(self.value == 1);
+
+            // update value and signal output
+            self.in.reset();
+            self.value = 2;
+            self.out.set();
+
+            // wait for sender to update value and signal final input
+            self.in.wait();
+            assert(self.value == 3);
+        }
     };
 
-    _ = event.reset();
-    var context = Context{
-        .event = event,
-        .value = 0,
-    };
-
-    var receiver = try std.Thread.spawn(&context, Context.receiver);
+    var context = Context.init();
+    defer context.deinit();
+
+    const receiver = try std.Thread.spawn(&context, Context.receiver);
     defer receiver.wait();
-    try context.sender();
+    context.sender();
 }
diff --git a/lib/std/spinlock.zig b/lib/std/spinlock.zig
index bd811f709c..4efd244367 100644
--- a/lib/std/spinlock.zig
+++ b/lib/std/spinlock.zig
@@ -1,69 +1,77 @@
 const std = @import("std.zig");
 const builtin = @import("builtin");
-const assert = std.debug.assert;
-const time = std.time;
-const os = std.os;
 
 pub const SpinLock = struct {
-    lock: u8, // TODO use a bool or enum
+    state: State,
+
+    const State = enum(u8) {
+        Unlocked,
+        Locked,
+    };
 
     pub const Held = struct {
         spinlock: *SpinLock,
 
         pub fn release(self: Held) void {
-            @atomicStore(u8, &self.spinlock.lock, 0, .Release);
+            @atomicStore(State, &self.spinlock.state, .Unlocked, .Release);
         }
     };
 
     pub fn init() SpinLock {
-        return SpinLock{ .lock = 0 };
+        return SpinLock{ .state = .Unlocked };
+    }
+
+    pub fn deinit(self: *SpinLock) void {
+        self.* = undefined;
+    }
+
+    pub fn tryAcquire(self: *SpinLock) ?Held {
+        return switch (@atomicRmw(State, &self.state, .Xchg, .Locked, .Acquire)) {
+            .Unlocked => Held{ .spinlock = self },
+            .Locked => null,
+        };
     }
 
     pub fn acquire(self: *SpinLock) Held {
-        var backoff = Backoff.init();
-        while (@atomicRmw(u8, &self.lock, .Xchg, 1, .Acquire) != 0)
-            backoff.yield();
-        return Held{ .spinlock = self };
+        while (true) {
+            return self.tryAcquire() orelse {
+                yield();
+                continue;
+            };
+        }
     }
 
-    pub fn yield(iterations: usize) void {
+    pub fn yield() void {
+        // On native windows, SwitchToThread is too expensive,
+        // and yielding for 380-410 iterations was found to be
+        // a nice sweet spot. Posix systems on the other hand,
+        // especially linux, perform better by yielding the thread.
+        switch (builtin.os) {
+            .windows => loopHint(400),
+            else => std.os.sched_yield() catch loopHint(1),
+        }
+    }
+
+    /// Hint to the cpu that execution is spinning
+    /// for the given number of iterations.
+    pub fn loopHint(iterations: usize) void {
         var i = iterations;
         while (i != 0) : (i -= 1) {
             switch (builtin.arch) {
-                .i386, .x86_64 => asm volatile ("pause"),
-                .arm, .aarch64 => asm volatile ("yield"),
-                else => time.sleep(0),
+                // these instructions use a memory clobber as they
+                // flush the pipeline of any speculated reads/writes.
+                .i386, .x86_64 => asm volatile ("pause" ::: "memory"),
+                .arm, .aarch64 => asm volatile ("yield" ::: "memory"),
+                else => std.os.sched_yield() catch {},
             }
         }
    }
-
-    /// Provides a method to incrementally yield longer each time its called.
-    pub const Backoff = struct {
-        iteration: usize,
-
-        pub fn init() @This() {
-            return @This(){ .iteration = 0 };
-        }
-
-        /// Modified hybrid yielding from
-        /// http://www.1024cores.net/home/lock-free-algorithms/tricks/spinning
-        pub fn yield(self: *@This()) void {
-            defer self.iteration +%= 1;
-            if (self.iteration < 20) {
-                SpinLock.yield(self.iteration);
-            } else if (self.iteration < 24) {
-                os.sched_yield() catch time.sleep(1);
-            } else if (self.iteration < 26) {
-                time.sleep(1 * time.millisecond);
-            } else {
-                time.sleep(10 * time.millisecond);
-            }
-        }
-    };
 };
 
 test "spinlock" {
     var lock = SpinLock.init();
+    defer lock.deinit();
+
     const held = lock.acquire();
     defer held.release();
 }
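
Usage note (illustrative only, not part of the patch): with this change, `std.Mutex.acquire()` still returns a `Held` token, and the new `tryAcquire()` is the non-blocking variant returning `?Held`. A minimal sketch of caller code against the new API follows; the `shared_counter`/`bumpCounter` names are made up for illustration, and the example relies on `Mutex.init()` being comptime-evaluable for static initialization, as the doc comment promises.

const std = @import("std");

var shared_counter: usize = 0;
var counter_lock = std.Mutex.init();

fn bumpCounter(times: usize) void {
    var i: usize = 0;
    while (i < times) : (i += 1) {
        // acquire() blocks until the lock is free (and detects deadlock
        // in single-threaded debug builds); release() must follow.
        const held = counter_lock.acquire();
        defer held.release();
        shared_counter += 1;
    }
}

test "Mutex new API sketch" {
    // the non-blocking variant returns null when the lock is already held
    if (counter_lock.tryAcquire()) |held| held.release();

    bumpCounter(10);
    std.testing.expect(shared_counter == 10);
}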
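Similarly, `ResetEvent.set()`, `reset()`, and `wait()` no longer return booleans; timeouts move to `timedWait()`, which reports `error.TimedOut`. A hedged sketch of the new calling convention, assuming the event is used as a simple readiness flag (the `waitWithDeadline` helper is hypothetical, not part of the standard library):

const std = @import("std");

fn waitWithDeadline(event: *std.ResetEvent, timeout_ns: u64) bool {
    // the timeout is a hint: the thread may block at least `timeout_ns`
    // before timedWait() gives up with error.TimedOut
    event.timedWait(timeout_ns) catch return false;
    return true;
}

test "ResetEvent new API sketch" {
    var event = std.ResetEvent.init();
    defer event.deinit();

    event.set();
    event.wait(); // already set, returns immediately
    std.testing.expect(waitWithDeadline(&event, 10));

    event.reset();
    std.testing.expect(!waitWithDeadline(&event, 10));
}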