Mirror of https://github.com/ziglang/zig.git (synced 2024-11-28 08:02:32 +00:00)

Commit b0bea72588: remove std.event
Parent: 8d11ade6a7
CMakeLists.txt
@ -233,9 +233,6 @@ set(ZIG_STAGE2_SOURCES
    "${CMAKE_SOURCE_DIR}/lib/std/dwarf/OP.zig"
    "${CMAKE_SOURCE_DIR}/lib/std/dwarf/TAG.zig"
    "${CMAKE_SOURCE_DIR}/lib/std/elf.zig"
    "${CMAKE_SOURCE_DIR}/lib/std/event.zig"
    "${CMAKE_SOURCE_DIR}/lib/std/event/batch.zig"
    "${CMAKE_SOURCE_DIR}/lib/std/event/loop.zig"
    "${CMAKE_SOURCE_DIR}/lib/std/fifo.zig"
    "${CMAKE_SOURCE_DIR}/lib/std/fmt.zig"
    "${CMAKE_SOURCE_DIR}/lib/std/fmt/errol.zig"
lib/std/event.zig
@ -1,23 +0,0 @@
pub const Channel = @import("event/channel.zig").Channel;
pub const Future = @import("event/future.zig").Future;
pub const Group = @import("event/group.zig").Group;
pub const Batch = @import("event/batch.zig").Batch;
pub const Lock = @import("event/lock.zig").Lock;
pub const Locked = @import("event/locked.zig").Locked;
pub const RwLock = @import("event/rwlock.zig").RwLock;
pub const RwLocked = @import("event/rwlocked.zig").RwLocked;
pub const Loop = @import("event/loop.zig").Loop;
pub const WaitGroup = @import("event/wait_group.zig").WaitGroup;

test {
    _ = @import("event/channel.zig");
    _ = @import("event/future.zig");
    _ = @import("event/group.zig");
    _ = @import("event/batch.zig");
    _ = @import("event/lock.zig");
    _ = @import("event/locked.zig");
    _ = @import("event/rwlock.zig");
    _ = @import("event/rwlocked.zig");
    _ = @import("event/loop.zig");
    _ = @import("event/wait_group.zig");
}
lib/std/event/batch.zig
@ -1,141 +0,0 @@
const std = @import("../std.zig");
const testing = std.testing;

/// Performs multiple async functions in parallel, without heap allocation.
/// Async function frames are managed externally to this abstraction, and
/// passed in via the `add` function. Once all the jobs are added, call `wait`.
/// This API is *not* thread-safe. The object must be accessed from one thread at
/// a time, however, it need not be the same thread.
pub fn Batch(
    /// The return value for each job.
    /// If a job slot was re-used due to maxed out concurrency, then its result
    /// value will be overwritten. The values can be accessed with the `results` field.
    comptime Result: type,
    /// How many jobs to run in parallel.
    comptime max_jobs: comptime_int,
    /// Controls whether the `add` and `wait` functions will be async functions.
    comptime async_behavior: enum {
        /// Observe the value of `std.io.is_async` to decide whether `add`
        /// and `wait` will be async functions. Asserts that the jobs do not suspend when
        /// `std.options.io_mode == .blocking`. This is a generally safe assumption, and the
        /// usual recommended option for this parameter.
        auto_async,

        /// Always uses the `nosuspend` keyword when using `await` on the jobs,
        /// making `add` and `wait` non-async functions. Asserts that the jobs do not suspend.
        never_async,

        /// `add` and `wait` use regular `await` keyword, making them async functions.
        always_async,
    },
) type {
    return struct {
        jobs: [max_jobs]Job,
        next_job_index: usize,
        collected_result: CollectedResult,

        const Job = struct {
            frame: ?anyframe->Result,
            result: Result,
        };

        const Self = @This();

        const CollectedResult = switch (@typeInfo(Result)) {
            .ErrorUnion => Result,
            else => void,
        };

        const async_ok = switch (async_behavior) {
            .auto_async => std.io.is_async,
            .never_async => false,
            .always_async => true,
        };

        pub fn init() Self {
            return Self{
                .jobs = [1]Job{
                    .{
                        .frame = null,
                        .result = undefined,
                    },
                } ** max_jobs,
                .next_job_index = 0,
                .collected_result = {},
            };
        }

        /// Add a frame to the Batch. If all jobs are in-flight, then this function
        /// waits until one completes.
        /// This function is *not* thread-safe. It must be called from one thread at
        /// a time, however, it need not be the same thread.
        /// TODO: "select" language feature to use the next available slot, rather than
        /// awaiting the next index.
        pub fn add(self: *Self, frame: anyframe->Result) void {
            const job = &self.jobs[self.next_job_index];
            self.next_job_index = (self.next_job_index + 1) % max_jobs;
            if (job.frame) |existing| {
                job.result = if (async_ok) await existing else nosuspend await existing;
                if (CollectedResult != void) {
                    job.result catch |err| {
                        self.collected_result = err;
                    };
                }
            }
            job.frame = frame;
        }

        /// Wait for all the jobs to complete.
        /// Safe to call any number of times.
        /// If `Result` is an error union, this function returns the last error that occurred, if any.
        /// Unlike the `results` field, the return value of `wait` will report any error that occurred;
        /// hitting max parallelism will not compromise the result.
        /// This function is *not* thread-safe. It must be called from one thread at
        /// a time, however, it need not be the same thread.
        pub fn wait(self: *Self) CollectedResult {
            for (self.jobs) |*job|
                if (job.frame) |f| {
                    job.result = if (async_ok) await f else nosuspend await f;
                    if (CollectedResult != void) {
                        job.result catch |err| {
                            self.collected_result = err;
                        };
                    }
                    job.frame = null;
                };
            return self.collected_result;
        }
    };
}

test "std.event.Batch" {
    if (true) return error.SkipZigTest;
    var count: usize = 0;
    var batch = Batch(void, 2, .auto_async).init();
    batch.add(&async sleepALittle(&count));
    batch.add(&async increaseByTen(&count));
    batch.wait();
    try testing.expect(count == 11);

    var another = Batch(anyerror!void, 2, .auto_async).init();
    another.add(&async somethingElse());
    another.add(&async doSomethingThatFails());
    try testing.expectError(error.ItBroke, another.wait());
}

fn sleepALittle(count: *usize) void {
    std.time.sleep(1 * std.time.ns_per_ms);
    _ = @atomicRmw(usize, count, .Add, 1, .SeqCst);
}

fn increaseByTen(count: *usize) void {
    var i: usize = 0;
    while (i < 10) : (i += 1) {
        _ = @atomicRmw(usize, count, .Add, 1, .SeqCst);
    }
}

fn doSomethingThatFails() anyerror!void {}
fn somethingElse() anyerror!void {
    return error.ItBroke;
}
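Note on the removed Batch API above: the return type of wait() is derived from the job Result type by the CollectedResult switch, so only error-union jobs produce a value. A minimal standalone sketch of that comptime rule (illustrative, not part of the removed file):

const std = @import("std");

// Same comptime switch Batch uses: an error-union Result is kept as the
// return type of wait(); anything else collapses to void.
fn CollectedResult(comptime Result: type) type {
    return switch (@typeInfo(Result)) {
        .ErrorUnion => Result,
        else => void,
    };
}

test "wait() only carries a value for error-union jobs" {
    try std.testing.expect(CollectedResult(void) == void);
    try std.testing.expect(CollectedResult(u32) == void);
    try std.testing.expect(CollectedResult(anyerror!void) == anyerror!void);
}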
lib/std/event/channel.zig
@ -1,334 +0,0 @@
const std = @import("../std.zig");
const builtin = @import("builtin");
const assert = std.debug.assert;
const testing = std.testing;
const Loop = std.event.Loop;

/// Many producer, many consumer, thread-safe, runtime configurable buffer size.
/// When buffer is empty, consumers suspend and are resumed by producers.
/// When buffer is full, producers suspend and are resumed by consumers.
pub fn Channel(comptime T: type) type {
    return struct {
        getters: std.atomic.Queue(GetNode),
        or_null_queue: std.atomic.Queue(*std.atomic.Queue(GetNode).Node),
        putters: std.atomic.Queue(PutNode),
        get_count: usize,
        put_count: usize,
        dispatch_lock: bool,
        need_dispatch: bool,

        // simple fixed size ring buffer
        buffer_nodes: []T,
        buffer_index: usize,
        buffer_len: usize,

        const SelfChannel = @This();
        const GetNode = struct {
            tick_node: *Loop.NextTickNode,
            data: Data,

            const Data = union(enum) {
                Normal: Normal,
                OrNull: OrNull,
            };

            const Normal = struct {
                ptr: *T,
            };

            const OrNull = struct {
                ptr: *?T,
                or_null: *std.atomic.Queue(*std.atomic.Queue(GetNode).Node).Node,
            };
        };
        const PutNode = struct {
            data: T,
            tick_node: *Loop.NextTickNode,
        };

        const global_event_loop = Loop.instance orelse
            @compileError("std.event.Channel currently only works with event-based I/O");

        /// Call `deinit` to free resources when done.
        /// `buffer` must live until `deinit` is called.
        /// For a zero length buffer, use `[0]T{}`.
        /// TODO https://github.com/ziglang/zig/issues/2765
        pub fn init(self: *SelfChannel, buffer: []T) void {
            // The ring buffer implementation only works with power of 2 buffer sizes
            // because of relying on subtracting across zero. For example (0 -% 1) % 10 == 5
            assert(buffer.len == 0 or @popCount(buffer.len) == 1);

            self.* = SelfChannel{
                .buffer_len = 0,
                .buffer_nodes = buffer,
                .buffer_index = 0,
                .dispatch_lock = false,
                .need_dispatch = false,
                .getters = std.atomic.Queue(GetNode).init(),
                .putters = std.atomic.Queue(PutNode).init(),
                .or_null_queue = std.atomic.Queue(*std.atomic.Queue(GetNode).Node).init(),
                .get_count = 0,
                .put_count = 0,
            };
        }

        /// Must be called when all calls to put and get have suspended and no more calls occur.
        /// This can be omitted if caller can guarantee that the suspended putters and getters
        /// do not need to be run to completion. Note that this may leave awaiters hanging.
        pub fn deinit(self: *SelfChannel) void {
            while (self.getters.get()) |get_node| {
                resume get_node.data.tick_node.data;
            }
            while (self.putters.get()) |put_node| {
                resume put_node.data.tick_node.data;
            }
            self.* = undefined;
        }

        /// puts a data item in the channel. The function returns when the value has been added to the
        /// buffer, or in the case of a zero size buffer, when the item has been retrieved by a getter.
        /// Or when the channel is destroyed.
        pub fn put(self: *SelfChannel, data: T) void {
            var my_tick_node = Loop.NextTickNode{ .data = @frame() };
            var queue_node = std.atomic.Queue(PutNode).Node{
                .data = PutNode{
                    .tick_node = &my_tick_node,
                    .data = data,
                },
            };

            suspend {
                self.putters.put(&queue_node);
                _ = @atomicRmw(usize, &self.put_count, .Add, 1, .SeqCst);

                self.dispatch();
            }
        }

        /// await this function to get an item from the channel. If the buffer is empty, the frame will
        /// complete when the next item is put in the channel.
        pub fn get(self: *SelfChannel) callconv(.Async) T {
            // TODO https://github.com/ziglang/zig/issues/2765
            var result: T = undefined;
            var my_tick_node = Loop.NextTickNode{ .data = @frame() };
            var queue_node = std.atomic.Queue(GetNode).Node{
                .data = GetNode{
                    .tick_node = &my_tick_node,
                    .data = GetNode.Data{
                        .Normal = GetNode.Normal{ .ptr = &result },
                    },
                },
            };

            suspend {
                self.getters.put(&queue_node);
                _ = @atomicRmw(usize, &self.get_count, .Add, 1, .SeqCst);

                self.dispatch();
            }
            return result;
        }

        //pub async fn select(comptime EnumUnion: type, channels: ...) EnumUnion {
        //    assert(@memberCount(EnumUnion) == channels.len); // enum union and channels mismatch
        //    assert(channels.len != 0); // enum unions cannot have 0 fields
        //    if (channels.len == 1) {
        //        const result = await (async channels[0].get() catch unreachable);
        //        return @unionInit(EnumUnion, @memberName(EnumUnion, 0), result);
        //    }
        //}

        /// Get an item from the channel. If the buffer is empty and there are no
        /// puts waiting, this returns `null`.
        pub fn getOrNull(self: *SelfChannel) ?T {
            // TODO integrate this function with named return values
            // so we can get rid of this extra result copy
            var result: ?T = null;
            var my_tick_node = Loop.NextTickNode{ .data = @frame() };
            var or_null_node = std.atomic.Queue(*std.atomic.Queue(GetNode).Node).Node{ .data = undefined };
            var queue_node = std.atomic.Queue(GetNode).Node{
                .data = GetNode{
                    .tick_node = &my_tick_node,
                    .data = GetNode.Data{
                        .OrNull = GetNode.OrNull{
                            .ptr = &result,
                            .or_null = &or_null_node,
                        },
                    },
                },
            };
            or_null_node.data = &queue_node;

            suspend {
                self.getters.put(&queue_node);
                _ = @atomicRmw(usize, &self.get_count, .Add, 1, .SeqCst);
                self.or_null_queue.put(&or_null_node);

                self.dispatch();
            }
            return result;
        }

        fn dispatch(self: *SelfChannel) void {
            // set the "need dispatch" flag
            @atomicStore(bool, &self.need_dispatch, true, .SeqCst);

            lock: while (true) {
                // set the lock flag
                if (@atomicRmw(bool, &self.dispatch_lock, .Xchg, true, .SeqCst)) return;

                // clear the need_dispatch flag since we're about to do it
                @atomicStore(bool, &self.need_dispatch, false, .SeqCst);

                while (true) {
                    one_dispatch: {
                        // later we correct these extra subtractions
                        var get_count = @atomicRmw(usize, &self.get_count, .Sub, 1, .SeqCst);
                        var put_count = @atomicRmw(usize, &self.put_count, .Sub, 1, .SeqCst);

                        // transfer self.buffer to self.getters
                        while (self.buffer_len != 0) {
                            if (get_count == 0) break :one_dispatch;

                            const get_node = &self.getters.get().?.data;
                            switch (get_node.data) {
                                GetNode.Data.Normal => |info| {
                                    info.ptr.* = self.buffer_nodes[(self.buffer_index -% self.buffer_len) % self.buffer_nodes.len];
                                },
                                GetNode.Data.OrNull => |info| {
                                    _ = self.or_null_queue.remove(info.or_null);
                                    info.ptr.* = self.buffer_nodes[(self.buffer_index -% self.buffer_len) % self.buffer_nodes.len];
                                },
                            }
                            global_event_loop.onNextTick(get_node.tick_node);
                            self.buffer_len -= 1;

                            get_count = @atomicRmw(usize, &self.get_count, .Sub, 1, .SeqCst);
                        }

                        // direct transfer self.putters to self.getters
                        while (get_count != 0 and put_count != 0) {
                            const get_node = &self.getters.get().?.data;
                            const put_node = &self.putters.get().?.data;

                            switch (get_node.data) {
                                GetNode.Data.Normal => |info| {
                                    info.ptr.* = put_node.data;
                                },
                                GetNode.Data.OrNull => |info| {
                                    _ = self.or_null_queue.remove(info.or_null);
                                    info.ptr.* = put_node.data;
                                },
                            }
                            global_event_loop.onNextTick(get_node.tick_node);
                            global_event_loop.onNextTick(put_node.tick_node);

                            get_count = @atomicRmw(usize, &self.get_count, .Sub, 1, .SeqCst);
                            put_count = @atomicRmw(usize, &self.put_count, .Sub, 1, .SeqCst);
                        }

                        // transfer self.putters to self.buffer
                        while (self.buffer_len != self.buffer_nodes.len and put_count != 0) {
                            const put_node = &self.putters.get().?.data;

                            self.buffer_nodes[self.buffer_index % self.buffer_nodes.len] = put_node.data;
                            global_event_loop.onNextTick(put_node.tick_node);
                            self.buffer_index +%= 1;
                            self.buffer_len += 1;

                            put_count = @atomicRmw(usize, &self.put_count, .Sub, 1, .SeqCst);
                        }
                    }

                    // undo the extra subtractions
                    _ = @atomicRmw(usize, &self.get_count, .Add, 1, .SeqCst);
                    _ = @atomicRmw(usize, &self.put_count, .Add, 1, .SeqCst);

                    // All the "get or null" functions should resume now.
                    var remove_count: usize = 0;
                    while (self.or_null_queue.get()) |or_null_node| {
                        remove_count += @intFromBool(self.getters.remove(or_null_node.data));
                        global_event_loop.onNextTick(or_null_node.data.data.tick_node);
                    }
                    if (remove_count != 0) {
                        _ = @atomicRmw(usize, &self.get_count, .Sub, remove_count, .SeqCst);
                    }

                    // clear need-dispatch flag
                    if (@atomicRmw(bool, &self.need_dispatch, .Xchg, false, .SeqCst)) continue;

                    assert(@atomicRmw(bool, &self.dispatch_lock, .Xchg, false, .SeqCst));

                    // we have to check again now that we unlocked
                    if (@atomicLoad(bool, &self.need_dispatch, .SeqCst)) continue :lock;

                    return;
                }
            }
        }
    };
}

test "std.event.Channel" {
    if (!std.io.is_async) return error.SkipZigTest;

    // https://github.com/ziglang/zig/issues/1908
    if (builtin.single_threaded) return error.SkipZigTest;

    // https://github.com/ziglang/zig/issues/3251
    if (builtin.os.tag == .freebsd) return error.SkipZigTest;

    var channel: Channel(i32) = undefined;
    channel.init(&[0]i32{});
    defer channel.deinit();

    var handle = async testChannelGetter(&channel);
    var putter = async testChannelPutter(&channel);

    await handle;
    await putter;
}

test "std.event.Channel wraparound" {

    // TODO provide a way to run tests in evented I/O mode
    if (!std.io.is_async) return error.SkipZigTest;

    const channel_size = 2;

    var buf: [channel_size]i32 = undefined;
    var channel: Channel(i32) = undefined;
    channel.init(&buf);
    defer channel.deinit();

    // add items to channel and pull them out until
    // the buffer wraps around, make sure it doesn't crash.
    channel.put(5);
    try testing.expectEqual(@as(i32, 5), channel.get());
    channel.put(6);
    try testing.expectEqual(@as(i32, 6), channel.get());
    channel.put(7);
    try testing.expectEqual(@as(i32, 7), channel.get());
}
fn testChannelGetter(channel: *Channel(i32)) callconv(.Async) void {
    const value1 = channel.get();
    try testing.expect(value1 == 1234);

    const value2 = channel.get();
    try testing.expect(value2 == 4567);

    const value3 = channel.getOrNull();
    try testing.expect(value3 == null);

    var last_put = async testPut(channel, 4444);
    const value4 = channel.getOrNull();
    try testing.expect(value4.? == 4444);
    await last_put;
}
fn testChannelPutter(channel: *Channel(i32)) callconv(.Async) void {
    channel.put(1234);
    channel.put(4567);
}
fn testPut(channel: *Channel(i32), value: i32) callconv(.Async) void {
    channel.put(value);
}
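The power-of-two assert in Channel.init above exists because the ring buffer indexes with wrapping subtraction across zero; that only lands on the right slot when the capacity divides the integer range evenly. A small standalone check of the arithmetic (illustrative, not part of the removed file):

const std = @import("std");

test "wrap-around indexing needs a power-of-two capacity" {
    // Power-of-two capacity: (0 -% 1) % 8 == 7, i.e. wrapping below zero
    // still points at the last slot.
    try std.testing.expectEqual(@as(usize, 7), (@as(usize, 0) -% 1) % 8);

    // Capacity 10: the same expression yields 5 (maxInt(usize) % 10), which
    // is not the last slot -- the case the assert guards against.
    try std.testing.expectEqual(@as(usize, 5), (@as(usize, 0) -% 1) % 10);
}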
lib/std/event/future.zig
@ -1,115 +0,0 @@
const std = @import("../std.zig");
const builtin = @import("builtin");
const assert = std.debug.assert;
const testing = std.testing;
const Lock = std.event.Lock;

/// This is a value that starts out unavailable, until resolve() is called.
/// While it is unavailable, functions suspend when they try to get() it,
/// and then are resumed when resolve() is called.
/// At this point the value remains forever available, and another resolve() is not allowed.
pub fn Future(comptime T: type) type {
    return struct {
        lock: Lock,
        data: T,
        available: Available,

        const Available = enum(u8) {
            NotStarted,
            Started,
            Finished,
        };

        const Self = @This();
        const Queue = std.atomic.Queue(anyframe);

        pub fn init() Self {
            return Self{
                .lock = Lock.initLocked(),
                .available = .NotStarted,
                .data = undefined,
            };
        }

        /// Obtain the value. If it's not available, wait until it becomes
        /// available.
        /// Thread-safe.
        pub fn get(self: *Self) callconv(.Async) *T {
            if (@atomicLoad(Available, &self.available, .SeqCst) == .Finished) {
                return &self.data;
            }
            const held = self.lock.acquire();
            held.release();

            return &self.data;
        }

        /// Gets the data without waiting for it. If it's available, a pointer is
        /// returned. Otherwise, null is returned.
        pub fn getOrNull(self: *Self) ?*T {
            if (@atomicLoad(Available, &self.available, .SeqCst) == .Finished) {
                return &self.data;
            } else {
                return null;
            }
        }

        /// If someone else has started working on the data, wait for them to complete
        /// and return a pointer to the data. Otherwise, return null, and the caller
        /// should start working on the data.
        /// It's not required to call start() before resolve() but it can be useful since
        /// this method is thread-safe.
        pub fn start(self: *Self) callconv(.Async) ?*T {
            const state = @cmpxchgStrong(Available, &self.available, .NotStarted, .Started, .SeqCst, .SeqCst) orelse return null;
            switch (state) {
                .Started => {
                    const held = self.lock.acquire();
                    held.release();
                    return &self.data;
                },
                .Finished => return &self.data,
                else => unreachable,
            }
        }

        /// Make the data become available. May be called only once.
        /// Before calling this, modify the `data` property.
        pub fn resolve(self: *Self) void {
            const prev = @atomicRmw(Available, &self.available, .Xchg, .Finished, .SeqCst);
            assert(prev != .Finished); // resolve() called twice
            Lock.Held.release(Lock.Held{ .lock = &self.lock });
        }
    };
}

test "std.event.Future" {
    // https://github.com/ziglang/zig/issues/1908
    if (builtin.single_threaded) return error.SkipZigTest;
    // https://github.com/ziglang/zig/issues/3251
    if (builtin.os.tag == .freebsd) return error.SkipZigTest;
    // TODO provide a way to run tests in evented I/O mode
    if (!std.io.is_async) return error.SkipZigTest;

    testFuture();
}

fn testFuture() void {
    var future = Future(i32).init();

    var a = async waitOnFuture(&future);
    var b = async waitOnFuture(&future);
    resolveFuture(&future);

    const result = (await a) + (await b);

    try testing.expect(result == 12);
}

fn waitOnFuture(future: *Future(i32)) i32 {
    return future.get().*;
}

fn resolveFuture(future: *Future(i32)) void {
    future.data = 6;
    future.resolve();
}
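Future.start above relies on a single cmpxchg to elect the producer: the caller that moves `available` from NotStarted to Started gets null back and must fill in `data`; everyone else is handed the state that is already in place. The same handshake in isolation (illustrative sketch, not part of the removed file, using the same builtin spellings as the code above):

const std = @import("std");

const Available = enum(u8) { NotStarted, Started, Finished };

test "only the first start() caller is elected to produce the value" {
    var available: Available = .NotStarted;

    // First caller wins the exchange and receives null.
    const first = @cmpxchgStrong(Available, &available, .NotStarted, .Started, .SeqCst, .SeqCst);
    try std.testing.expect(first == null);

    // Later callers see the state that is already in place.
    const second = @cmpxchgStrong(Available, &available, .NotStarted, .Started, .SeqCst, .SeqCst);
    try std.testing.expect(second.? == .Started);
}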
lib/std/event/group.zig
@ -1,160 +0,0 @@
const std = @import("../std.zig");
const builtin = @import("builtin");
const Lock = std.event.Lock;
const testing = std.testing;
const Allocator = std.mem.Allocator;

/// ReturnType must be `void` or `E!void`
/// TODO This API was created back with the old design of async/await, when calling any
/// async function required an allocator. There is an ongoing experiment to transition
/// all uses of this API to the simpler and more resource-aware `std.event.Batch` API.
/// If the transition goes well, all usages of `Group` will be gone, and this API
/// will be deleted.
pub fn Group(comptime ReturnType: type) type {
    return struct {
        frame_stack: Stack,
        alloc_stack: AllocStack,
        lock: Lock,
        allocator: Allocator,

        const Self = @This();

        const Error = switch (@typeInfo(ReturnType)) {
            .ErrorUnion => |payload| payload.error_set,
            else => void,
        };
        const Stack = std.atomic.Stack(anyframe->ReturnType);
        const AllocStack = std.atomic.Stack(Node);

        pub const Node = struct {
            bytes: []const u8 = &[0]u8{},
            handle: anyframe->ReturnType,
        };

        pub fn init(allocator: Allocator) Self {
            return Self{
                .frame_stack = Stack.init(),
                .alloc_stack = AllocStack.init(),
                .lock = .{},
                .allocator = allocator,
            };
        }

        /// Add a frame to the group. Thread-safe.
        pub fn add(self: *Self, handle: anyframe->ReturnType) (error{OutOfMemory}!void) {
            const node = try self.allocator.create(AllocStack.Node);
            node.* = AllocStack.Node{
                .next = undefined,
                .data = Node{
                    .handle = handle,
                },
            };
            self.alloc_stack.push(node);
        }

        /// Add a node to the group. Thread-safe. Cannot fail.
        /// `node.data` should be the frame handle to add to the group.
        /// The node's memory should be in the function frame of
        /// the handle that is in the node, or somewhere guaranteed to live
        /// at least as long.
        pub fn addNode(self: *Self, node: *Stack.Node) void {
            self.frame_stack.push(node);
        }

        /// This is equivalent to adding a frame to the group but the memory of its frame is
        /// allocated by the group and freed by `wait`.
        /// `func` must be async and have return type `ReturnType`.
        /// Thread-safe.
        pub fn call(self: *Self, comptime func: anytype, args: anytype) error{OutOfMemory}!void {
            const frame = try self.allocator.create(@TypeOf(@call(.{ .modifier = .async_kw }, func, args)));
            errdefer self.allocator.destroy(frame);
            const node = try self.allocator.create(AllocStack.Node);
            errdefer self.allocator.destroy(node);
            node.* = AllocStack.Node{
                .next = undefined,
                .data = Node{
                    .handle = frame,
                    .bytes = std.mem.asBytes(frame),
                },
            };
            frame.* = @call(.{ .modifier = .async_kw }, func, args);
            self.alloc_stack.push(node);
        }

        /// Wait for all the calls and promises of the group to complete.
        /// Thread-safe.
        /// Safe to call any number of times.
        pub fn wait(self: *Self) callconv(.Async) ReturnType {
            const held = self.lock.acquire();
            defer held.release();

            var result: ReturnType = {};

            while (self.frame_stack.pop()) |node| {
                if (Error == void) {
                    await node.data;
                } else {
                    (await node.data) catch |err| {
                        result = err;
                    };
                }
            }
            while (self.alloc_stack.pop()) |node| {
                const handle = node.data.handle;
                if (Error == void) {
                    await handle;
                } else {
                    (await handle) catch |err| {
                        result = err;
                    };
                }
                self.allocator.free(node.data.bytes);
                self.allocator.destroy(node);
            }
            return result;
        }
    };
}

test "std.event.Group" {
    // https://github.com/ziglang/zig/issues/1908
    if (builtin.single_threaded) return error.SkipZigTest;

    if (!std.io.is_async) return error.SkipZigTest;

    // TODO this file has bit-rotted. repair it
    if (true) return error.SkipZigTest;

    _ = async testGroup(std.heap.page_allocator);
}
fn testGroup(allocator: Allocator) callconv(.Async) void {
    var count: usize = 0;
    var group = Group(void).init(allocator);
    var sleep_a_little_frame = async sleepALittle(&count);
    group.add(&sleep_a_little_frame) catch @panic("memory");
    var increase_by_ten_frame = async increaseByTen(&count);
    group.add(&increase_by_ten_frame) catch @panic("memory");
    group.wait();
    try testing.expect(count == 11);

    var another = Group(anyerror!void).init(allocator);
    var something_else_frame = async somethingElse();
    another.add(&something_else_frame) catch @panic("memory");
    var something_that_fails_frame = async doSomethingThatFails();
    another.add(&something_that_fails_frame) catch @panic("memory");
    try testing.expectError(error.ItBroke, another.wait());
}
fn sleepALittle(count: *usize) callconv(.Async) void {
    std.time.sleep(1 * std.time.ns_per_ms);
    _ = @atomicRmw(usize, count, .Add, 1, .SeqCst);
}
fn increaseByTen(count: *usize) callconv(.Async) void {
    var i: usize = 0;
    while (i < 10) : (i += 1) {
        _ = @atomicRmw(usize, count, .Add, 1, .SeqCst);
    }
}
fn doSomethingThatFails() callconv(.Async) anyerror!void {}
fn somethingElse() callconv(.Async) anyerror!void {
    return error.ItBroke;
}
lib/std/event/lock.zig
@ -1,162 +0,0 @@
const std = @import("../std.zig");
const builtin = @import("builtin");
const assert = std.debug.assert;
const testing = std.testing;
const mem = std.mem;
const Loop = std.event.Loop;

/// Thread-safe async/await lock.
/// Functions which are waiting for the lock are suspended, and
/// are resumed when the lock is released, in order.
/// Allows only one actor to hold the lock.
/// TODO: make this API also work in blocking I/O mode.
pub const Lock = struct {
    mutex: std.Thread.Mutex = std.Thread.Mutex{},
    head: usize = UNLOCKED,

    const UNLOCKED = 0;
    const LOCKED = 1;

    const global_event_loop = Loop.instance orelse
        @compileError("std.event.Lock currently only works with event-based I/O");

    const Waiter = struct {
        // forced Waiter alignment to ensure it doesn't clash with LOCKED
        next: ?*Waiter align(2),
        tail: *Waiter,
        node: Loop.NextTickNode,
    };

    pub fn initLocked() Lock {
        return Lock{ .head = LOCKED };
    }

    pub fn acquire(self: *Lock) Held {
        self.mutex.lock();

        // self.head transitions from multiple stages depending on the value:
        // UNLOCKED -> LOCKED:
        //   acquire Lock ownership when there are no waiters
        // LOCKED -> <Waiter head ptr>:
        //   Lock is already owned, enqueue first Waiter
        // <head ptr> -> <head ptr>:
        //   Lock is owned with pending waiters. Push our waiter to the queue.

        if (self.head == UNLOCKED) {
            self.head = LOCKED;
            self.mutex.unlock();
            return Held{ .lock = self };
        }

        var waiter: Waiter = undefined;
        waiter.next = null;
        waiter.tail = &waiter;

        const head = switch (self.head) {
            UNLOCKED => unreachable,
            LOCKED => null,
            else => @as(*Waiter, @ptrFromInt(self.head)),
        };

        if (head) |h| {
            h.tail.next = &waiter;
            h.tail = &waiter;
        } else {
            self.head = @intFromPtr(&waiter);
        }

        suspend {
            waiter.node = Loop.NextTickNode{
                .prev = undefined,
                .next = undefined,
                .data = @frame(),
            };
            self.mutex.unlock();
        }

        return Held{ .lock = self };
    }

    pub const Held = struct {
        lock: *Lock,

        pub fn release(self: Held) void {
            const waiter = blk: {
                self.lock.mutex.lock();
                defer self.lock.mutex.unlock();

                // self.head goes through the reverse transition from acquire():
                // <head ptr> -> <new head ptr>:
                //   pop a waiter from the queue to give Lock ownership when there are still others pending
                // <head ptr> -> LOCKED:
                //   pop the laster waiter from the queue, while also giving it lock ownership when awaken
                // LOCKED -> UNLOCKED:
                //   last lock owner releases lock while no one else is waiting for it

                switch (self.lock.head) {
                    UNLOCKED => {
                        unreachable; // Lock unlocked while unlocking
                    },
                    LOCKED => {
                        self.lock.head = UNLOCKED;
                        break :blk null;
                    },
                    else => {
                        const waiter = @as(*Waiter, @ptrFromInt(self.lock.head));
                        self.lock.head = if (waiter.next == null) LOCKED else @intFromPtr(waiter.next);
                        if (waiter.next) |next|
                            next.tail = waiter.tail;
                        break :blk waiter;
                    },
                }
            };

            if (waiter) |w| {
                global_event_loop.onNextTick(&w.node);
            }
        }
    };
};

test "std.event.Lock" {
    if (!std.io.is_async) return error.SkipZigTest;

    // TODO https://github.com/ziglang/zig/issues/1908
    if (builtin.single_threaded) return error.SkipZigTest;

    // TODO https://github.com/ziglang/zig/issues/3251
    if (builtin.os.tag == .freebsd) return error.SkipZigTest;

    var lock = Lock{};
    testLock(&lock);

    const expected_result = [1]i32{3 * @as(i32, @intCast(shared_test_data.len))} ** shared_test_data.len;
    try testing.expectEqualSlices(i32, &expected_result, &shared_test_data);
}
fn testLock(lock: *Lock) void {
    var handle1 = async lockRunner(lock);
    var handle2 = async lockRunner(lock);
    var handle3 = async lockRunner(lock);

    await handle1;
    await handle2;
    await handle3;
}

var shared_test_data = [1]i32{0} ** 10;
var shared_test_index: usize = 0;

fn lockRunner(lock: *Lock) void {
    Lock.global_event_loop.yield();

    var i: usize = 0;
    while (i < shared_test_data.len) : (i += 1) {
        const handle = lock.acquire();
        defer handle.release();

        shared_test_index = 0;
        while (shared_test_index < shared_test_data.len) : (shared_test_index += 1) {
            shared_test_data[shared_test_index] = shared_test_data[shared_test_index] + 1;
        }
    }
}
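The Lock above packs three meanings into one usize `head` word: 0 is unlocked, 1 is locked with no waiters, and any other value is a pointer to the first queued Waiter, whose alignment guarantees it can never equal 1. A standalone sketch of that encoding (illustrative, not part of the removed file):

const std = @import("std");

const UNLOCKED: usize = 0;
const LOCKED: usize = 1;

// A pointer to this struct is at least pointer-aligned, so its address can
// never collide with the LOCKED sentinel.
const Waiter = struct {
    next: ?*Waiter = null,
};

test "head word encodes unlocked, locked, or a waiter pointer" {
    var head: usize = UNLOCKED;

    head = LOCKED; // lock taken, nobody queued
    try std.testing.expect(head != UNLOCKED);

    var waiter: Waiter = .{};
    waiter.next = null; // would point at the next queued waiter, none here
    head = @intFromPtr(&waiter); // lock taken, one waiter queued
    try std.testing.expect(head != UNLOCKED and head != LOCKED);
    try std.testing.expect(@as(*Waiter, @ptrFromInt(head)) == &waiter);
}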
lib/std/event/locked.zig
@ -1,42 +0,0 @@
const std = @import("../std.zig");
const Lock = std.event.Lock;

/// Thread-safe async/await lock that protects one piece of data.
/// Functions which are waiting for the lock are suspended, and
/// are resumed when the lock is released, in order.
pub fn Locked(comptime T: type) type {
    return struct {
        lock: Lock,
        private_data: T,

        const Self = @This();

        pub const HeldLock = struct {
            value: *T,
            held: Lock.Held,

            pub fn release(self: HeldLock) void {
                self.held.release();
            }
        };

        pub fn init(data: T) Self {
            return Self{
                .lock = .{},
                .private_data = data,
            };
        }

        pub fn deinit(self: *Self) void {
            self.lock.deinit();
        }

        pub fn acquire(self: *Self) callconv(.Async) HeldLock {
            return HeldLock{
                // TODO guaranteed allocation elision
                .held = self.lock.acquire(),
                .value = &self.private_data,
            };
        }
    };
}
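The Locked(T) wrapper above pairs a value with the async Lock and only hands the value out through a Held token. The same shape, sketched with std.Thread.Mutex in place of the removed event Lock so it stands on its own (illustrative; the names here are made up for the sketch):

const std = @import("std");

// "Lock that owns one value": acquire hands back a pointer to the data
// together with whatever must be released afterwards.
fn Protected(comptime T: type) type {
    return struct {
        mutex: std.Thread.Mutex = .{},
        private_data: T,

        const Self = @This();

        pub const Held = struct {
            value: *T,
            mutex: *std.Thread.Mutex,

            pub fn release(self: Held) void {
                self.mutex.unlock();
            }
        };

        pub fn acquire(self: *Self) Held {
            self.mutex.lock();
            return .{ .value = &self.private_data, .mutex = &self.mutex };
        }
    };
}

test "Protected hands out the value only while held" {
    var shared = Protected(u32){ .private_data = 0 };
    const held = shared.acquire();
    defer held.release();
    held.value.* += 1;
    try std.testing.expectEqual(@as(u32, 1), shared.private_data);
}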
lib/std/event/loop.zig
File diff suppressed because it is too large.
lib/std/event/rwlock.zig
@ -1,292 +0,0 @@
const std = @import("../std.zig");
const builtin = @import("builtin");
const assert = std.debug.assert;
const testing = std.testing;
const mem = std.mem;
const Loop = std.event.Loop;
const Allocator = std.mem.Allocator;

/// Thread-safe async/await lock.
/// Functions which are waiting for the lock are suspended, and
/// are resumed when the lock is released, in order.
/// Many readers can hold the lock at the same time; however locking for writing is exclusive.
/// When a read lock is held, it will not be released until the reader queue is empty.
/// When a write lock is held, it will not be released until the writer queue is empty.
/// TODO: make this API also work in blocking I/O mode
pub const RwLock = struct {
    shared_state: State,
    writer_queue: Queue,
    reader_queue: Queue,
    writer_queue_empty: bool,
    reader_queue_empty: bool,
    reader_lock_count: usize,

    const State = enum(u8) {
        Unlocked,
        WriteLock,
        ReadLock,
    };

    const Queue = std.atomic.Queue(anyframe);

    const global_event_loop = Loop.instance orelse
        @compileError("std.event.RwLock currently only works with event-based I/O");

    pub const HeldRead = struct {
        lock: *RwLock,

        pub fn release(self: HeldRead) void {
            // If other readers still hold the lock, we're done.
            if (@atomicRmw(usize, &self.lock.reader_lock_count, .Sub, 1, .SeqCst) != 1) {
                return;
            }

            @atomicStore(bool, &self.lock.reader_queue_empty, true, .SeqCst);
            if (@cmpxchgStrong(State, &self.lock.shared_state, .ReadLock, .Unlocked, .SeqCst, .SeqCst) != null) {
                // Didn't unlock. Someone else's problem.
                return;
            }

            self.lock.commonPostUnlock();
        }
    };

    pub const HeldWrite = struct {
        lock: *RwLock,

        pub fn release(self: HeldWrite) void {
            // See if we can leave it locked for writing, and pass the lock to the next writer
            // in the queue to grab the lock.
            if (self.lock.writer_queue.get()) |node| {
                global_event_loop.onNextTick(node);
                return;
            }

            // We need to release the write lock. Check if any readers are waiting to grab the lock.
            if (!@atomicLoad(bool, &self.lock.reader_queue_empty, .SeqCst)) {
                // Switch to a read lock.
                @atomicStore(State, &self.lock.shared_state, .ReadLock, .SeqCst);
                while (self.lock.reader_queue.get()) |node| {
                    global_event_loop.onNextTick(node);
                }
                return;
            }

            @atomicStore(bool, &self.lock.writer_queue_empty, true, .SeqCst);
            @atomicStore(State, &self.lock.shared_state, .Unlocked, .SeqCst);

            self.lock.commonPostUnlock();
        }
    };

    pub fn init() RwLock {
        return .{
            .shared_state = .Unlocked,
            .writer_queue = Queue.init(),
            .writer_queue_empty = true,
            .reader_queue = Queue.init(),
            .reader_queue_empty = true,
            .reader_lock_count = 0,
        };
    }

    /// Must be called when not locked. Not thread safe.
    /// All calls to acquire() and release() must complete before calling deinit().
    pub fn deinit(self: *RwLock) void {
        assert(self.shared_state == .Unlocked);
        while (self.writer_queue.get()) |node| resume node.data;
        while (self.reader_queue.get()) |node| resume node.data;
    }

    pub fn acquireRead(self: *RwLock) callconv(.Async) HeldRead {
        _ = @atomicRmw(usize, &self.reader_lock_count, .Add, 1, .SeqCst);

        suspend {
            var my_tick_node = Loop.NextTickNode{
                .data = @frame(),
                .prev = undefined,
                .next = undefined,
            };

            self.reader_queue.put(&my_tick_node);

            // At this point, we are in the reader_queue, so we might have already been resumed.

            // We set this bit so that later we can rely on the fact, that if reader_queue_empty == true,
            // some actor will attempt to grab the lock.
            @atomicStore(bool, &self.reader_queue_empty, false, .SeqCst);

            // Here we don't care if we are the one to do the locking or if it was already locked for reading.
            const have_read_lock = if (@cmpxchgStrong(State, &self.shared_state, .Unlocked, .ReadLock, .SeqCst, .SeqCst)) |old_state| old_state == .ReadLock else true;
            if (have_read_lock) {
                // Give out all the read locks.
                if (self.reader_queue.get()) |first_node| {
                    while (self.reader_queue.get()) |node| {
                        global_event_loop.onNextTick(node);
                    }
                    resume first_node.data;
                }
            }
        }
        return HeldRead{ .lock = self };
    }

    pub fn acquireWrite(self: *RwLock) callconv(.Async) HeldWrite {
        suspend {
            var my_tick_node = Loop.NextTickNode{
                .data = @frame(),
                .prev = undefined,
                .next = undefined,
            };

            self.writer_queue.put(&my_tick_node);

            // At this point, we are in the writer_queue, so we might have already been resumed.

            // We set this bit so that later we can rely on the fact, that if writer_queue_empty == true,
            // some actor will attempt to grab the lock.
            @atomicStore(bool, &self.writer_queue_empty, false, .SeqCst);

            // Here we must be the one to acquire the write lock. It cannot already be locked.
            if (@cmpxchgStrong(State, &self.shared_state, .Unlocked, .WriteLock, .SeqCst, .SeqCst) == null) {
                // We now have a write lock.
                if (self.writer_queue.get()) |node| {
                    // Whether this node is us or someone else, we tail resume it.
                    resume node.data;
                }
            }
        }
        return HeldWrite{ .lock = self };
    }

    fn commonPostUnlock(self: *RwLock) void {
        while (true) {
            // There might be a writer_queue item or a reader_queue item
            // If we check and both are empty, we can be done, because the other actors will try to
            // obtain the lock.
            // But if there's a writer_queue item or a reader_queue item,
            // we are the actor which must loop and attempt to grab the lock again.
            if (!@atomicLoad(bool, &self.writer_queue_empty, .SeqCst)) {
                if (@cmpxchgStrong(State, &self.shared_state, .Unlocked, .WriteLock, .SeqCst, .SeqCst) != null) {
                    // We did not obtain the lock. Great, the queues are someone else's problem.
                    return;
                }
                // If there's an item in the writer queue, give them the lock, and we're done.
                if (self.writer_queue.get()) |node| {
                    global_event_loop.onNextTick(node);
                    return;
                }
                // Release the lock again.
                @atomicStore(bool, &self.writer_queue_empty, true, .SeqCst);
                @atomicStore(State, &self.shared_state, .Unlocked, .SeqCst);
                continue;
            }

            if (!@atomicLoad(bool, &self.reader_queue_empty, .SeqCst)) {
                if (@cmpxchgStrong(State, &self.shared_state, .Unlocked, .ReadLock, .SeqCst, .SeqCst) != null) {
                    // We did not obtain the lock. Great, the queues are someone else's problem.
                    return;
                }
                // If there are any items in the reader queue, give out all the reader locks, and we're done.
                if (self.reader_queue.get()) |first_node| {
                    global_event_loop.onNextTick(first_node);
                    while (self.reader_queue.get()) |node| {
                        global_event_loop.onNextTick(node);
                    }
                    return;
                }
                // Release the lock again.
                @atomicStore(bool, &self.reader_queue_empty, true, .SeqCst);
                if (@cmpxchgStrong(State, &self.shared_state, .ReadLock, .Unlocked, .SeqCst, .SeqCst) != null) {
                    // Didn't unlock. Someone else's problem.
                    return;
                }
                continue;
            }
            return;
        }
    }
};

test "std.event.RwLock" {
    // https://github.com/ziglang/zig/issues/2377
    if (true) return error.SkipZigTest;

    // https://github.com/ziglang/zig/issues/1908
    if (builtin.single_threaded) return error.SkipZigTest;

    // TODO provide a way to run tests in evented I/O mode
    if (!std.io.is_async) return error.SkipZigTest;

    var lock = RwLock.init();
    defer lock.deinit();

    _ = testLock(std.heap.page_allocator, &lock);

    const expected_result = [1]i32{shared_it_count * @as(i32, @intCast(shared_test_data.len))} ** shared_test_data.len;
    try testing.expectEqualSlices(i32, expected_result, shared_test_data);
}
fn testLock(allocator: Allocator, lock: *RwLock) callconv(.Async) void {
    var read_nodes: [100]Loop.NextTickNode = undefined;
    for (&read_nodes) |*read_node| {
        const frame = allocator.create(@Frame(readRunner)) catch @panic("memory");
        read_node.data = frame;
        frame.* = async readRunner(lock);
        Loop.instance.?.onNextTick(read_node);
    }

    var write_nodes: [shared_it_count]Loop.NextTickNode = undefined;
    for (&write_nodes) |*write_node| {
        const frame = allocator.create(@Frame(writeRunner)) catch @panic("memory");
        write_node.data = frame;
        frame.* = async writeRunner(lock);
        Loop.instance.?.onNextTick(write_node);
    }

    for (&write_nodes) |*write_node| {
        const casted = @as(*const @Frame(writeRunner), @ptrCast(write_node.data));
        await casted;
        allocator.destroy(casted);
    }
    for (&read_nodes) |*read_node| {
        const casted = @as(*const @Frame(readRunner), @ptrCast(read_node.data));
        await casted;
        allocator.destroy(casted);
    }
}

const shared_it_count = 10;
var shared_test_data = [1]i32{0} ** 10;
var shared_test_index: usize = 0;
var shared_count: usize = 0;
fn writeRunner(lock: *RwLock) callconv(.Async) void {
    suspend {} // resumed by onNextTick

    var i: usize = 0;
    while (i < shared_test_data.len) : (i += 1) {
        std.time.sleep(100 * std.time.microsecond);
        const lock_promise = async lock.acquireWrite();
        const handle = await lock_promise;
        defer handle.release();

        shared_count += 1;
        while (shared_test_index < shared_test_data.len) : (shared_test_index += 1) {
            shared_test_data[shared_test_index] = shared_test_data[shared_test_index] + 1;
        }
        shared_test_index = 0;
    }
}
fn readRunner(lock: *RwLock) callconv(.Async) void {
    suspend {} // resumed by onNextTick
    std.time.sleep(1);

    var i: usize = 0;
    while (i < shared_test_data.len) : (i += 1) {
        const lock_promise = async lock.acquireRead();
        const handle = await lock_promise;
        defer handle.release();

        try testing.expect(shared_test_index == 0);
        try testing.expect(shared_test_data[i] == @as(i32, @intCast(shared_count)));
    }
}
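HeldRead.release above only transitions the RwLock out of ReadLock when the atomic decrement says it was the last reader: the previous counter value returned by @atomicRmw is what gets tested. The same check in isolation (illustrative, not part of the removed file):

const std = @import("std");

test "only the last reader's release tries to unlock" {
    var reader_lock_count: usize = 3;

    // @atomicRmw returns the value *before* the subtraction; anything other
    // than 1 means other readers are still holding the lock.
    try std.testing.expect(@atomicRmw(usize, &reader_lock_count, .Sub, 1, .SeqCst) != 1);
    try std.testing.expect(@atomicRmw(usize, &reader_lock_count, .Sub, 1, .SeqCst) != 1);
    try std.testing.expect(@atomicRmw(usize, &reader_lock_count, .Sub, 1, .SeqCst) == 1);
}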
lib/std/event/rwlocked.zig
@ -1,57 +0,0 @@
const std = @import("../std.zig");
const RwLock = std.event.RwLock;

/// Thread-safe async/await RW lock that protects one piece of data.
/// Functions which are waiting for the lock are suspended, and
/// are resumed when the lock is released, in order.
pub fn RwLocked(comptime T: type) type {
    return struct {
        lock: RwLock,
        locked_data: T,

        const Self = @This();

        pub const HeldReadLock = struct {
            value: *const T,
            held: RwLock.HeldRead,

            pub fn release(self: HeldReadLock) void {
                self.held.release();
            }
        };

        pub const HeldWriteLock = struct {
            value: *T,
            held: RwLock.HeldWrite,

            pub fn release(self: HeldWriteLock) void {
                self.held.release();
            }
        };

        pub fn init(data: T) Self {
            return Self{
                .lock = RwLock.init(),
                .locked_data = data,
            };
        }

        pub fn deinit(self: *Self) void {
            self.lock.deinit();
        }

        pub fn acquireRead(self: *Self) callconv(.Async) HeldReadLock {
            return HeldReadLock{
                .held = self.lock.acquireRead(),
                .value = &self.locked_data,
            };
        }

        pub fn acquireWrite(self: *Self) callconv(.Async) HeldWriteLock {
            return HeldWriteLock{
                .held = self.lock.acquireWrite(),
                .value = &self.locked_data,
            };
        }
    };
}
lib/std/event/wait_group.zig
@ -1,115 +0,0 @@
const std = @import("../std.zig");
const builtin = @import("builtin");
const Loop = std.event.Loop;

/// A WaitGroup keeps track and waits for a group of async tasks to finish.
/// Call `begin` when creating new tasks, and have tasks call `finish` when done.
/// You can provide a count for both operations to perform them in bulk.
/// Call `wait` to suspend until all tasks are completed.
/// Multiple waiters are supported.
///
/// WaitGroup is an instance of WaitGroupGeneric, which takes in a bitsize
/// for the internal counter. WaitGroup defaults to a `usize` counter.
/// It's also possible to define a max value for the counter so that
/// `begin` will return error.Overflow when the limit is reached, even
/// if the integer type has not has not overflowed.
/// By default `max_value` is set to std.math.maxInt(CounterType).
pub const WaitGroup = WaitGroupGeneric(@bitSizeOf(usize));

pub fn WaitGroupGeneric(comptime counter_size: u16) type {
    const CounterType = std.meta.Int(.unsigned, counter_size);

    const global_event_loop = Loop.instance orelse
        @compileError("std.event.WaitGroup currently only works with event-based I/O");

    return struct {
        counter: CounterType = 0,
        max_counter: CounterType = std.math.maxInt(CounterType),
        mutex: std.Thread.Mutex = .{},
        waiters: ?*Waiter = null,
        const Waiter = struct {
            next: ?*Waiter,
            tail: *Waiter,
            node: Loop.NextTickNode,
        };

        const Self = @This();
        pub fn begin(self: *Self, count: CounterType) error{Overflow}!void {
            self.mutex.lock();
            defer self.mutex.unlock();

            const new_counter = try std.math.add(CounterType, self.counter, count);
            if (new_counter > self.max_counter) return error.Overflow;
            self.counter = new_counter;
        }

        pub fn finish(self: *Self, count: CounterType) void {
            var waiters = blk: {
                self.mutex.lock();
                defer self.mutex.unlock();
                self.counter = std.math.sub(CounterType, self.counter, count) catch unreachable;
                if (self.counter == 0) {
                    const temp = self.waiters;
                    self.waiters = null;
                    break :blk temp;
                }
                break :blk null;
            };

            // We don't need to hold the lock to reschedule any potential waiter.
            while (waiters) |w| {
                const temp_w = w;
                waiters = w.next;
                global_event_loop.onNextTick(&temp_w.node);
            }
        }

        pub fn wait(self: *Self) void {
            self.mutex.lock();

            if (self.counter == 0) {
                self.mutex.unlock();
                return;
            }

            var self_waiter: Waiter = undefined;
            self_waiter.node.data = @frame();
            if (self.waiters) |head| {
                head.tail.next = &self_waiter;
                head.tail = &self_waiter;
            } else {
                self.waiters = &self_waiter;
                self_waiter.tail = &self_waiter;
                self_waiter.next = null;
            }
            suspend {
                self.mutex.unlock();
            }
        }
    };
}

test "basic WaitGroup usage" {
    if (!std.io.is_async) return error.SkipZigTest;

    // TODO https://github.com/ziglang/zig/issues/1908
    if (builtin.single_threaded) return error.SkipZigTest;

    // TODO https://github.com/ziglang/zig/issues/3251
    if (builtin.os.tag == .freebsd) return error.SkipZigTest;

    var initial_wg = WaitGroup{};
    var final_wg = WaitGroup{};

    try initial_wg.begin(1);
    try final_wg.begin(1);
    var task_frame = async task(&initial_wg, &final_wg);
    initial_wg.finish(1);
    final_wg.wait();
    await task_frame;
}

fn task(wg_i: *WaitGroup, wg_f: *WaitGroup) void {
    wg_i.wait();
    wg_f.finish(1);
}
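begin() above can fail in two ways: the checked addition itself overflows the counter type, or the new count exceeds the configured max_counter. The same arithmetic shown with an 8-bit counter and a made-up limit (illustrative, not part of the removed file):

const std = @import("std");

// Mirrors WaitGroupGeneric.begin's arithmetic without needing the event loop.
fn begin(counter: *u8, max_counter: u8, count: u8) error{Overflow}!void {
    const new_counter = try std.math.add(u8, counter.*, count);
    if (new_counter > max_counter) return error.Overflow;
    counter.* = new_counter;
}

test "begin rejects counts past the counter's limit" {
    var counter: u8 = 0;
    try begin(&counter, 200, 150); // 150 <= 200: accepted
    try std.testing.expectError(error.Overflow, begin(&counter, 200, 100)); // 250 > 200
    try std.testing.expectError(error.Overflow, begin(&counter, 200, 200)); // 150 + 200 overflows u8
}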
lib/std/fs.zig
@ -31,8 +31,6 @@ pub const realpathW = os.realpathW;
pub const getAppDataDir = @import("fs/get_app_data_dir.zig").getAppDataDir;
pub const GetAppDataDirError = @import("fs/get_app_data_dir.zig").GetAppDataDirError;

pub const Watch = @import("fs/watch.zig").Watch;

/// This represents the maximum size of a UTF-8 encoded file path that the
/// operating system will accept. Paths, including those returned from file
/// system operations, may be longer than this length, but such paths cannot
@ -641,5 +639,4 @@ test {
    _ = &path;
    _ = @import("fs/test.zig");
    _ = @import("fs/get_app_data_dir.zig");
    _ = @import("fs/watch.zig");
}
@ -1,719 +0,0 @@
|
||||
const std = @import("std");
|
||||
const builtin = @import("builtin");
|
||||
const event = std.event;
|
||||
const assert = std.debug.assert;
|
||||
const testing = std.testing;
|
||||
const os = std.os;
|
||||
const mem = std.mem;
|
||||
const windows = os.windows;
|
||||
const Loop = event.Loop;
|
||||
const fd_t = os.fd_t;
|
||||
const File = std.fs.File;
|
||||
const Allocator = mem.Allocator;
|
||||
|
||||
const global_event_loop = Loop.instance orelse
|
||||
@compileError("std.fs.Watch currently only works with event-based I/O");
|
||||
|
||||
const WatchEventId = enum {
|
||||
CloseWrite,
|
||||
Delete,
|
||||
};
|
||||
|
||||
const WatchEventError = error{
|
||||
UserResourceLimitReached,
|
||||
SystemResources,
|
||||
AccessDenied,
|
||||
Unexpected, // TODO remove this possibility
|
||||
};
|
||||
|
||||
pub fn Watch(comptime V: type) type {
|
||||
return struct {
|
||||
channel: event.Channel(Event.Error!Event),
|
||||
os_data: OsData,
|
||||
allocator: Allocator,
|
||||
|
||||
const OsData = switch (builtin.os.tag) {
|
||||
// TODO https://github.com/ziglang/zig/issues/3778
|
||||
.macos, .freebsd, .netbsd, .dragonfly, .openbsd => KqOsData,
|
||||
.linux => LinuxOsData,
|
||||
.windows => WindowsOsData,
|
||||
|
||||
else => @compileError("Unsupported OS"),
|
||||
};
|
||||
|
||||
const KqOsData = struct {
|
||||
table_lock: event.Lock,
|
||||
file_table: FileTable,
|
||||
|
||||
const FileTable = std.StringHashMapUnmanaged(*Put);
|
||||
const Put = struct {
|
||||
putter_frame: @Frame(kqPutEvents),
|
||||
cancelled: bool = false,
|
||||
value: V,
|
||||
};
|
||||
};
|
||||
|
||||
const WindowsOsData = struct {
|
||||
table_lock: event.Lock,
|
||||
dir_table: DirTable,
|
||||
cancelled: bool = false,
|
||||
|
||||
const DirTable = std.StringHashMapUnmanaged(*Dir);
|
||||
const FileTable = std.StringHashMapUnmanaged(V);
|
||||
|
||||
const Dir = struct {
|
||||
putter_frame: @Frame(windowsDirReader),
|
||||
file_table: FileTable,
|
||||
dir_handle: os.windows.HANDLE,
|
||||
};
|
||||
};
|
||||
|
||||
const LinuxOsData = struct {
|
||||
putter_frame: @Frame(linuxEventPutter),
|
||||
inotify_fd: i32,
|
||||
wd_table: WdTable,
|
||||
table_lock: event.Lock,
|
||||
cancelled: bool = false,
|
||||
|
||||
const WdTable = std.AutoHashMapUnmanaged(i32, Dir);
|
||||
const FileTable = std.StringHashMapUnmanaged(V);
|
||||
|
||||
const Dir = struct {
|
||||
dirname: []const u8,
|
||||
file_table: FileTable,
|
||||
};
|
||||
};
|
||||
|
||||
const Self = @This();
|
||||
|
||||
pub const Event = struct {
|
||||
id: Id,
|
||||
data: V,
|
||||
dirname: []const u8,
|
||||
basename: []const u8,
|
||||
|
||||
pub const Id = WatchEventId;
|
||||
pub const Error = WatchEventError;
|
||||
};
|
||||
|
||||
        pub fn init(allocator: Allocator, event_buf_count: usize) !*Self {
            const self = try allocator.create(Self);
            errdefer allocator.destroy(self);

            switch (builtin.os.tag) {
                .linux => {
                    const inotify_fd = try os.inotify_init1(os.linux.IN_NONBLOCK | os.linux.IN_CLOEXEC);
                    errdefer os.close(inotify_fd);

                    self.* = Self{
                        .allocator = allocator,
                        .channel = undefined,
                        .os_data = OsData{
                            .putter_frame = undefined,
                            .inotify_fd = inotify_fd,
                            .wd_table = .{},
                            .table_lock = event.Lock{},
                        },
                    };

                    const buf = try allocator.alloc(Event.Error!Event, event_buf_count);
                    self.channel.init(buf);
                    self.os_data.putter_frame = async self.linuxEventPutter();
                    return self;
                },

                .windows => {
                    self.* = Self{
                        .allocator = allocator,
                        .channel = undefined,
                        .os_data = OsData{
                            .table_lock = event.Lock{},
                            .dir_table = .{},
                        },
                    };

                    const buf = try allocator.alloc(Event.Error!Event, event_buf_count);
                    self.channel.init(buf);
                    return self;
                },

                .macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
                    self.* = Self{
                        .allocator = allocator,
                        .channel = undefined,
                        .os_data = OsData{
                            .table_lock = event.Lock{},
                            .file_table = .{},
                        },
                    };

                    const buf = try allocator.alloc(Event.Error!Event, event_buf_count);
                    self.channel.init(buf);
                    return self;
                },
                else => @compileError("Unsupported OS"),
            }
        }
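        /// Cancels every outstanding watcher frame, frees the per-file and
        /// per-directory tables, tears down the channel, and destroys `self`.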
        pub fn deinit(self: *Self) void {
            switch (builtin.os.tag) {
                .macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
                    var it = self.os_data.file_table.iterator();
                    while (it.next()) |entry| {
                        const key = entry.key_ptr.*;
                        const value = entry.value_ptr.*;
                        value.cancelled = true;
                        // @TODO Close the fd here?
                        await value.putter_frame;
                        self.allocator.free(key);
                        self.allocator.destroy(value);
                    }
                },
                .linux => {
                    self.os_data.cancelled = true;
                    {
                        // Remove all directory watches; linuxEventPutter will take care of
                        // cleaning up the memory and closing the inotify fd.
                        var dir_it = self.os_data.wd_table.keyIterator();
                        while (dir_it.next()) |wd_key| {
                            const rc = os.linux.inotify_rm_watch(self.os_data.inotify_fd, wd_key.*);
                            // Errno can only be EBADF or EINVAL, i.e. the inotify fd or the wd is invalid.
                            std.debug.assert(rc == 0);
                        }
                    }
                    await self.os_data.putter_frame;
                },
                .windows => {
                    self.os_data.cancelled = true;
                    var dir_it = self.os_data.dir_table.iterator();
                    while (dir_it.next()) |dir_entry| {
                        if (windows.kernel32.CancelIoEx(dir_entry.value_ptr.*.dir_handle, null) != 0) {
                            // We canceled the pending ReadDirectoryChangesW operation, but our
                            // frame is still suspended, now waiting indefinitely.
                            // Thus, it is safe to resume it ourselves.
                            resume dir_entry.value_ptr.*.putter_frame;
                        } else {
                            std.debug.assert(windows.kernel32.GetLastError() == .NOT_FOUND);
                            // We are at another suspend point; we can safely await for the
                            // function to exit the loop.
                            await dir_entry.value_ptr.*.putter_frame;
                        }

                        self.allocator.free(dir_entry.key_ptr.*);
                        var file_it = dir_entry.value_ptr.*.file_table.keyIterator();
                        while (file_it.next()) |file_entry| {
                            self.allocator.free(file_entry.*);
                        }
                        dir_entry.value_ptr.*.file_table.deinit(self.allocator);
                        self.allocator.destroy(dir_entry.value_ptr.*);
                    }
                    self.os_data.dir_table.deinit(self.allocator);
                },
                else => @compileError("Unsupported OS"),
            }
            self.allocator.free(self.channel.buffer_nodes);
            self.channel.deinit();
            self.allocator.destroy(self);
        }
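        /// Starts watching `file_path`, dispatching to the OS-specific
        /// implementation. Returns the value previously associated with the
        /// file if it was already being watched, or null otherwise.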
        pub fn addFile(self: *Self, file_path: []const u8, value: V) !?V {
            switch (builtin.os.tag) {
                .macos, .freebsd, .netbsd, .dragonfly, .openbsd => return addFileKEvent(self, file_path, value),
                .linux => return addFileLinux(self, file_path, value),
                .windows => return addFileWindows(self, file_path, value),
                else => @compileError("Unsupported OS"),
            }
        }
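        /// kqueue backend of `addFile`: resolves the real path, records it in
        /// the file table, and spawns a `kqPutEvents` frame for the opened fd.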
        fn addFileKEvent(self: *Self, file_path: []const u8, value: V) !?V {
            var realpath_buf: [std.fs.MAX_PATH_BYTES]u8 = undefined;
            const realpath = try os.realpath(file_path, &realpath_buf);

            const held = self.os_data.table_lock.acquire();
            defer held.release();

            const gop = try self.os_data.file_table.getOrPut(self.allocator, realpath);
            errdefer assert(self.os_data.file_table.remove(realpath));
            if (gop.found_existing) {
                const prev_value = gop.value_ptr.*.value;
                gop.value_ptr.*.value = value;
                return prev_value;
            }

            gop.key_ptr.* = try self.allocator.dupe(u8, realpath);
            errdefer self.allocator.free(gop.key_ptr.*);
            const put = try self.allocator.create(OsData.Put);
            errdefer self.allocator.destroy(put);
            put.* = .{
                .putter_frame = undefined,
                .value = value,
            };
            gop.value_ptr.* = put;

            // @TODO Can I close this fd and get an error from bsdWaitKev?
            const flags = if (comptime builtin.target.isDarwin()) os.O.SYMLINK | os.O.EVTONLY else 0;
            const fd = try os.open(realpath, flags, 0);
            put.putter_frame = async self.kqPutEvents(fd, gop.key_ptr.*, put);
            return null;
        }
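        /// Putter frame for the kqueue backend: repeatedly registers an
        /// EVFILT_VNODE kevent for `fd` and forwards write and delete
        /// notifications to the channel until `put.cancelled` is set.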
        fn kqPutEvents(self: *Self, fd: os.fd_t, file_path: []const u8, put: *OsData.Put) void {
            global_event_loop.beginOneEvent();
            defer {
                global_event_loop.finishOneEvent();
                // @TODO: Remove this if we force close otherwise
                os.close(fd);
            }

            // We need to manually do a bsdWaitKev to access the fflags.
            var resume_node = event.Loop.ResumeNode.Basic{
                .base = .{
                    .id = .Basic,
                    .handle = @frame(),
                    .overlapped = event.Loop.ResumeNode.overlapped_init,
                },
                .kev = undefined,
            };

            var kevs = [1]os.Kevent{undefined};
            const kev = &kevs[0];

            while (!put.cancelled) {
                kev.* = os.Kevent{
                    .ident = @as(usize, @intCast(fd)),
                    .filter = os.EVFILT_VNODE,
                    .flags = os.EV_ADD | os.EV_ENABLE | os.EV_CLEAR | os.EV_ONESHOT,
                    // For EVFILT_VNODE the vnode events of interest go in fflags.
                    .fflags = os.NOTE_WRITE | os.NOTE_DELETE | os.NOTE_REVOKE,
                    .data = 0,
                    .udata = @intFromPtr(&resume_node.base),
                };
                suspend {
                    global_event_loop.beginOneEvent();
                    errdefer global_event_loop.finishOneEvent();

                    const empty_kevs = &[0]os.Kevent{};
                    _ = os.kevent(global_event_loop.os_data.kqfd, &kevs, empty_kevs, null) catch |err| switch (err) {
                        error.EventNotFound,
                        error.ProcessNotFound,
                        error.Overflow,
                        => unreachable,
                        error.AccessDenied, error.SystemResources => |e| {
                            self.channel.put(e);
                            continue;
                        },
                    };
                }

                if (kev.flags & os.EV_ERROR != 0) {
                    self.channel.put(os.unexpectedErrno(os.errno(kev.data)));
                    continue;
                }

                if (kev.fflags & os.NOTE_DELETE != 0 or kev.fflags & os.NOTE_REVOKE != 0) {
                    self.channel.put(Self.Event{
                        .id = .Delete,
                        .data = put.value,
                        .dirname = std.fs.path.dirname(file_path) orelse "/",
                        .basename = std.fs.path.basename(file_path),
                    });
                } else if (kev.fflags & os.NOTE_WRITE != 0) {
                    self.channel.put(Self.Event{
                        .id = .CloseWrite,
                        .data = put.value,
                        .dirname = std.fs.path.dirname(file_path) orelse "/",
                        .basename = std.fs.path.basename(file_path),
                    });
                }
            }
        }
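        /// inotify backend of `addFile`: adds (or reuses) a watch on the parent
        /// directory and records `basename` in that directory's file table.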
        fn addFileLinux(self: *Self, file_path: []const u8, value: V) !?V {
            const dirname = std.fs.path.dirname(file_path) orelse if (file_path[0] == '/') "/" else ".";
            const basename = std.fs.path.basename(file_path);

            const wd = try os.inotify_add_watch(
                self.os_data.inotify_fd,
                dirname,
                os.linux.IN_CLOSE_WRITE | os.linux.IN_ONLYDIR | os.linux.IN_DELETE | os.linux.IN_EXCL_UNLINK,
            );
            // wd is either a newly created watch or an existing one.

            const held = self.os_data.table_lock.acquire();
            defer held.release();

            const gop = try self.os_data.wd_table.getOrPut(self.allocator, wd);
            errdefer assert(self.os_data.wd_table.remove(wd));
            if (!gop.found_existing) {
                gop.value_ptr.* = OsData.Dir{
                    .dirname = try self.allocator.dupe(u8, dirname),
                    .file_table = .{},
                };
            }

            const dir = gop.value_ptr;
            const file_table_gop = try dir.file_table.getOrPut(self.allocator, basename);
            errdefer assert(dir.file_table.remove(basename));
            if (file_table_gop.found_existing) {
                const prev_value = file_table_gop.value_ptr.*;
                file_table_gop.value_ptr.* = value;
                return prev_value;
            } else {
                file_table_gop.key_ptr.* = try self.allocator.dupe(u8, basename);
                file_table_gop.value_ptr.* = value;
                return null;
            }
        }
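        /// ReadDirectoryChangesW backend of `addFile`: opens the parent
        /// directory once and spawns a `windowsDirReader` frame that reports
        /// changes for the registered basenames.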
        fn addFileWindows(self: *Self, file_path: []const u8, value: V) !?V {
            // TODO we might need to convert dirname and basename to canonical file paths ("short"?)
            const dirname = std.fs.path.dirname(file_path) orelse if (file_path[0] == '/') "/" else ".";
            var dirname_path_space: windows.PathSpace = undefined;
            dirname_path_space.len = try std.unicode.utf8ToUtf16Le(&dirname_path_space.data, dirname);
            dirname_path_space.data[dirname_path_space.len] = 0;

            const basename = std.fs.path.basename(file_path);
            var basename_path_space: windows.PathSpace = undefined;
            basename_path_space.len = try std.unicode.utf8ToUtf16Le(&basename_path_space.data, basename);
            basename_path_space.data[basename_path_space.len] = 0;

            const held = self.os_data.table_lock.acquire();
            defer held.release();

            const gop = try self.os_data.dir_table.getOrPut(self.allocator, dirname);
            errdefer assert(self.os_data.dir_table.remove(dirname));
            if (gop.found_existing) {
                const dir = gop.value_ptr.*;

                const file_gop = try dir.file_table.getOrPut(self.allocator, basename);
                errdefer assert(dir.file_table.remove(basename));
                if (file_gop.found_existing) {
                    const prev_value = file_gop.value_ptr.*;
                    file_gop.value_ptr.* = value;
                    return prev_value;
                } else {
                    file_gop.value_ptr.* = value;
                    file_gop.key_ptr.* = try self.allocator.dupe(u8, basename);
                    return null;
                }
            } else {
                const dir_handle = try windows.OpenFile(dirname_path_space.span(), .{
                    .dir = std.fs.cwd().fd,
                    .access_mask = windows.FILE_LIST_DIRECTORY,
                    .creation = windows.FILE_OPEN,
                    .io_mode = .evented,
                    .filter = .dir_only,
                });
                errdefer windows.CloseHandle(dir_handle);

                const dir = try self.allocator.create(OsData.Dir);
                errdefer self.allocator.destroy(dir);

                gop.key_ptr.* = try self.allocator.dupe(u8, dirname);
                errdefer self.allocator.free(gop.key_ptr.*);

                dir.* = OsData.Dir{
                    .file_table = .{},
                    .putter_frame = undefined,
                    .dir_handle = dir_handle,
                };
                gop.value_ptr.* = dir;
                try dir.file_table.put(self.allocator, try self.allocator.dupe(u8, basename), value);
                dir.putter_frame = async self.windowsDirReader(dir, gop.key_ptr.*);
                return null;
            }
        }
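        /// Putter frame for the Windows backend: issues overlapped
        /// ReadDirectoryChangesW calls on `dir` and translates matching
        /// notifications into channel events until the watch is cancelled.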
        fn windowsDirReader(self: *Self, dir: *OsData.Dir, dirname: []const u8) void {
            defer os.close(dir.dir_handle);
            var resume_node = Loop.ResumeNode.Basic{
                .base = Loop.ResumeNode{
                    .id = .Basic,
                    .handle = @frame(),
                    .overlapped = windows.OVERLAPPED{
                        .Internal = 0,
                        .InternalHigh = 0,
                        .DUMMYUNIONNAME = .{
                            .DUMMYSTRUCTNAME = .{
                                .Offset = 0,
                                .OffsetHigh = 0,
                            },
                        },
                        .hEvent = null,
                    },
                },
            };

            var event_buf: [4096]u8 align(@alignOf(windows.FILE_NOTIFY_INFORMATION)) = undefined;

            global_event_loop.beginOneEvent();
            defer global_event_loop.finishOneEvent();

            while (!self.os_data.cancelled) main_loop: {
                suspend {
                    _ = windows.kernel32.ReadDirectoryChangesW(
                        dir.dir_handle,
                        &event_buf,
                        event_buf.len,
                        windows.FALSE, // watch subtree
                        windows.FILE_NOTIFY_CHANGE_FILE_NAME | windows.FILE_NOTIFY_CHANGE_DIR_NAME |
                            windows.FILE_NOTIFY_CHANGE_ATTRIBUTES | windows.FILE_NOTIFY_CHANGE_SIZE |
                            windows.FILE_NOTIFY_CHANGE_LAST_WRITE | windows.FILE_NOTIFY_CHANGE_LAST_ACCESS |
                            windows.FILE_NOTIFY_CHANGE_CREATION | windows.FILE_NOTIFY_CHANGE_SECURITY,
                        null, // number of bytes transferred (unused for async)
                        &resume_node.base.overlapped,
                        null, // completion routine - unused because we use IOCP
                    );
                }

                var bytes_transferred: windows.DWORD = undefined;
                if (windows.kernel32.GetOverlappedResult(
                    dir.dir_handle,
                    &resume_node.base.overlapped,
                    &bytes_transferred,
                    windows.FALSE,
                ) == 0) {
                    const potential_error = windows.kernel32.GetLastError();
                    const err = switch (potential_error) {
                        .OPERATION_ABORTED, .IO_INCOMPLETE => err_blk: {
                            if (self.os_data.cancelled)
                                break :main_loop
                            else
                                break :err_blk windows.unexpectedError(potential_error);
                        },
                        else => |err| windows.unexpectedError(err),
                    };
                    self.channel.put(err);
                } else {
                    var ptr: [*]u8 = &event_buf;
                    const end_ptr = ptr + bytes_transferred;
                    while (@intFromPtr(ptr) < @intFromPtr(end_ptr)) {
                        const ev = @as(*const windows.FILE_NOTIFY_INFORMATION, @ptrCast(ptr));
                        const emit = switch (ev.Action) {
                            windows.FILE_ACTION_REMOVED => WatchEventId.Delete,
                            windows.FILE_ACTION_MODIFIED => .CloseWrite,
                            else => null,
                        };
                        if (emit) |id| {
                            const basename_ptr = @as([*]u16, @ptrCast(ptr + @sizeOf(windows.FILE_NOTIFY_INFORMATION)));
                            const basename_utf16le = basename_ptr[0 .. ev.FileNameLength / 2];
                            var basename_data: [std.fs.MAX_PATH_BYTES]u8 = undefined;
                            const basename = basename_data[0 .. std.unicode.utf16leToUtf8(&basename_data, basename_utf16le) catch unreachable];

                            if (dir.file_table.getEntry(basename)) |entry| {
                                self.channel.put(Event{
                                    .id = id,
                                    .data = entry.value_ptr.*,
                                    .dirname = dirname,
                                    .basename = entry.key_ptr.*,
                                });
                            }
                        }

                        if (ev.NextEntryOffset == 0) break;
                        ptr = @alignCast(ptr + ev.NextEntryOffset);
                    }
                }
            }
        }
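        /// Stops watching `file_path`. Returns the value that was associated
        /// with the file, or null if it was not being watched.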
        pub fn removeFile(self: *Self, file_path: []const u8) !?V {
            switch (builtin.os.tag) {
                .linux => {
                    const dirname = std.fs.path.dirname(file_path) orelse if (file_path[0] == '/') "/" else ".";
                    const basename = std.fs.path.basename(file_path);

                    const held = self.os_data.table_lock.acquire();
                    defer held.release();

                    const dir = self.os_data.wd_table.get(dirname) orelse return null;
                    if (dir.file_table.fetchRemove(basename)) |file_entry| {
                        self.allocator.free(file_entry.key);
                        return file_entry.value;
                    }
                    return null;
                },
                .windows => {
                    const dirname = std.fs.path.dirname(file_path) orelse if (file_path[0] == '/') "/" else ".";
                    const basename = std.fs.path.basename(file_path);

                    const held = self.os_data.table_lock.acquire();
                    defer held.release();

                    const dir = self.os_data.dir_table.get(dirname) orelse return null;
                    if (dir.file_table.fetchRemove(basename)) |file_entry| {
                        self.allocator.free(file_entry.key);
                        return file_entry.value;
                    }
                    return null;
                },
                .macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
                    var realpath_buf: [std.fs.MAX_PATH_BYTES]u8 = undefined;
                    const realpath = try os.realpath(file_path, &realpath_buf);

                    const held = self.os_data.table_lock.acquire();
                    defer held.release();

                    const entry = self.os_data.file_table.getEntry(realpath) orelse return null;
                    const prev_value = entry.value_ptr.*.value;
                    entry.value_ptr.*.cancelled = true;
                    // @TODO Close the fd here?
                    await entry.value_ptr.*.putter_frame;
                    self.allocator.free(entry.key_ptr.*);
                    self.allocator.destroy(entry.value_ptr.*);

                    assert(self.os_data.file_table.remove(realpath));
                    return prev_value;
                },
                else => @compileError("Unsupported OS"),
            }
        }
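        /// Putter frame for the Linux backend: reads inotify events in a loop,
        /// forwards close-write and delete notifications to the channel, and
        /// cleans up watch-descriptor state when a directory watch is removed.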
        fn linuxEventPutter(self: *Self) void {
            global_event_loop.beginOneEvent();

            defer {
                std.debug.assert(self.os_data.wd_table.count() == 0);
                self.os_data.wd_table.deinit(self.allocator);
                os.close(self.os_data.inotify_fd);
                self.allocator.free(self.channel.buffer_nodes);
                self.channel.deinit();
                global_event_loop.finishOneEvent();
            }

            var event_buf: [4096]u8 align(@alignOf(os.linux.inotify_event)) = undefined;

            while (!self.os_data.cancelled) {
                const bytes_read = global_event_loop.read(self.os_data.inotify_fd, &event_buf, false) catch unreachable;

                var ptr: [*]u8 = &event_buf;
                const end_ptr = ptr + bytes_read;
                while (@intFromPtr(ptr) < @intFromPtr(end_ptr)) {
                    const ev = @as(*const os.linux.inotify_event, @ptrCast(ptr));
                    if (ev.mask & os.linux.IN_CLOSE_WRITE == os.linux.IN_CLOSE_WRITE) {
                        const basename_ptr = ptr + @sizeOf(os.linux.inotify_event);
                        const basename = std.mem.span(@as([*:0]u8, @ptrCast(basename_ptr)));

                        const dir = &self.os_data.wd_table.get(ev.wd).?;
                        if (dir.file_table.getEntry(basename)) |file_value| {
                            self.channel.put(Event{
                                .id = .CloseWrite,
                                .data = file_value.value_ptr.*,
                                .dirname = dir.dirname,
                                .basename = file_value.key_ptr.*,
                            });
                        }
                    } else if (ev.mask & os.linux.IN_IGNORED == os.linux.IN_IGNORED) {
                        // Directory watch was removed
                        const held = self.os_data.table_lock.acquire();
                        defer held.release();
                        if (self.os_data.wd_table.fetchRemove(ev.wd)) |wd_entry| {
                            var file_it = wd_entry.value.file_table.keyIterator();
                            while (file_it.next()) |file_entry| {
                                self.allocator.free(file_entry.*);
                            }
                            self.allocator.free(wd_entry.value.dirname);
                            wd_entry.value.file_table.deinit(self.allocator);
                        }
                    } else if (ev.mask & os.linux.IN_DELETE == os.linux.IN_DELETE) {
                        // File or directory was removed or deleted
                        const basename_ptr = ptr + @sizeOf(os.linux.inotify_event);
                        const basename = std.mem.span(@as([*:0]u8, @ptrCast(basename_ptr)));

                        const dir = &self.os_data.wd_table.get(ev.wd).?;
                        if (dir.file_table.getEntry(basename)) |file_value| {
                            self.channel.put(Event{
                                .id = .Delete,
                                .data = file_value.value_ptr.*,
                                .dirname = dir.dirname,
                                .basename = file_value.key_ptr.*,
                            });
                        }
                    }

                    ptr = @alignCast(ptr + @sizeOf(os.linux.inotify_event) + ev.len);
                }
            }
        }
    };
}
const test_tmp_dir = "std_event_fs_test";

test "write a file, watch it, write it again, delete it" {
    if (!std.io.is_async) return error.SkipZigTest;
    // TODO https://github.com/ziglang/zig/issues/1908
    if (builtin.single_threaded) return error.SkipZigTest;

    try std.fs.cwd().makePath(test_tmp_dir);
    defer std.fs.cwd().deleteTree(test_tmp_dir) catch {};

    return testWriteWatchWriteDelete(std.testing.allocator);
}

fn testWriteWatchWriteDelete(allocator: Allocator) !void {
    const file_path = try std.fs.path.join(allocator, &[_][]const u8{ test_tmp_dir, "file.txt" });
    defer allocator.free(file_path);

    const contents =
        \\line 1
        \\line 2
    ;
    const line2_offset = 7;

    // first just write then read the file
    try std.fs.cwd().writeFile(file_path, contents);

    const read_contents = try std.fs.cwd().readFileAlloc(allocator, file_path, 1024 * 1024);
    defer allocator.free(read_contents);
    try testing.expectEqualSlices(u8, contents, read_contents);

    // now watch the file
    var watch = try Watch(void).init(allocator, 0);
    defer watch.deinit();

    try testing.expect((try watch.addFile(file_path, {})) == null);

    var ev = async watch.channel.get();
    var ev_consumed = false;
    defer if (!ev_consumed) {
        _ = await ev;
    };

    // overwrite line 2
    const file = try std.fs.cwd().openFile(file_path, .{ .mode = .read_write });
    {
        defer file.close();
        const write_contents = "lorem ipsum";
        var iovec = [_]os.iovec_const{.{
            .iov_base = write_contents,
            .iov_len = write_contents.len,
        }};
        _ = try file.pwritevAll(&iovec, line2_offset);
    }

    switch ((try await ev).id) {
        .CloseWrite => {
            ev_consumed = true;
        },
        .Delete => @panic("wrong event"),
    }

    const contents_updated = try std.fs.cwd().readFileAlloc(allocator, file_path, 1024 * 1024);
    defer allocator.free(contents_updated);

    try testing.expectEqualSlices(u8,
        \\line 1
        \\lorem ipsum
    , contents_updated);

    ev = async watch.channel.get();
    ev_consumed = false;

    try std.fs.cwd().deleteFile(file_path);
    switch ((try await ev).id) {
        .Delete => {
            ev_consumed = true;
        },
        .CloseWrite => @panic("wrong event"),
    }
}

// TODO Test: Add another file watch, remove the old file watch, get an event in the new
@ -92,9 +92,6 @@ pub const elf = @import("elf.zig");
/// Enum-related metaprogramming helpers.
pub const enums = @import("enums.zig");

/// Evented I/O data structures.
pub const event = @import("event.zig");

/// First in, first out data structures.
pub const fifo = @import("fifo.zig");