zig/lib/std/io/in_stream.zig

277 lines
11 KiB
Zig
Raw Normal View History

const std = @import("../std.zig");
const builtin = @import("builtin");
const root = @import("root");
const math = std.math;
const assert = std.debug.assert;
const mem = std.mem;
const Buffer = std.Buffer;
const testing = std.testing;
Improvements targeted at async functions * Reuse the bytes of async function frames when non-async functions make `noasync` calls. This prevents explosive stack growth. * Zig now passes a stack size argument to the linker when linking ELF binaries. Linux ignores this value, but it is available as a program header called GNU_STACK. I prototyped some code that memory-maps extra space for the stack using this program header, but there was still a problem when accessing stack memory very far down; stack probing is probably needed, or is not working. I also prototyped using `@newStackCall` to call main, and that does work around the issue, but it brings its own issues. That code is commented out for now in std/special/start.zig. I'm on a plane with no Internet, but I plan to consult the musl community for advice when I get a chance. * Added `noasync` to a bunch of function calls in std.debug. It's very messy, but it's a workaround that makes stack traces functional with evented I/O enabled. Eventually these will be cleaned up as the root bugs are found and fixed. Programs built in blocking mode are unaffected. * Lowered the default stack size of std.io.InStream (for the async version) from 4 MiB to 1 MiB. Until we figure out how to make choosing a stack size work (see the 2nd bullet point above), 4 MiB tends to cause segfaults, apparently because the stack runs out or stack memory is accessed too far apart. * The default thread stack size is bumped from 8 MiB to 16 MiB to match the size we give the main thread. The plan is to eventually remove this hard-coded value and have Zig determine it during semantic analysis, using call graph analysis, function pointer annotations, and extern function annotations.
2019-09-12 01:22:49 +01:00
/// Default size, in bytes, of the frame buffer that `InStream.read` hands to
/// `readFn` when `std.io.is_async` is enabled.
pub const default_stack_size = 1024 * 1024;

