From ca2d566ec85bee81396f64325844cc760b3cf870 Mon Sep 17 00:00:00 2001
From: kprotty
Date: Sat, 23 Nov 2019 16:24:01 -0600
Subject: [PATCH] replace ThreadParker with ResetEvent + WordLock mutex

---
 lib/std/mutex.zig  | 136 ++++++++++++++++++++++------------
 lib/std/parker.zig | 180 ---------------------------------------------
 lib/std/std.zig    |   2 +-
 3 files changed, 88 insertions(+), 230 deletions(-)
 delete mode 100644 lib/std/parker.zig

diff --git a/lib/std/mutex.zig b/lib/std/mutex.zig
index e8f83a4a17..39cfab19ce 100644
--- a/lib/std/mutex.zig
+++ b/lib/std/mutex.zig
@@ -1,13 +1,12 @@
 const std = @import("std.zig");
 const builtin = @import("builtin");
 const testing = std.testing;
-const SpinLock = std.SpinLock;
-const ThreadParker = std.ThreadParker;
+const ResetEvent = std.ResetEvent;
 
 /// Lock may be held only once. If the same thread
 /// tries to acquire the same mutex twice, it deadlocks.
-/// This type supports static initialization and is based off of Golang 1.13 runtime.lock_futex:
-/// https://github.com/golang/go/blob/master/src/runtime/lock_futex.go
+/// This type supports static initialization and is based on WebKit's WTF::Lock (via Rust's parking_lot):
+/// https://github.com/Amanieu/parking_lot/blob/master/core/src/word_lock.rs
 /// When an application is built in single threaded release mode, all the functions are
 /// no-ops. In single threaded debug mode, there is deadlock detection.
 pub const Mutex = if (builtin.single_threaded)
