This commit is contained in:
Arran Ireland
2025-07-17 13:38:39 +01:00
parent 0ca5f82454
commit 33d9362f16
5 changed files with 205 additions and 82 deletions

View File

@@ -58,6 +58,12 @@ pub fn add(
}
}
pub fn remove(self: *Self, endpoint: Hash.Short) void {
const key = self.entries.getKeyPtr(&endpoint) orelse return;
_ = self.entries.swapRemove(key.*);
self.ally.free(key.*);
}
pub fn getPtr(self: *Self, endpoint: Hash.Short) ?*Entry {
return self.entries.getPtr(&endpoint);
}

View File

@@ -63,7 +63,6 @@ outgoing: *Outgoing,
packet_factory: PacketFactory,
mode: Mode,
directionality: Directionality,
bit_rate: ?BitRate,
pub fn init(
ally: Allocator,
@@ -81,7 +80,6 @@ pub fn init(
.packet_factory = packet_factory,
.mode = config.mode,
.directionality = config.directionality,
.bit_rate = config.initial_bit_rate,
};
}

View File

@@ -115,7 +115,7 @@ pub fn process(self: *Self) !void {
interfaces = self.interfaces.iterator();
while (interfaces.next()) |entry| {
try self.eventsOut(entry.interface, &entry.pending_out);
try self.eventsOut(entry.interface, &entry.pending);
}
}
@@ -132,12 +132,18 @@ fn eventsIn(self: *Self, interface: *Interface, now: u64) !void {
}
}
fn eventsOut(self: *Self, interface: *Interface, pending_out: *Interface.Outgoing) !void {
while (pending_out.pop()) |event_out| {
var event = event_out;
fn eventsOut(self: *Self, interface: *Interface, pending: *Interface.Outgoing, now: u64) !void {
while (pending.pop()) |entry| {
var event = entry.event;
const origin_interface = if (entry.origin_id) |id| self.interfaces.getPtr(id) else null;
const not_sent = switch (event) {
.packet => |*packet| self.packetOut(interface, packet),
.packet => |*packet| self.packetOut(
interface,
packet,
origin_interface,
now,
),
} catch true;
if (not_sent) {
@@ -181,10 +187,10 @@ fn packetIn(self: *Self, interface: *Interface, packet: *Packet, now: u64) !void
try self.transport(now, packet);
try switch (header.purpose) {
.announce => self.announcePacketIn(now, interface, packet),
.data => self.dataPacketIn(now, packet),
.link_request => self.linkRequestPacketIn(now, packet),
.proof => self.proofPacketIn(now, packet),
.announce => self.announcePacketIn(interface, packet, now),
.data => self.dataPacketIn(packet, now),
.link_request => self.linkRequestPacketIn(packet, now),
.proof => self.proofPacketIn(packet, now),
};
}
@@ -193,9 +199,7 @@ fn plainTask(self: *Self, interface: *Interface, plain: *Event.In.Plain) !void {
try self.interfaces.broadcast(packet, null);
}
fn packetOut(self: *Self, originating_interface: ?*Interface, packet: *Packet, now: u64) !bool {
_ = self;
fn packetOut(self: *Self, interface: *Interface, origin_interface: ?*Interface, packet: *Packet, now: u64) !bool {
const purpose = packet.header.purpose;
const variant = packet.header.endpoint;
const hops = packet.header.hops;
@@ -228,7 +232,7 @@ fn packetOut(self: *Self, originating_interface: ?*Interface, packet: *Packet, n
// TODO: If the endpoint variant is a link, don't transmit if closed.
// TODO: If interface is not the one we expect for this packet, don't transmit.
if (purpose == .announce and originating_interface == null) {
if (purpose == .announce and origin_interface == null) {
switch (interface.mode) {
.access_point => should_transmit = false,
.roaming => if (self.endpoints.get(endpoint)) |_| {
@@ -244,9 +248,9 @@ fn packetOut(self: *Self, originating_interface: ?*Interface, packet: *Packet, n
// Set interface announce allowed at, whatever that means.
// Else:
should_transmit = false;
// should_transmit = false;
// If not max announce queue length:
var should_queue = false;
// var should_queue = false;
// If there's already a similar announce in the queue and the current announce is newer, replace it with that one.
// Make sure the priority queue is recalculated to reflect this.
// should_queue = false if we replace an entry.
@@ -299,7 +303,7 @@ fn transport(self: *Self, now: u64, packet: *Packet) !void {
}
self.routes.setLastSeen(endpoint, now);
try self.interfaces.transmit(packet, route.source_interface);
try self.interfaces.transmit(packet, route.origin_interface);
}
}
@@ -310,52 +314,52 @@ fn transport(self: *Self, now: u64, packet: *Packet) !void {
}
}
fn announcePacketIn(self: *Self, now: u64, interface: *Interface, packet: *Packet) !void {
if (packet.payload != .announce) {
return Error.InvalidAnnounce;
}
fn announcePacketIn(self: *Self, interface: *Interface, announce: *Packet, now: u64) !void {
if (announce.payload != .announce) return Error.InvalidAnnounce;
const max_hops = 128;
const endpoint = packet.endpoints.endpoint();
const hops = packet.header.hops;
const noise = packet.payload.announce.noise;
const timestamp = packet.payload.announce.timestamp;
const endpoint = announce.endpoints.endpoint();
const hops = announce.header.hops;
const noise = announce.payload.announce.noise;
const timestamp = announce.payload.announce.timestamp;
// If hash isn't in routes, apply potential ingress limiting, hold packet and return.
// If not one of our endpoints and packet is in transport:
// Get announce entry from table.
// If entry hops is hops - 1, increment local broadcast count and if at max then remove from table.
// If entry hops is hops - 2 and retries is more than 0 and retransmission timeout reached, remove from table.
if (self.endpoints.has(&endpoint) or hops >= max_hops) {
if (!self.routes.has(&endpoint) and self.interfaces.shouldIngressLimit(interface.id, now)) {
self.interfaces.holdAnnounce(interface.id, try announce.clone());
return;
}
if (self.endpoints.has(&endpoint) or hops >= max_hops) return;
if (self.options.transport_enabled and announce.endpoints == .transport) {
if (self.announces.getPtr(&endpoint)) |entry| {
if (entry.hops == hops - 1) {
entry.rebroadcasts += 1;
if (entry.rebroadcasts >= 2) self.announces.remove(&endpoint);
} else if (entry.hops == hops - 2 and entry.retries > 0) {
if (now < entry.retransmit_timeout) self.announces.remove(&endpoint);
}
}
}
if (self.routes.get(endpoint)) |route| {
if (route.has(timestamp, noise)) {
if (timestamp != route.latest_timestamp or route.state != .unresponsive) {
return;
}
if (timestamp != route.latest_timestamp or route.state != .unresponsive) return;
}
const better_route = (hops <= route.hops and timestamp > route.latest_timestamp);
const route_expired = now >= route.expiry_time;
const newer_route = timestamp >= route.latest_timestamp;
if (!(better_route or route_expired or newer_route)) {
return;
}
if (!(better_route or route_expired or newer_route)) return;
}
self.routes.setState(endpoint, .unknown);
if (self.options.transport_enabled and packet.context != .path_response) {
// Should be put in to announce table.
const announce = try packet.clone();
if (self.options.transport_enabled and announce.context != .path_response) {
const retransmit_delay = self.system.rng.intRangeAtMost(u64, 0, 500_000);
try self.announces.add(
endpoint,
announce,
try announce.clone(),
interface.id,
hops,
retransmit_delay,
@@ -366,22 +370,22 @@ fn announcePacketIn(self: *Self, now: u64, interface: *Interface, packet: *Packe
// TODO: Check if the announce matches any discovery path requests and answer it if so.
// TODO: Cache packet if announce.
try self.routes.updateFrom(packet, interface, now);
try self.routes.updateFrom(announce, interface, now);
}
fn dataPacketIn(self: *Self, now: u64, packet: *Packet) !void {
fn dataPacketIn(self: *Self, packet: *Packet, now: u64) !void {
_ = self;
_ = now;
_ = packet;
}
fn linkRequestPacketIn(self: *Self, now: u64, packet: *Packet) !void {
fn linkRequestPacketIn(self: *Self, packet: *Packet, now: u64) !void {
_ = self;
_ = now;
_ = packet;
}
fn proofPacketIn(self: *Self, now: u64, packet: *Packet) !void {
fn proofPacketIn(self: *Self, packet: *Packet, now: u64) !void {
_ = self;
_ = now;
_ = packet;

View File

@@ -16,7 +16,7 @@ pub const Entry = struct {
};
const Noises = std.AutoArrayHashMap(TimestampedNoise, void);
source_interface: Interface.Id,
origin_interface: Interface.Id,
next_hop: Hash.Short,
hops: u8,
last_seen: u64,
@@ -70,21 +70,21 @@ pub fn setState(self: *Self, endpoint: Hash.Short, state: State) void {
}
}
pub fn updateFrom(self: *Self, packet: *const Packet, interface: *const Interface, now: u64) !void {
const endpoint = packet.endpoints.endpoint();
const next_hop = packet.endpoints.nextHop();
const timestamp = packet.payload.announce.timestamp;
const noise = packet.payload.announce.noise;
pub fn updateFrom(self: *Self, announce: *const Packet, interface: *const Interface, now: u64) !void {
const endpoint = announce.endpoints.endpoint();
const next_hop = announce.endpoints.nextHop();
const timestamp = announce.payload.announce.timestamp;
const noise = announce.payload.announce.noise;
var entry = Entry{
.source_interface = interface.id,
.origin_interface = interface.id,
.next_hop = next_hop,
.hops = packet.header.hops,
.hops = announce.header.hops,
.last_seen = now,
.expiry_time = now + interface.mode.routeLifetime(),
.noises = Entry.Noises.init(self.ally),
.latest_timestamp = timestamp,
.packet_hash = packet.hash(),
.packet_hash = announce.hash(),
.state = .unknown,
};

View File

@@ -1,6 +1,9 @@
const std = @import("std");
const unit = @import("../unit.zig");
const Allocator = std.mem.Allocator;
const Direction = @import("../endpoint.zig").Direction;
const Event = @import("../node/Event.zig");
const Interface = @import("../Interface.zig");
const Packet = @import("../packet/Managed.zig");
const PacketFactory = @import("../packet/Factory.zig");
@@ -14,8 +17,52 @@ pub const Error = error{
} || Allocator.Error;
const Entry = struct {
const Pending = std.fifo.LinearFifo(PendingEvent, .Dynamic);
const PendingEvent = struct {
event: Event.Out,
origin_id: Interface.Id,
};
const Metrics = struct {
last_seen: u64,
bytes_in: u64,
bytes_out: u64,
};
const IngressControl = struct {
held_release: u64,
new_time: u64,
burst_active: bool,
burst_freq: u64,
burst_freq_new: u64,
burst_hold: u64,
burst_activated: u64,
burst_penalty: u64,
incoming_announce_times: std.fifo.LinearFifo(u64, .{ .Static = 6 }),
fn announceFrequencyIn(self: @This(), now: u64) u64 {
const count = self.incoming_announce_times.count;
if (count < 1) return 0;
var sum = now - self.incoming_announce_times.peekItem(count - 1);
for (1..count) |i| {
sum += self.incoming_announce_times.peekItem(i) - self.incoming_announce_times.peekItem(i - 1);
}
const average = if (sum != 0) (1_000_000 * count) / sum else 0;
return average;
}
};
interface: *Interface,
pending_out: Interface.Outgoing,
pending: Pending,
metrics: Metrics,
ingress_control: IngressControl,
held_announces: std.StringArrayHashMap(Packet),
creation_time: u64,
};
const interface_limit = 256;
@@ -38,27 +85,83 @@ pub fn iterator(self: *Self) std.AutoHashMap(Interface.Id, Entry).ValueIterator
return self.entries.valueIterator();
}
pub fn add(self: *Self, config: Interface.Config) Error!Interface.Api {
if (self.entries.count() >= interface_limit) {
return Error.TooManyInterfaces;
pub fn getPtr(self: *Self, id: Interface.Id) ?*Interface {
if (self.entries.getPtr(id)) |entry| {
return entry.interface;
}
return null;
}
pub fn shouldIngressLimit(self: *Self, id: Interface.Id, now: u64) !bool {
const entry = self.entries.getPtr(id) orelse return Error.InterfaceNotFound;
const lifetime = now - entry.creation_time;
const control = &entry.ingress_control;
const frequency = entry.ingress_control.announceFrequencyIn(now);
const threshold = if (lifetime < control.new_time) control.burst_freq_new else control.burst_freq;
if (control.burst_active) {
if (frequency < threshold and now > control.burst_activated + control.burst_hold) {
control.burst_active = false;
control.held_release = now + control.burst_penalty;
}
return true;
} else {
if (frequency > threshold) {
control.burst_active = true;
control.burst_activated = now;
return true;
}
return false;
}
}
pub fn holdAnnounce(self: *Self, id: Interface.Id, announce: Packet) !void {
const entry = self.entries.getPtr(id) orelse return Error.InterfaceNotFound;
const max_held_announces = 256;
if (entry.held_announces.count() > max_held_announces) return;
const endpoint = announce.endpoints.endpoint();
const key = try self.ally.dupe(u8, &endpoint);
try entry.held_announces.put(key, announce);
}
pub fn updateMetrics(
self: *Self,
id: Interface.Id,
direction: Direction,
packet: *const Packet,
now: u64,
) void {
if (self.entries.getPtr(id)) |entry| {
switch (direction) {
.in => entry.bytes_in += packet.size(),
.out => entry.bytes_out += packet.size(),
}
entry.last_seen = now;
}
}
pub fn add(self: *Self, config: Interface.Config) Error!Interface.Api {
if (self.entries.count() >= interface_limit) return Error.TooManyInterfaces;
const id = self.current_interface_id;
self.current_interface_id += 1;
const incoming = try self.ally.create(Interface.Incoming);
incoming.* = Interface.Incoming.init(self.ally);
errdefer {
self.ally.destroy(incoming);
}
errdefer self.ally.destroy(incoming);
const outgoing = try self.ally.create(Interface.Outgoing);
outgoing.* = Interface.Outgoing.init(self.ally);
errdefer {
self.ally.destroy(outgoing);
}
errdefer self.ally.destroy(outgoing);
const packet_factory = PacketFactory.init(
self.ally,
@@ -68,9 +171,7 @@ pub fn add(self: *Self, config: Interface.Config) Error!Interface.Api {
const interface = try self.ally.create(Interface);
errdefer {
self.ally.destroy(interface);
}
errdefer self.ally.destroy(interface);
interface.* = Interface.init(
self.ally,
@@ -83,7 +184,7 @@ pub fn add(self: *Self, config: Interface.Config) Error!Interface.Api {
try self.entries.put(id, Entry{
.interface = interface,
.pending_out = Interface.Outgoing.init(self.ally),
.pending = Entry.Pending.init(self.ally),
});
return interface.api();
@@ -94,28 +195,33 @@ pub fn remove(self: *Self, id: Interface.Id) void {
entry.interface.deinit(self.ally);
self.ally.destroy(entry.interface);
self.entries.remove(id);
self.current_interface_id -= 1;
}
}
pub fn transmit(self: *Self, packet: *const Packet, id: Interface.Id) !void {
pub fn transmit(self: *Self, packet: *const Packet, id: Interface.Id, origin_id: ?Interface.Id) !void {
const entry = self.entries.getPtr(id) orelse return Error.InterfaceNotFound;
try entry.pending_out.push(.{
.packet = try packet.clone(),
try entry.pending.writeItem(.{
.event = .{
.packet = try packet.clone(),
},
.origin_id = origin_id,
});
}
pub fn broadcast(self: *Self, packet: Packet, excluded_id: ?Interface.Id) !void {
pub fn broadcast(self: *Self, packet: Packet, origin_id: ?Interface.Id) !void {
var entries = self.entries.valueIterator();
while (entries.next()) |entry| {
if (excluded_id) |id| {
if (entry.interface.id == id) {
continue;
}
if (origin_id) |id| {
if (entry.interface.id == id) continue;
}
try entry.pending_out.push(.{
.packet = try packet.clone(),
try entry.pending.writeItem(.{
.event = .{
.packet = try packet.clone(),
},
.origin_id = origin_id,
});
}
}
@@ -124,11 +230,20 @@ pub fn deinit(self: *Self) void {
var entries = self.entries.valueIterator();
while (entries.next()) |entry| {
while (entry.pending_out.pop()) |event| {
while (entry.pending.pop()) |event| {
var e = event;
e.deinit();
}
entry.pending_out.deinit();
var held_announces = entry.held_announces.iterator();
while (held_announces.next()) |announce| {
self.ally.free(announce.key_ptr);
announce.value_ptr.deinit();
}
entry.held_announces.deinit();
entry.pending.deinit();
entry.interface.deinit();
self.ally.destroy(entry.interface);
}