const std = @import("std.zig");
const builtin = @import("builtin");
const testing = std.testing;
const ResetEvent = std.ResetEvent;

/// Lock may be held only once; it is not reentrant. If the same thread
/// tries to acquire the same mutex twice, it deadlocks.
/// This type supports static initialization and is based on WebKit's WTF Lock
/// (via Rust's parking_lot):
/// https://github.com/Amanieu/parking_lot/blob/master/core/src/word_lock.rs
/// When an application is built in single-threaded release mode, all the functions are
/// no-ops. In single-threaded debug mode, there is deadlock detection.
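///
/// A minimal usage sketch (the same pattern as the `worker` test helper below):
///
///     var mutex = Mutex.init();
///     defer mutex.deinit();
///
///     const held = mutex.acquire();
///     defer held.release();
///     // ... touch the shared data only while the lock is held ...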
pub const Mutex = if (builtin.single_threaded)
    struct {
        lock: @typeOf(lock_init),

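        // With runtime safety enabled, `lock` is a bool used purely for deadlock
        // detection; in release mode it is a zero-bit `void` value, so the struct
        // is empty and every method reduces to a no-op.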
        const lock_init = if (std.debug.runtime_safety) false else {};

        pub const Held = struct {
            mutex: *Mutex,

            pub fn release(self: Held) void {
                if (std.debug.runtime_safety) {
                    self.mutex.lock = false;
                }
            }
        };
        pub fn init() Mutex {
            return Mutex{ .lock = lock_init };
        }
        pub fn deinit(self: *Mutex) void {}

        pub fn acquire(self: *Mutex) Held {
            if (std.debug.runtime_safety and self.lock) {
                @panic("deadlock detected");
            }
            return Held{ .mutex = self };
        }
    }
else
    struct {
        state: usize,

        const MUTEX_LOCK: usize = 1 << 0;
        const QUEUE_LOCK: usize = 1 << 1;
        const QUEUE_MASK: usize = ~(MUTEX_LOCK | QUEUE_LOCK);
        const QueueNode = std.atomic.Stack(ResetEvent).Node;

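        // the state word packs three things: MUTEX_LOCK (bit 0) records whether the
        // mutex is held, QUEUE_LOCK (bit 1) serializes wake-ups from the wait queue,
        // and the remaining bits (QUEUE_MASK) store the address of the head QueueNode
        // of the intrusive stack of blocked waiters. QueueNode addresses have their
        // low bits clear due to alignment, so the pointer never overlaps the flag bits.
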
        /// number of iterations to spin yielding the cpu
        const SPIN_CPU = 4;

        /// number of iterations to spin in the cpu yield loop
        const SPIN_CPU_COUNT = 30;

        /// number of iterations to spin yielding the thread
        const SPIN_THREAD = 1;

        pub fn init() Mutex {
            return Mutex{ .state = 0 };
        }

        pub fn deinit(self: *Mutex) void {
            self.* = undefined;
        }

        pub const Held = struct {
            mutex: *Mutex,

            pub fn release(self: Held) void {
                // since MUTEX_LOCK is the first bit, we can use (.Sub) instead of (.And, ~MUTEX_LOCK).
                // this is because .Sub may be implemented more efficiently than the latter
                // (e.g. `lock xadd` vs `cmpxchg` loop on x86)
                const state = @atomicRmw(usize, &self.mutex.state, .Sub, MUTEX_LOCK, .Release);
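                // a waiter needs to be woken only if the queue is non-empty and no other
                // thread is already holding QUEUE_LOCK to perform a wake-up.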
                if ((state & QUEUE_MASK) != 0 and (state & QUEUE_LOCK) == 0) {
                    self.mutex.releaseSlow(state);
                }
            }
        };

        pub fn acquire(self: *Mutex) Held {
            // fast path close to SpinLock fast path
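            // on failure, @cmpxchgWeak returns the state it observed; that value seeds the slow path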
            if (@cmpxchgWeak(usize, &self.state, 0, MUTEX_LOCK, .Acquire, .Monotonic)) |current_state| {
                self.acquireSlow(current_state);
            }
            return Held{ .mutex = self };
        }

        fn acquireSlow(self: *Mutex, current_state: usize) void {
            var spin: usize = 0;
            var state = current_state;
            while (true) {

                // try and acquire the lock if unlocked
                if ((state & MUTEX_LOCK) == 0) {
                    state = @cmpxchgWeak(usize, &self.state, state, state | MUTEX_LOCK, .Acquire, .Monotonic) orelse return;
                    continue;
                }

                // spin only if the waiting queue is empty and the caller hasn't spun too much already
                if ((state & QUEUE_MASK) == 0 and spin < SPIN_CPU + SPIN_THREAD) {
                    if (spin < SPIN_CPU) {
                        std.SpinLock.yield(SPIN_CPU_COUNT);
                    } else {
                        std.os.sched_yield() catch std.time.sleep(0);
                    }
                    spin += 1;
                    state = @atomicLoad(usize, &self.state, .Monotonic);
                    continue;
                }

                // thread should block, try and add this event to the waiting queue
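                // the node can live on this thread's stack: releaseSlow unlinks it from
                // the queue before signaling its event, so the queue never ends up
                // holding a pointer to a node that has already gone out of scope.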
                var node = QueueNode{
                    .next = @intToPtr(?*QueueNode, state & QUEUE_MASK),
                    .data = ResetEvent.init(),
                };
                defer node.data.deinit();
                const new_state = @ptrToInt(&node) | (state & ~QUEUE_MASK);
                state = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Monotonic) orelse {
                    // node is in the queue, wait until a `held.release()` wakes us up.
                    _ = node.data.wait(null) catch unreachable;
                    spin = 0;
                    state = @atomicLoad(usize, &self.state, .Monotonic);
                    continue;
                };
            }
        }

        fn releaseSlow(self: *Mutex, current_state: usize) void {
            // grab the QUEUE_LOCK in order to signal a waiting queue node's event.
            var state = current_state;
            while (true) {
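                // nothing to do if another thread already holds QUEUE_LOCK (it will
                // perform the wake-up) or if there is no waiter to wake.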
                if ((state & QUEUE_LOCK) != 0 or (state & QUEUE_MASK) == 0)
                    return;
                state = @cmpxchgWeak(usize, &self.state, state, state | QUEUE_LOCK, .Acquire, .Monotonic) orelse break;
            }

            while (true) {
                // barrier needed to observe incoming state changes
                defer @fence(.Acquire);

                // the mutex is currently locked. try to unset the QUEUE_LOCK and let the locker wake up the next node,
                // which avoids waking up multiple sleeping threads that would only contend for the lock again.
                if ((state & MUTEX_LOCK) != 0) {
                    state = @cmpxchgWeak(usize, &self.state, state, state & ~QUEUE_LOCK, .Release, .Monotonic) orelse return;
                    continue;
                }

                // try to pop the top node on the waiting queue stack to wake it up
                // while at the same time unsetting the QUEUE_LOCK.
                const node = @intToPtr(*QueueNode, state & QUEUE_MASK);
                const new_state = @ptrToInt(node.next) | (state & MUTEX_LOCK);
                state = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Monotonic) orelse {
                    _ = node.data.set(false);
                    return;
                };
            }
        }
    };

const TestContext = struct {
    mutex: *Mutex,
    data: i128,

    const incr_count = 10000;
};

test "std.Mutex" {
|
|
var plenty_of_memory = try std.heap.page_allocator.alloc(u8, 300 * 1024);
|
|
defer std.heap.page_allocator.free(plenty_of_memory);
|
|
|
|
var fixed_buffer_allocator = std.heap.ThreadSafeFixedBufferAllocator.init(plenty_of_memory);
|
|
var a = &fixed_buffer_allocator.allocator;
|
|
|
|
var mutex = Mutex.init();
|
|
defer mutex.deinit();
|
|
|
|
var context = TestContext{
|
|
.mutex = &mutex,
|
|
.data = 0,
|
|
};
|
|
|
|
if (builtin.single_threaded) {
|
|
worker(&context);
|
|
testing.expect(context.data == TestContext.incr_count);
|
|
} else {
|
|
const thread_count = 10;
|
|
var threads: [thread_count]*std.Thread = undefined;
|
|
for (threads) |*t| {
|
|
t.* = try std.Thread.spawn(&context, worker);
|
|
}
|
|
for (threads) |t|
|
|
t.wait();
|
|
|
|
testing.expect(context.data == thread_count * TestContext.incr_count);
|
|
}
|
|
}
|
|
|
|
fn worker(ctx: *TestContext) void {
|
|
var i: usize = 0;
|
|
while (i != TestContext.incr_count) : (i += 1) {
|
|
const held = ctx.mutex.acquire();
|
|
defer held.release();
|
|
|
|
ctx.data += 1;
|
|
}
|
|
}
|