/// Frame buffer size actually used by async `InStream.read`. The root source file
/// may supply its own value by declaring `stack_size_std_io_InStream`; otherwise
/// `default_stack_size` is used.
pub const stack_size: usize = blk: {
    if (@hasDecl(root, "stack_size_std_io_InStream")) {
        break :blk root.stack_size_std_io_InStream;
    }
    break :blk default_stack_size;
};
/// Returns a generic input-stream interface whose reads may fail with `ReadError`.
/// A concrete stream embeds the returned struct and sets `readFn`; every helper
/// below is built on top of `read`.
pub fn InStream(comptime ReadError: type) type {
    return struct {
        const Self = @This();
        pub const Error = ReadError;

        /// Signature of the implementation-provided read function: an `async fn`
        /// type when `std.io.is_async` is set, a plain function type otherwise.
        pub const ReadFn = if (std.io.is_async)
            async fn (self: *Self, buffer: []u8) Error!usize
        else
            fn (self: *Self, buffer: []u8) Error!usize;

        /// Returns the number of bytes read. It may be less than buffer.len.
        /// If the number of bytes read is 0, it means end of stream.
        /// End of stream is not an error condition.
        readFn: ReadFn,

        /// Reads up to `buffer.len` bytes by calling `readFn`.
        /// Returns the number of bytes read. It may be less than buffer.len.
        /// If the number of bytes read is 0, it means end of stream.
        /// End of stream is not an error condition.
        pub fn read(self: *Self, buffer: []u8) Error!usize {
            if (std.io.is_async) {
                // `readFn` runs in a frame buffer of `stack_size` bytes (1 MiB by default).
                // Disable runtime safety so safe build modes do not write 0xaa over that
                // whole buffer on every stream read.
                @setRuntimeSafety(false);
                // The frame buffer must satisfy the target's stack alignment.
                var stack_frame: [stack_size]u8 align(std.Target.stack_align) = undefined;
                return await @asyncCall(&stack_frame, {}, self.readFn, self, buffer);
            } else {
                return self.readFn(self, buffer);
            }
        }

        /// Returns the number of bytes read. If the number read is smaller than
        /// `buffer.len`, it means the stream reached the end. Reaching the end of a
        /// stream is not an error condition.
        pub fn readFull(self: *Self, buffer: []u8) Error!usize {
            var index: usize = 0;
            while (index != buffer.len) {
                const amt = try self.read(buffer[index..]);
                if (amt == 0) return index;
                index += amt;
            }
            return index;
        }

        /// Fills `buf` completely from the stream.
        /// Returns `error.EndOfStream` if the stream ends before `buf.len` bytes have
        /// been read; the bytes read so far are then left at the start of `buf`.
        pub fn readNoEof(self: *Self, buf: []u8) !void {
            const amt_read = try self.readFull(buf);
            if (amt_read < buf.len) return error.EndOfStream;
        }

        /// Replaces `buffer` contents by reading from the stream until it is finished.
        /// A stream of exactly `max_size` bytes is accepted. If the stream holds more
        /// than `max_size` bytes, `error.StreamTooLong` is returned and the contents
        /// read from the stream are lost.
        pub fn readAllBuffer(self: *Self, buffer: *Buffer, max_size: usize) !void {
            try buffer.resize(0);
            // Read at most one byte past `max_size`. That extra byte is the only way to
            // tell a stream of exactly `max_size` bytes apart from a longer one.
            const read_limit = math.add(usize, max_size, 1) catch max_size;
            var actual_buf_len: usize = 0;
            while (true) {
                const dest_slice = buffer.toSlice()[actual_buf_len..];
                const bytes_read = try self.readFull(dest_slice);
                actual_buf_len += bytes_read;
                if (actual_buf_len > max_size) return error.StreamTooLong;
                if (bytes_read != dest_slice.len) {
                    // Short read: the stream is finished.
                    buffer.shrink(actual_buf_len);
                    return;
                }
                const new_buf_size = math.min(read_limit, actual_buf_len + mem.page_size);
                // Only reachable when `read_limit` saturated (max_size == maxInt(usize));
                // prevents spinning on an empty destination slice.
                if (new_buf_size == actual_buf_len) return error.StreamTooLong;
                try buffer.resize(new_buf_size);
            }
        }

        /// Allocates enough memory to hold all the contents of the stream. If the
        /// stream holds more than `max_size` bytes, returns `error.StreamTooLong`.
        /// Caller owns returned memory.
        /// If this function returns an error, the contents from the stream read so far are lost.
        pub fn readAllAlloc(self: *Self, allocator: *mem.Allocator, max_size: usize) ![]u8 {
            var buf = Buffer.initNull(allocator);
            defer buf.deinit();

            try self.readAllBuffer(&buf, max_size);
            return buf.toOwnedSlice();
        }

        /// Replaces `buffer` contents by reading from the stream until `delimiter` is found.
        /// Does not include the delimiter in the result.
        /// If `buffer.len()` would exceed `max_size`, `error.StreamTooLong` is returned and the contents
        /// read from the stream so far are lost.
        /// Returns `error.EndOfStream` if the stream ends before `delimiter` is found.
        pub fn readUntilDelimiterBuffer(self: *Self, buffer: *Buffer, delimiter: u8, max_size: usize) !void {
            try buffer.resize(0);

            while (true) {
                const byte = try self.readByte();

                if (byte == delimiter) {
                    return;
                }

                if (buffer.len() == max_size) {
                    return error.StreamTooLong;
                }

                try buffer.appendByte(byte);
            }
        }

        /// Allocates enough memory to read until `delimiter`. If the allocated
        /// memory would be greater than `max_size`, returns `error.StreamTooLong`.
        /// Caller owns returned memory.
        /// If this function returns an error, the contents from the stream read so far are lost.
        pub fn readUntilDelimiterAlloc(self: *Self, allocator: *mem.Allocator, delimiter: u8, max_size: usize) ![]u8 {
            var buf = Buffer.initNull(allocator);
            defer buf.deinit();

            try self.readUntilDelimiterBuffer(&buf, delimiter, max_size);
            return buf.toOwnedSlice();
        }

        /// Reads from the stream until specified byte is found. If the buffer is not
        /// large enough to hold the entire contents, `error.StreamTooLong` is returned.
        /// If end-of-stream is found, returns the rest of the stream. If this
        /// function is called again after that, returns null.
        /// Returns a slice of the stream data, with ptr equal to `buf.ptr`. The
        /// delimiter byte is not included in the returned slice.
        pub fn readUntilDelimiterOrEof(self: *Self, buf: []u8, delimiter: u8) !?[]u8 {
            var index: usize = 0;
            while (true) {
                const byte = self.readByte() catch |err| switch (err) {
                    error.EndOfStream => {
                        if (index == 0) {
                            return null;
                        } else {
                            return buf[0..index];
                        }
                    },
                    else => |e| return e,
                };

                if (byte == delimiter) return buf[0..index];
                if (index >= buf.len) return error.StreamTooLong;

                buf[index] = byte;
                index += 1;
            }
        }

        /// Reads from the stream until specified byte is found, discarding all data,
        /// including the delimiter.
        /// If end-of-stream is found, this function succeeds.
        pub fn skipUntilDelimiterOrEof(self: *Self, delimiter: u8) !void {
            while (true) {
                const byte = self.readByte() catch |err| switch (err) {
                    error.EndOfStream => return,
                    else => |e| return e,
                };
                if (byte == delimiter) return;
            }
        }

        /// Reads 1 byte from the stream or returns `error.EndOfStream`.
        pub fn readByte(self: *Self) !u8 {
            var result: [1]u8 = undefined;
            const amt_read = try self.read(result[0..]);
            if (amt_read < 1) return error.EndOfStream;
            return result[0];
        }

        /// Same as `readByte` except the returned byte is signed.
        pub fn readByteSigned(self: *Self) !i8 {
            return @bitCast(i8, try self.readByte());
        }

        /// Reads a native-endian integer of type `T`, consuming `(T.bit_count + 7) / 8`
        /// bytes. Returns `error.EndOfStream` if fewer bytes remain.
        pub fn readIntNative(self: *Self, comptime T: type) !T {
            var bytes: [(T.bit_count + 7) / 8]u8 = undefined;
            try self.readNoEof(bytes[0..]);
            return mem.readIntNative(T, &bytes);
        }

        /// Reads a foreign-endian integer of type `T`, consuming `(T.bit_count + 7) / 8`
        /// bytes. Returns `error.EndOfStream` if fewer bytes remain.
        pub fn readIntForeign(self: *Self, comptime T: type) !T {
            var bytes: [(T.bit_count + 7) / 8]u8 = undefined;
            try self.readNoEof(bytes[0..]);
            return mem.readIntForeign(T, &bytes);
        }

        /// Reads a little-endian integer of type `T`. Returns `error.EndOfStream` if
        /// the stream ends first.
        pub fn readIntLittle(self: *Self, comptime T: type) !T {
            var bytes: [(T.bit_count + 7) / 8]u8 = undefined;
            try self.readNoEof(bytes[0..]);
            return mem.readIntLittle(T, &bytes);
        }

        /// Reads a big-endian integer of type `T`. Returns `error.EndOfStream` if the
        /// stream ends first.
        pub fn readIntBig(self: *Self, comptime T: type) !T {
            var bytes: [(T.bit_count + 7) / 8]u8 = undefined;
            try self.readNoEof(bytes[0..]);
            return mem.readIntBig(T, &bytes);
        }

        /// Reads an integer of type `T` stored with the given `endian` byte order.
        /// Returns `error.EndOfStream` if the stream ends first.
        pub fn readInt(self: *Self, comptime T: type, endian: builtin.Endian) !T {
            var bytes: [(T.bit_count + 7) / 8]u8 = undefined;
            try self.readNoEof(bytes[0..]);
            return mem.readInt(T, &bytes, endian);
        }

        /// Reads a `size`-byte integer in `endian` byte order and returns it as
        /// `ReturnType`. Asserts `size <= @sizeOf(ReturnType)`.
        pub fn readVarInt(self: *Self, comptime ReturnType: type, endian: builtin.Endian, size: usize) !ReturnType {
            assert(size <= @sizeOf(ReturnType));
            var bytes_buf: [@sizeOf(ReturnType)]u8 = undefined;
            const bytes = bytes_buf[0..size];
            try self.readNoEof(bytes);
            return mem.readVarInt(ReturnType, bytes, endian);
        }

        /// Discards the next `num_bytes` bytes of the stream.
        /// Returns `error.EndOfStream` if the stream ends first.
        pub fn skipBytes(self: *Self, num_bytes: u64) !void {
            // Read in chunks rather than one byte per `read` call; each `read` is a full
            // call into `readFn` (an async frame setup in async mode).
            var buf: [512]u8 = undefined;
            var remaining = num_bytes;
            while (remaining > 0) {
                const amt: usize = if (remaining < buf.len) @intCast(usize, remaining) else buf.len;
                try self.readNoEof(buf[0..amt]);
                remaining -= amt;
            }
        }

        /// Reads `@sizeOf(T)` bytes directly into a value of struct type `T`.
        /// `T` must be an extern or packed struct (enforced at compile time).
        /// Returns `error.EndOfStream` if the stream ends first.
        pub fn readStruct(self: *Self, comptime T: type) !T {
            // Only extern and packed structs have defined in-memory layout.
            comptime assert(@typeInfo(T).Struct.layout != builtin.TypeInfo.ContainerLayout.Auto);
            var res: [1]T = undefined;
            try self.readNoEof(@sliceToBytes(res[0..]));
            return res[0];
        }

        /// Reads an integer with the same size as the given enum's tag type. If the integer matches
        /// an enum tag, casts the integer to the enum tag and returns it. Otherwise, returns an error.
        /// TODO optimization taking advantage of most fields being in order
        pub fn readEnum(self: *Self, comptime Enum: type, endian: builtin.Endian) !Enum {
            const E = error{
                /// An integer was read, but it did not match any of the tags in the supplied enum.
                InvalidValue,
            };
            const type_info = @typeInfo(Enum).Enum;
            const tag = try self.readInt(type_info.tag_type, endian);

            inline for (std.meta.fields(Enum)) |field| {
                if (tag == field.value) {
                    return @field(Enum, field.name);
                }
            }

            return E.InvalidValue;
        }
    };
}
test "InStream" {
    var buf = "a\x02".*;
    var slice_stream = std.io.SliceInStream.init(&buf);
    const in_stream = &slice_stream.stream;
    testing.expect((try in_stream.readByte()) == 'a');
    // A u8 tag reads the same in either byte order, but pass a real endianness
    // rather than `undefined`: `readEnum` forwards it to `readInt`.
    testing.expect((try in_stream.readEnum(enum(u8) {
        a = 0,
        b = 99,
        c = 2,
        d = 3,
    }, .Little)) == .c);
    testing.expectError(error.EndOfStream, in_stream.readByte());

    // readUntilDelimiterOrEof: delimiter excluded, then the unterminated tail,
    // then null once the stream is exhausted.
    var lines = "ab\ncd".*;
    var lines_stream = std.io.SliceInStream.init(&lines);
    var line_buf: [8]u8 = undefined;
    testing.expect(mem.eql(u8, (try lines_stream.stream.readUntilDelimiterOrEof(&line_buf, '\n')).?, "ab"));
    testing.expect(mem.eql(u8, (try lines_stream.stream.readUntilDelimiterOrEof(&line_buf, '\n')).?, "cd"));
    testing.expect((try lines_stream.stream.readUntilDelimiterOrEof(&line_buf, '\n')) == null);
}