io: upgrade to 0.15.1

This commit is contained in:
Arran Ireland
2025-10-02 10:40:37 +01:00
parent 3928731834
commit cb697eccb2
2 changed files with 167 additions and 217 deletions

View File

@@ -14,9 +14,10 @@ running: bool,
node: *core.Node,
host: []const u8,
port: u16,
stream: ?std.net.Stream,
reader: ?hdlc.Reader(std.net.Stream.Reader, mtu),
writer: ?hdlc.Writer(std.net.Stream.Writer),
stream_reader: ?std.net.Stream.Reader,
stream_writer: ?std.net.Stream.Writer,
hdlc_reader: ?hdlc.Reader,
hdlc_writer: ?hdlc.Writer,
pub fn init(node: *core.Node, host: []const u8, port: u16, ally: Allocator) !Self {
return .{
@@ -25,9 +26,10 @@ pub fn init(node: *core.Node, host: []const u8, port: u16, ally: Allocator) !Sel
.node = node,
.host = try ally.dupe(u8, host),
.port = port,
.stream = null,
.reader = null,
.writer = null,
.stream_reader = null,
.stream_writer = null,
.hdlc_reader = null,
.hdlc_writer = null,
};
}
@@ -35,13 +37,12 @@ pub fn deinit(self: *Self) void {
self.running = false;
self.ally.destroy(&self.host);
if (self.stream) |stream| {
stream.close();
self.stream = null;
if (self.hdlc_reader) |r| {
self.ally.free(r.reader.buffer);
}
if (self.reader) |*reader| {
reader.deinit();
if (self.hdlc_writer) |w| {
self.ally.free(w.writer.buffer);
}
}
@@ -49,17 +50,21 @@ pub fn run(self: *Self) !void {
try self.connect();
self.running = true;
var reader = self.reader orelse return error.NoReader;
var frames: [1024][]const u8 = undefined;
var hdlc_reader = self.hdlc_reader orelse return error.NoReader;
// TODO: Replace with 2 * packet mtu.
var frame: [1024]u8 = @splat(0);
while (self.running) {
const n = reader.readFrames(&frames) catch |err| {
if (err == error.EndOfStream) continue else return err;
const n = hdlc_reader.readFrame(&frame) catch |err| {
switch (err) {
error.EndOfStream => continue,
else => return err,
}
};
for (frames[0..n]) |frame| {
try self.handleFrame(frame);
}
if (n == 0) continue;
try self.handleFrame(frame[0..n]);
}
}
@@ -67,36 +72,32 @@ fn connect(self: *Self) !void {
const address_list = try std.net.getAddressList(self.ally, self.host, self.port);
defer address_list.deinit();
if (address_list.addrs.len == 0) {
return error.FailedHostLookup;
}
if (address_list.addrs.len == 0) return error.FailedHostLookup;
const address = address_list.addrs[0];
self.stream = try std.net.tcpConnectToAddress(address);
self.reader = try hdlc.Reader(std.net.Stream.Reader, mtu).init(
self.stream.?.reader(),
self.ally,
);
self.writer = hdlc.Writer(std.net.Stream.Writer).init(
self.stream.?.writer(),
);
var stream = try std.net.tcpConnectToAddress(address);
log.info("connected to {s}:{} at {}", .{ self.host, self.port, address });
self.stream_reader = stream.reader(try self.ally.alloc(u8, mtu));
self.stream_writer = stream.writer(try self.ally.alloc(u8, mtu));
self.hdlc_reader = hdlc.Reader.init(self.stream_reader.?.interface());
self.hdlc_writer = hdlc.Writer.init(&self.stream_writer.?.interface);
log.info("connected to {s}:{d} at {f}", .{ self.host, self.port, address });
}
pub fn write(self: *Self, data: []const u8) !void {
const writer = self.writer orelse return error.NotConnected;
const n = try writer.writeFrame(data);
log.debug("sent {} bytes", .{n});
log.debug("sent {d} bytes", .{n});
}
fn handleFrame(self: *Self, data: []const u8) !void {
log.debug("handling frame ({} bytes)", .{data.len});
log.debug("raw bytes: {}", .{std.fmt.fmtSliceHexLower(data)});
log.debug("handling frame ({d} bytes)", .{data.len});
// log.debug("raw bytes: {f}", .{std.fmt.hex(data)});
var factory = core.packet.Factory.init(self.ally, std.crypto.random, .{});
var packet = factory.fromBytes(data) catch |err| {
log.err("failed to parse packet: {}", .{err});
log.err("failed to parse packet: {any}", .{err});
return;
};
@@ -104,11 +105,11 @@ fn handleFrame(self: *Self, data: []const u8) !void {
var event = core.Node.Event.Out{ .packet = packet };
defer event.deinit();
log.info("{}", .{event});
log.info("{f}", .{event});
if (packet.header.purpose == .announce) {
packet.validate() catch |err| {
log.err("announce validation failed: {}", .{err});
log.err("announce validation failed: {any}", .{err});
return;
};
log.info("announce validation succeeded", .{});

View File

@@ -8,237 +8,186 @@ const escape_mask = 0x20;
const escaped_escape = 0x5d;
const escaped_flag = 0x5e;
pub fn Writer(comptime W: type) type {
return struct {
impl: W,
pub const Writer = struct {
writer: *std.Io.Writer,
const Self = @This();
pub const Error = W.Error;
pub fn init(impl: W) Self {
pub fn init(writer: *std.Io.Writer) Self {
return .{
.impl = impl,
.writer = writer,
};
}
pub fn writeFrame(self: Self, bytes: []const u8) Error!usize {
try self.impl.writeByte(flag);
pub fn writeFrame(self: Self, payload: []const u8) !void {
try self.writer.writeByte(flag);
var start: usize = 0;
var index: usize = 0;
while (index < bytes.len) : (index += 1) {
const byte = bytes[index];
while (index < payload.len) : (index += 1) {
const byte = payload[index];
if (byte == escape or byte == flag) {
if (index > start) {
try self.impl.writeAll(bytes[start..index]);
try self.writer.writeAll(payload[start..index]);
}
try self.impl.writeByte(escape);
try self.impl.writeByte(byte ^ escape_mask);
try self.writer.writeByte(escape);
try self.writer.writeByte(byte ^ escape_mask);
start = index + 1;
}
}
if (start < bytes.len) {
try self.impl.writeAll(bytes[start..]);
if (start < payload.len) {
try self.writer.writeAll(payload[start..]);
}
try self.impl.writeByte(flag);
return bytes.len;
try self.writer.writeByte(flag);
}
};
}
};
pub fn Reader(comptime R: type, comptime size: usize) type {
return struct {
pub const Reader = struct {
const Self = @This();
pub const Error = R.Error || error{EndOfStream};
reader: *std.Io.Reader,
impl: R,
escaped: bool = false,
buffer: []u8,
count: usize = 0,
ally: Allocator,
pub fn init(impl: R, ally: Allocator) !Self {
pub fn init(reader: *std.Io.Reader) Self {
return .{
.impl = impl,
.ally = ally,
.buffer = try ally.alloc(u8, size),
.reader = reader,
};
}
pub fn deinit(self: *Self) void {
self.ally.free(self.buffer);
}
// I'm assuming here that a valid frame can always be read in one call.
pub fn readFrame(self: *Self, frame: []u8) !usize {
var index: usize = 0;
var escaped = false;
pub fn readFrames(self: *Self, frame_buffer: [][]const u8) Error!usize {
const n = self.impl.read(self.buffer[self.count..]) catch |err| {
if (err == error.EndOfStream and self.count == 0) {
return 0;
}
return err;
};
while (true) {
if (index >= frame.len) return error.FrameFull;
self.count += n;
var frame_count: usize = 0;
var input_index: usize = 0;
var data_index: usize = 0;
while (frame_count < frame_buffer.len and input_index < self.count) {
const frame_start = data_index;
while (input_index < self.count and data_index < self.buffer.len) {
const byte = self.buffer[input_index];
input_index += 1;
if (self.escaped) {
self.buffer[data_index] = byte ^ escape_mask;
data_index += 1;
self.escaped = false;
continue;
}
switch (byte) {
flag => {
if (data_index > frame_start) {
frame_buffer[frame_count] = self.buffer[frame_start..data_index];
frame_count += 1;
}
break;
},
escape => {
self.escaped = true;
},
else => {
self.buffer[data_index] = byte;
data_index += 1;
},
}
}
}
if (input_index < self.count) {
const remaining = self.buffer[input_index..self.count];
std.mem.copyForwards(u8, self.buffer[0..remaining.len], remaining);
self.count = remaining.len;
switch (try self.reader.takeByte()) {
flag => break,
escape => escaped = true,
else => |byte| {
if (escaped) {
frame[index] = byte ^ escape_mask;
escaped = false;
} else {
self.count = 0;
frame[index] = byte;
}
return frame_count;
index += 1;
},
}
};
}
}
return index;
}
};
const t = std.testing;
test "write" {
var frames = std.ArrayList(u8).init(t.allocator);
defer frames.deinit();
var io_writer = std.Io.Writer.Allocating.init(t.allocator);
defer io_writer.deinit();
const frames_writer = frames.writer();
const writer = Writer(@TypeOf(frames_writer)).init(frames_writer);
const writer = Writer.init(&io_writer.writer);
const input = "this is some data";
_ = try writer.writeFrame(input);
try t.expectEqualSlices(u8, [_]u8{flag} ++ input ++ [_]u8{flag}, frames.items);
frames.clearRetainingCapacity();
try writer.writeFrame(input);
try t.expectEqualSlices(u8, [_]u8{flag} ++ input ++ [_]u8{flag}, io_writer.written());
io_writer.clearRetainingCapacity();
const flag_input = [_]u8{flag};
_ = try writer.writeFrame(&flag_input);
try t.expectEqualSlices(u8, &[_]u8{ flag, escape, escaped_flag, flag }, frames.items);
frames.clearRetainingCapacity();
try writer.writeFrame(&flag_input);
try t.expectEqualSlices(u8, &[_]u8{ flag, escape, escaped_flag, flag }, io_writer.written());
io_writer.clearRetainingCapacity();
const esc_input = [_]u8{escape};
const escaped_esc = [_]u8{ flag, escape, escaped_escape, flag };
_ = try writer.writeFrame(&esc_input);
try t.expectEqualSlices(u8, &escaped_esc, frames.items);
frames.clearRetainingCapacity();
try writer.writeFrame(&esc_input);
try t.expectEqualSlices(u8, &escaped_esc, io_writer.written());
io_writer.clearRetainingCapacity();
const mixed_input = [_]u8{ 0x01, flag, 0x02, escape, 0x03, 0x04, escape_mask };
const mixed_expected = [_]u8{ flag, 0x01, escape, escaped_flag, 0x02, escape, escaped_escape, 0x03, 0x04, escape_mask, flag };
_ = try writer.writeFrame(&mixed_input);
try t.expectEqualSlices(u8, &mixed_expected, frames.items);
try writer.writeFrame(&mixed_input);
try t.expectEqualSlices(u8, &mixed_expected, io_writer.written());
io_writer.clearRetainingCapacity();
}
test "read" {
const input = [_]u8{flag} ++ "this is some data" ++ [_]u8{flag};
var stream = std.io.fixedBufferStream(input);
var reader = try Reader(
@TypeOf(stream.reader()),
input.len,
).init(stream.reader(), t.allocator);
defer reader.deinit();
const payload = "this is some data";
var io_reader = std.Io.Reader.fixed([_]u8{flag} ++ payload ++ [_]u8{flag});
var reader = Reader.init(&io_reader);
var frame: [32]u8 = @splat(0);
var frame_buffer: [1][]const u8 = undefined;
const n = try reader.readFrames(&frame_buffer);
var n = try reader.readFrame(&frame);
try t.expectEqual(0, n);
try t.expectEqual(frame_buffer.len, n);
try t.expectEqualSlices(u8, "this is some data", frame_buffer[0]);
n = try reader.readFrame(&frame);
try t.expectEqualSlices(u8, payload, frame[0..n]);
const err = reader.readFrame(&frame);
try t.expectError(error.EndOfStream, err);
}
test "read-multiple-flags" {
const input = [_]u8{ flag, 0x01, 0x02, flag, 0x03, 0x04, flag };
var stream = std.io.fixedBufferStream(&input);
var reader = try Reader(
@TypeOf(stream.reader()),
input.len,
).init(stream.reader(), t.allocator);
defer reader.deinit();
var io_reader = std.Io.Reader.fixed(&[_]u8{ flag, 0x01, 0x02, flag, 0x03, 0x04, flag });
var reader = Reader.init(&io_reader);
var frame: [32]u8 = @splat(0);
var frame_buffer: [2][]const u8 = undefined;
const n = try reader.readFrames(&frame_buffer);
var n = try reader.readFrame(&frame);
try t.expectEqual(0, n);
try t.expectEqual(frame_buffer.len, n);
try t.expectEqualSlices(u8, &[_]u8{ 0x01, 0x02 }, frame_buffer[0]);
try t.expectEqualSlices(u8, &[_]u8{ 0x03, 0x04 }, frame_buffer[1]);
n = try reader.readFrame(&frame);
try t.expectEqualSlices(u8, &[_]u8{ 0x01, 0x02 }, frame[0..n]);
n = try reader.readFrame(&frame);
try t.expectEqualSlices(u8, &[_]u8{ 0x03, 0x04 }, frame[0..n]);
const err = reader.readFrame(&frame);
try t.expectError(error.EndOfStream, err);
}
test "read-mixed-data" {
const input = [_]u8{ flag, 0x01, escape, escaped_flag, 0x02, escape, escaped_escape, 0x03, flag };
var stream = std.io.fixedBufferStream(&input);
var reader = try Reader(
@TypeOf(stream.reader()),
input.len,
).init(stream.reader(), t.allocator);
defer reader.deinit();
var io_reader = std.Io.Reader.fixed(
&[_]u8{ flag, 0x01, escape, escaped_flag, 0x02, escape, escaped_escape, 0x03, flag },
);
var reader = Reader.init(&io_reader);
var frame: [32]u8 = @splat(0);
var frame_buffer: [1][]const u8 = undefined;
const n = try reader.readFrames(&frame_buffer);
var n = try reader.readFrame(&frame);
try t.expectEqual(0, n);
n = try reader.readFrame(&frame);
const unescaped = [_]u8{ 0x01, flag, 0x02, escape, 0x03 };
try t.expectEqual(frame_buffer.len, n);
try t.expectEqualSlices(u8, &unescaped, frame_buffer[0]);
try t.expectEqualSlices(u8, &unescaped, frame[0..n]);
const err = reader.readFrame(&frame);
try t.expectError(error.EndOfStream, err);
}
test "round-trip" {
var frames = std.ArrayList(u8).init(t.allocator);
defer frames.deinit();
const frames_writer = frames.writer();
const writer = Writer(@TypeOf(frames_writer)).init(frames_writer);
var io_writer = std.Io.Writer.Allocating.init(t.allocator);
defer io_writer.deinit();
const input = [_]u8{ 0x00, flag, 0xaa, escape, 0xff, flag, escape };
_ = try writer.writeFrame(&input);
const writer = Writer.init(&io_writer.writer);
try writer.writeFrame(&input);
var stream = std.io.fixedBufferStream(frames.items);
var reader = try Reader(
@TypeOf(stream.reader()),
input.len * 2,
).init(stream.reader(), t.allocator);
defer reader.deinit();
const escaped = [_]u8{ flag, 0x00, escape, escaped_flag, 0xaa, escape, escaped_escape, 0xff, escape, escaped_flag, escape, escaped_escape, flag };
try t.expectEqualSlices(u8, &escaped, io_writer.written());
var frame_buffer: [1][]const u8 = undefined;
const n = try reader.readFrames(&frame_buffer);
var io_reader = std.Io.Reader.fixed(io_writer.written());
var reader = Reader.init(&io_reader);
var frame: [32]u8 = @splat(0);
try t.expectEqual(frame_buffer.len, n);
try t.expectEqualSlices(u8, &input, frame_buffer[0]);
var n = try reader.readFrame(&frame);
try t.expectEqual(0, n);
n = try reader.readFrame(&frame);
try t.expectEqualSlices(u8, &input, frame[0..n]);
}