@@ -39,80 +38,119 @@ pub const Mutex = if (builtin.single_threaded)
     }
 else
     struct {
-    state: State, // TODO: make this an enum
-    parker: ThreadParker,
+    state: usize,
 
-    const State = enum(u32) {
-        Unlocked,
-        Sleeping,
-        Locked,
-    };
+    const MUTEX_LOCK: usize = 1 << 0;
+    const QUEUE_LOCK: usize = 1 << 1;
+    const QUEUE_MASK: usize = ~(MUTEX_LOCK | QUEUE_LOCK);
+    const QueueNode = std.atomic.Stack(ResetEvent).Node;
 
     /// number of iterations to spin yielding the cpu
     const SPIN_CPU = 4;
 
-    /// number of iterations to perform in the cpu yield loop
+    /// number of iterations to spin in the cpu yield loop
     const SPIN_CPU_COUNT = 30;
 
     /// number of iterations to spin yielding the thread
     const SPIN_THREAD = 1;
 
     pub fn init() Mutex {
-        return Mutex{
-            .state = .Unlocked,
-            .parker = ThreadParker.init(),
-        };
+        return Mutex{ .state = 0 };
     }
 
     pub fn deinit(self: *Mutex) void {
-        self.parker.deinit();
+        self.* = undefined;
     }
 
     pub const Held = struct {
         mutex: *Mutex,
 
         pub fn release(self: Held) void {
-            switch (@atomicRmw(State, &self.mutex.state, .Xchg, .Unlocked, .Release)) {
-                .Locked => {},
-                .Sleeping => self.mutex.parker.unpark(@ptrCast(*const u32, &self.mutex.state)),
-                .Unlocked => unreachable, // unlocking an unlocked mutex
-                else => unreachable, // should never be anything else
+            // since MUTEX_LOCK is the lowest bit and is known to be set here,
+            // (.Sub, MUTEX_LOCK) clears it just as (.And, ~MUTEX_LOCK) would,
+            // and .Sub may be implemented more efficiently
+            // (e.g. `lock xadd` vs a `cmpxchg` loop on x86).
+            const state = @atomicRmw(usize, &self.mutex.state, .Sub, MUTEX_LOCK, .Release);
+            if ((state & QUEUE_MASK) != 0 and (state & QUEUE_LOCK) == 0) {
+                self.mutex.releaseSlow(state);
+            }
         }
     };
 
     pub fn acquire(self: *Mutex) Held {
-        // Try and speculatively grab the lock.
-        // If it fails, the state is either Locked or Sleeping
-        // depending on if theres a thread stuck sleeping below.
-        var state = @atomicRmw(State, &self.state, .Xchg, .Locked, .Acquire);
-        if (state == .Unlocked)
-            return Held{ .mutex = self };
+        // fast path, similar to SpinLock's fast path
+        if (@cmpxchgWeak(usize, &self.state, 0, MUTEX_LOCK, .Acquire, .Monotonic)) |current_state| {
+            self.acquireSlow(current_state);
+        }
+        return Held{ .mutex = self };
+    }
+
+    fn acquireSlow(self: *Mutex, current_state: usize) void {
+        var spin: usize = 0;
+        var state = current_state;
+        while (true) {
+
+            // try to acquire the lock if it appears unlocked
+            if ((state & MUTEX_LOCK) == 0) {
+                state = @cmpxchgWeak(usize, &self.state, state, state | MUTEX_LOCK, .Acquire, .Monotonic) orelse return;
+                continue;
+            }
+
+            // spin only if the waiting queue is empty and we haven't spun too much already
+            if ((state & QUEUE_MASK) == 0 and spin < SPIN_CPU + SPIN_THREAD) {
+                if (spin < SPIN_CPU) {
+                    std.SpinLock.yield(SPIN_CPU_COUNT);
+                } else {
+                    std.os.sched_yield() catch std.time.sleep(0);
+                }
+                state = @atomicLoad(usize, &self.state, .Monotonic);
+                continue;
+            }
+
+            // the thread should block: try to add this node's event to the waiting queue
+            var node = QueueNode{
+                .next = @intToPtr(?*QueueNode, state & QUEUE_MASK),
+                .data = ResetEvent.init(),
+            };
+            defer node.data.deinit();
+            const new_state = @ptrToInt(&node) | (state & ~QUEUE_MASK);
+            state = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Monotonic) orelse {
+                // the node is in the queue; wait until a `Held.release()` wakes us up.
+                _ = node.data.wait(null) catch unreachable;
+                spin = 0;
+                state = @atomicLoad(usize, &self.state, .Monotonic);
+                continue;
+            };
+        }
+    }
+
+    fn releaseSlow(self: *Mutex, current_state: usize) void {
+        // grab the QUEUE_LOCK so that only one thread dequeues and signals a waiting node's event.
+        var state = current_state;
+        while (true) {
+            if ((state & QUEUE_LOCK) != 0 or (state & QUEUE_MASK) == 0)
+                return;
+            state = @cmpxchgWeak(usize, &self.state, state, state | QUEUE_LOCK, .Acquire, .Monotonic) orelse break;
+        }
 
         while (true) {
-            // try and acquire the lock using cpu spinning on failure
-            var spin: usize = 0;
-            while (spin < SPIN_CPU) : (spin += 1) {
-                var value = @atomicLoad(State, &self.state, .Monotonic);
-                while (value == .Unlocked)
-                    value = @cmpxchgWeak(State, &self.state, .Unlocked, state, .Acquire, .Monotonic) orelse return Held{ .mutex = self };
-                SpinLock.yield(SPIN_CPU_COUNT);
+            // barrier needed to observe incoming state changes
+            defer @fence(.Acquire);
+
+            // the mutex is currently locked: unset the QUEUE_LOCK and let the lock holder wake up the next node instead.
+            // this avoids waking multiple sleeping threads that would just contend for the lock again.
+ if ((state & MUTEX_LOCK) != 0) { + state = @cmpxchgWeak(usize, &self.state, state, state & ~QUEUE_LOCK, .Release, .Monotonic) orelse return; + continue; } - // try and acquire the lock using thread rescheduling on failure - spin = 0; - while (spin < SPIN_THREAD) : (spin += 1) { - var value = @atomicLoad(State, &self.state, .Monotonic); - while (value == .Unlocked) - value = @cmpxchgWeak(State, &self.state, .Unlocked, state, .Acquire, .Monotonic) orelse return Held{ .mutex = self }; - std.os.sched_yield() catch std.time.sleep(1); - } - - // failed to acquire the lock, go to sleep until woken up by `Held.release()` - if (@atomicRmw(State, &self.state, .Xchg, .Sleeping, .Acquire) == .Unlocked) - return Held{ .mutex = self }; - state = .Sleeping; - self.parker.park(@ptrCast(*const u32, &self.state), @enumToInt(State.Sleeping)); + // try to pop the top node on the waiting queue stack to wake it up + // while at the same time unsetting the QUEUE_LOCK. + const node = @intToPtr(*QueueNode, state & QUEUE_MASK); + const new_state = @ptrToInt(node.next) | (state & MUTEX_LOCK); + state = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Monotonic) orelse { + _ = node.data.set(false); + return; + }; } } }; diff --git a/lib/std/parker.zig b/lib/std/parker.zig deleted file mode 100644 index 4ba0100b9e..0000000000 --- a/lib/std/parker.zig +++ /dev/null @@ -1,180 +0,0 @@ -const std = @import("std.zig"); -const builtin = @import("builtin"); -const time = std.time; -const testing = std.testing; -const assert = std.debug.assert; -const SpinLock = std.SpinLock; -const linux = std.os.linux; -const windows = std.os.windows; - -pub const ThreadParker = switch (builtin.os) { - .linux => if (builtin.link_libc) PosixParker else LinuxParker, - .windows => WindowsParker, - else => if (builtin.link_libc) PosixParker else SpinParker, -}; - -const SpinParker = struct { - pub fn init() SpinParker { - return SpinParker{}; - } - pub fn deinit(self: *SpinParker) void {} - - pub fn unpark(self: *SpinParker, ptr: *const u32) void {} - - pub fn park(self: *SpinParker, ptr: *const u32, expected: u32) void { - var backoff = SpinLock.Backoff.init(); - while (@atomicLoad(u32, ptr, .Acquire) == expected) - backoff.yield(); - } -}; - -const LinuxParker = struct { - pub fn init() LinuxParker { - return LinuxParker{}; - } - pub fn deinit(self: *LinuxParker) void {} - - pub fn unpark(self: *LinuxParker, ptr: *const u32) void { - const rc = linux.futex_wake(@ptrCast(*const i32, ptr), linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, 1); - assert(linux.getErrno(rc) == 0); - } - - pub fn park(self: *LinuxParker, ptr: *const u32, expected: u32) void { - const value = @intCast(i32, expected); - while (@atomicLoad(u32, ptr, .Acquire) == expected) { - const rc = linux.futex_wait(@ptrCast(*const i32, ptr), linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, value, null); - switch (linux.getErrno(rc)) { - 0, linux.EAGAIN => return, - linux.EINTR => continue, - linux.EINVAL => unreachable, - else => continue, - } - } - } -}; - -const WindowsParker = struct { - waiters: u32, - - pub fn init() WindowsParker { - return WindowsParker{ .waiters = 0 }; - } - pub fn deinit(self: *WindowsParker) void {} - - pub fn unpark(self: *WindowsParker, ptr: *const u32) void { - const key = @ptrCast(*const c_void, ptr); - const handle = getEventHandle() orelse return; - - var waiting = @atomicLoad(u32, &self.waiters, .Monotonic); - while (waiting != 0) { - waiting = @cmpxchgWeak(u32, &self.waiters, waiting, waiting - 1, .Acquire, .Monotonic) orelse { - const rc = 
windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null); - assert(rc == 0); - return; - }; - } - } - - pub fn park(self: *WindowsParker, ptr: *const u32, expected: u32) void { - var spin = SpinLock.Backoff.init(); - const ev_handle = getEventHandle(); - const key = @ptrCast(*const c_void, ptr); - - while (@atomicLoad(u32, ptr, .Monotonic) == expected) { - if (ev_handle) |handle| { - _ = @atomicRmw(u32, &self.waiters, .Add, 1, .Release); - const rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, null); - assert(rc == 0); - } else { - spin.yield(); - } - } - } - - var event_handle = std.lazyInit(windows.HANDLE); - - fn getEventHandle() ?windows.HANDLE { - if (event_handle.get()) |handle_ptr| - return handle_ptr.*; - defer event_handle.resolve(); - - const access_mask = windows.GENERIC_READ | windows.GENERIC_WRITE; - if (windows.ntdll.NtCreateKeyedEvent(&event_handle.data, access_mask, null, 0) != 0) - return null; - return event_handle.data; - } -}; - -const PosixParker = struct { - cond: c.pthread_cond_t, - mutex: c.pthread_mutex_t, - - const c = std.c; - - pub fn init() PosixParker { - return PosixParker{ - .cond = c.PTHREAD_COND_INITIALIZER, - .mutex = c.PTHREAD_MUTEX_INITIALIZER, - }; - } - - pub fn deinit(self: *PosixParker) void { - // On dragonfly, the destroy functions return EINVAL if they were initialized statically. - const retm = c.pthread_mutex_destroy(&self.mutex); - assert(retm == 0 or retm == (if (builtin.os == .dragonfly) os.EINVAL else 0)); - const retc = c.pthread_cond_destroy(&self.cond); - assert(retc == 0 or retc == (if (builtin.os == .dragonfly) os.EINVAL else 0)); - } - - pub fn unpark(self: *PosixParker, ptr: *const u32) void { - assert(c.pthread_mutex_lock(&self.mutex) == 0); - defer assert(c.pthread_mutex_unlock(&self.mutex) == 0); - assert(c.pthread_cond_signal(&self.cond) == 0); - } - - pub fn park(self: *PosixParker, ptr: *const u32, expected: u32) void { - assert(c.pthread_mutex_lock(&self.mutex) == 0); - defer assert(c.pthread_mutex_unlock(&self.mutex) == 0); - while (@atomicLoad(u32, ptr, .Acquire) == expected) - assert(c.pthread_cond_wait(&self.cond, &self.mutex) == 0); - } -}; - -test "std.ThreadParker" { - if (builtin.single_threaded) - return error.SkipZigTest; - - const Context = struct { - parker: ThreadParker, - data: u32, - - fn receiver(self: *@This()) void { - self.parker.park(&self.data, 0); // receives 1 - assert(@atomicRmw(u32, &self.data, .Xchg, 2, .SeqCst) == 1); // sends 2 - self.parker.unpark(&self.data); // wakes up waiters on 2 - self.parker.park(&self.data, 2); // receives 3 - assert(@atomicRmw(u32, &self.data, .Xchg, 4, .SeqCst) == 3); // sends 4 - self.parker.unpark(&self.data); // wakes up waiters on 4 - } - - fn sender(self: *@This()) void { - assert(@atomicRmw(u32, &self.data, .Xchg, 1, .SeqCst) == 0); // sends 1 - self.parker.unpark(&self.data); // wakes up waiters on 1 - self.parker.park(&self.data, 1); // receives 2 - assert(@atomicRmw(u32, &self.data, .Xchg, 3, .SeqCst) == 2); // sends 3 - self.parker.unpark(&self.data); // wakes up waiters on 3 - self.parker.park(&self.data, 3); // receives 4 - } - }; - - var context = Context{ - .parker = ThreadParker.init(), - .data = 0, - }; - defer context.parker.deinit(); - - var receiver = try std.Thread.spawn(&context, Context.receiver); - defer receiver.wait(); - - context.sender(); -} diff --git a/lib/std/std.zig b/lib/std/std.zig index 83b7ed6e94..09db489604 100644 --- a/lib/std/std.zig +++ b/lib/std/std.zig @@ -16,6 +16,7 @@ pub const PackedIntSlice = 
@import("packed_int_array.zig").PackedIntSlice; pub const PackedIntSliceEndian = @import("packed_int_array.zig").PackedIntSliceEndian; pub const PriorityQueue = @import("priority_queue.zig").PriorityQueue; pub const Progress = @import("progress.zig").Progress; +pub const ResetEvent = @import("reset_event.zig").ResetEvent; pub const SegmentedList = @import("segmented_list.zig").SegmentedList; pub const SinglyLinkedList = @import("linked_list.zig").SinglyLinkedList; pub const SpinLock = @import("spinlock.zig").SpinLock; @@ -23,7 +24,6 @@ pub const StringHashMap = @import("hash_map.zig").StringHashMap; pub const TailQueue = @import("linked_list.zig").TailQueue; pub const Target = @import("target.zig").Target; pub const Thread = @import("thread.zig").Thread; -pub const ThreadParker = @import("parker.zig").ThreadParker; pub const atomic = @import("atomic.zig"); pub const base64 = @import("base64.zig");