From f96af89269e49f3d719f02c5bfbf06186221991f Mon Sep 17 00:00:00 2001 From: Sudo-Ivan Date: Tue, 30 Dec 2025 01:27:21 -0600 Subject: [PATCH] refactor: improve path request handling and link packet processing with enhanced logging and error management --- pkg/transport/transport.go | 168 ++++++++++++++++++++++++------------- 1 file changed, 112 insertions(+), 56 deletions(-) diff --git a/pkg/transport/transport.go b/pkg/transport/transport.go index 8c010c5..ce8be3d 100644 --- a/pkg/transport/transport.go +++ b/pkg/transport/transport.go @@ -551,18 +551,60 @@ func (t *Transport) NextHopInterface(destinationHash []byte) string { } func (t *Transport) RequestPath(destinationHash []byte, onInterface string, tag []byte, recursive bool) error { - packet := &PathRequest{ - DestinationHash: destinationHash, - Tag: tag, - TTL: PathRequestTTL, - Recursive: recursive, + if tag == nil { + tag = make([]byte, common.SIZE_16) + rand.Read(tag) } + var pathRequestData []byte + if t.identity != nil { + pathRequestData = append(destinationHash, t.identity.Hash()...) + pathRequestData = append(pathRequestData, tag...) + } else { + pathRequestData = append(destinationHash, tag...) + } + + pathRequestDest := destination.NewDestination( + nil, + destination.OUT, + destination.PLAIN, + "rnstransport", + "path", + "request", + ) + + pkt := packet.NewPacket( + pathRequestDest, + pathRequestData, + common.DATA, + ) + pkt.SetTransportType(common.BROADCAST) + pkt.SetHeaderType(common.HEADER_1) + + if err := pkt.Pack(); err != nil { + return fmt.Errorf("failed to pack path request: %w", err) + } + + debug.Log(debug.DEBUG_INFO, "Sending path request", "dest_hash", fmt.Sprintf("%x", destinationHash), "data_len", len(pathRequestData)) + if onInterface != "" { - return t.sendPathRequest(packet, onInterface) + iface, ok := t.interfaces[onInterface] + if !ok { + return fmt.Errorf("interface not found: %s", onInterface) + } + return iface.Send(pkt.Raw, "") } - return t.broadcastPathRequest(packet) + for _, iface := range t.interfaces { + if !iface.IsEnabled() { + continue + } + if err := iface.Send(pkt.Raw, ""); err != nil { + debug.Log(debug.DEBUG_ERROR, "Failed to send path request on interface", "interface", iface.GetName(), "error", err) + } + } + + return nil } // updatePathUnlocked updates path without acquiring mutex (caller must hold lock) @@ -825,36 +867,41 @@ func (t *Transport) HandlePacket(data []byte, iface common.NetworkInterface) { debug.Log(debug.DEBUG_PACKETS, "Updated TCP interface stats", "rx_bytes", len(data)) } - switch packetType { - case PACKET_TYPE_ANNOUNCE: - debug.Log(debug.DEBUG_VERBOSE, "Processing announce packet") - if err := t.handleAnnouncePacket(data, iface); err != nil { - debug.Log(debug.DEBUG_INFO, "Announce handling failed", "error", err) + dataCopy := make([]byte, len(data)) + copy(dataCopy, data) + + go func() { + switch packetType { + case PACKET_TYPE_ANNOUNCE: + debug.Log(debug.DEBUG_VERBOSE, "Processing announce packet") + if err := t.handleAnnouncePacket(dataCopy, iface); err != nil { + debug.Log(debug.DEBUG_INFO, "Announce handling failed", "error", err) + } + case PACKET_TYPE_LINK: + debug.Log(debug.DEBUG_ERROR, "Processing link packet (type=0x02)", "packet_size", len(dataCopy)) + t.handleLinkPacket(dataCopy[common.ONE:], iface, PACKET_TYPE_LINK) + case packet.PacketTypeProof: + debug.Log(debug.DEBUG_VERBOSE, "Processing proof packet") + fullData := append([]byte{packet.PacketTypeProof}, dataCopy[common.ONE:]...) + pkt := &packet.Packet{Raw: fullData} + if err := pkt.Unpack(); err != nil { + debug.Log(debug.DEBUG_INFO, "Failed to unpack proof packet", "error", err) + return + } + t.handleProofPacket(pkt, iface) + case common.ZERO: + // Data packets addressed to link destinations carry link traffic + if destType == DEST_TYPE_LINK { + debug.Log(debug.DEBUG_ERROR, "Processing link data packet (dest_type=3)", "packet_size", len(dataCopy)) + t.handleLinkPacket(dataCopy[common.ONE:], iface, common.ZERO) + } else { + debug.Log(debug.DEBUG_ERROR, "Processing data packet (type 0x00)", "packet_size", len(dataCopy), "dest_type", destType, "header_type", headerType) + t.handleTransportPacket(dataCopy[common.ONE:], iface) + } + default: + debug.Log(debug.DEBUG_INFO, "Unknown packet type", "type", fmt.Sprintf(common.STR_FMT_HEX, packetType), "source", iface.GetName()) } - case PACKET_TYPE_LINK: - debug.Log(debug.DEBUG_ERROR, "Processing link packet (type=0x02)", "packet_size", len(data)) - t.handleLinkPacket(data[common.ONE:], iface, PACKET_TYPE_LINK) - case packet.PacketTypeProof: - debug.Log(debug.DEBUG_VERBOSE, "Processing proof packet") - fullData := append([]byte{packet.PacketTypeProof}, data[common.ONE:]...) - pkt := &packet.Packet{Raw: fullData} - if err := pkt.Unpack(); err != nil { - debug.Log(debug.DEBUG_INFO, "Failed to unpack proof packet", "error", err) - return - } - t.handleProofPacket(pkt, iface) - case common.ZERO: - // Data packets addressed to link destinations carry link traffic - if destType == DEST_TYPE_LINK { - debug.Log(debug.DEBUG_ERROR, "Processing link data packet (dest_type=3)", "packet_size", len(data)) - t.handleLinkPacket(data[common.ONE:], iface, common.ZERO) - } else { - debug.Log(debug.DEBUG_ERROR, "Processing data packet (type 0x00)", "packet_size", len(data), "dest_type", destType, "header_type", headerType) - t.handleTransportPacket(data[common.ONE:], iface) - } - default: - debug.Log(debug.DEBUG_INFO, "Unknown packet type", "type", fmt.Sprintf(common.STR_FMT_HEX, packetType), "source", iface.GetName()) - } + }() } func (t *Transport) handleAnnouncePacket(data []byte, iface common.NetworkInterface) error { @@ -1099,18 +1146,19 @@ func (t *Transport) handleAnnouncePacket(data []byte, iface common.NetworkInterf } func (t *Transport) handleLinkPacket(data []byte, iface common.NetworkInterface, packetType byte) { - debug.Log(debug.DEBUG_ERROR, "Handling link packet", "bytes", len(data), "packet_type", fmt.Sprintf("0x%02x", packetType), "PACKET_TYPE_LINK", fmt.Sprintf("0x%02x", PACKET_TYPE_LINK)) + startTime := time.Now() + debug.Log(debug.DEBUG_INFO, "Handling link packet", "bytes", len(data), "packet_type", fmt.Sprintf("0x%02x", packetType), "interface", iface.GetName()) pkt := &packet.Packet{} // If this is a LINKREQUEST packet (type=0x02), handle it as link establishment if packetType == PACKET_TYPE_LINK { - debug.Log(debug.DEBUG_ERROR, "Processing LINKREQUEST (type=0x02)") + debug.Log(debug.DEBUG_INFO, "Processing LINKREQUEST (type=0x02)", "interface", iface.GetName()) // Parse as LINKREQUEST packet - prepend the packet type pkt.Raw = append([]byte{PACKET_TYPE_LINK}, data...) if err := pkt.Unpack(); err != nil { - debug.Log(debug.DEBUG_ERROR, "Failed to unpack link request", "error", err) + debug.Log(debug.DEBUG_ERROR, "Failed to unpack link request", "error", err, "elapsed", time.Since(startTime).Seconds()) return } @@ -1119,7 +1167,7 @@ func (t *Transport) handleLinkPacket(data []byte, iface common.NetworkInterface, destHash = destHash[:16] } - debug.Log(debug.DEBUG_ERROR, "Link request for destination", "hash", fmt.Sprintf("%x", destHash)) + debug.Log(debug.DEBUG_INFO, "Link request for destination", "hash", fmt.Sprintf("%x", destHash), "interface", iface.GetName()) // Look up the destination t.mutex.RLock() @@ -1127,24 +1175,26 @@ func (t *Transport) handleLinkPacket(data []byte, iface common.NetworkInterface, t.mutex.RUnlock() if !exists { - debug.Log(debug.DEBUG_ERROR, "No destination registered for hash", "hash", fmt.Sprintf("%x", destHash)) + debug.Log(debug.DEBUG_ERROR, "No destination registered for hash", "hash", fmt.Sprintf("%x", destHash), "elapsed", time.Since(startTime).Seconds()) return } - debug.Log(debug.DEBUG_ERROR, "Found registered destination", "hash", fmt.Sprintf("%x", destHash)) + debug.Log(debug.DEBUG_INFO, "Found registered destination", "hash", fmt.Sprintf("%x", destHash), "elapsed", time.Since(startTime).Seconds()) // Handle the incoming link request + reqStartTime := time.Now() t.handleIncomingLinkRequest(pkt, destIface, iface) + debug.Log(debug.DEBUG_INFO, "Link request handling completed", "elapsed", time.Since(reqStartTime).Seconds(), "total_elapsed", time.Since(startTime).Seconds()) return } // Otherwise, this is a data packet for an established link - debug.Log(debug.DEBUG_ERROR, "Processing link data packet") + debug.Log(debug.DEBUG_INFO, "Processing link data packet", "interface", iface.GetName()) // Parse as data packet - prepend packet type 0x00 pkt.Raw = append([]byte{0x00}, data...) if err := pkt.Unpack(); err != nil { - debug.Log(debug.DEBUG_ERROR, "Failed to unpack link data packet", "error", err) + debug.Log(debug.DEBUG_ERROR, "Failed to unpack link data packet", "error", err, "interface", iface.GetName()) return } @@ -1154,7 +1204,7 @@ func (t *Transport) handleLinkPacket(data []byte, iface common.NetworkInterface, linkID = linkID[:16] } - debug.Log(debug.DEBUG_ERROR, "Link data for link ID", "link_id", fmt.Sprintf("%x", linkID)) + debug.Log(debug.DEBUG_INFO, "Link data for link ID", "link_id", fmt.Sprintf("%x", linkID), "context", fmt.Sprintf("0x%02x", pkt.Context), "packet_type", fmt.Sprintf("0x%02x", pkt.PacketType), "interface", iface.GetName()) // Find the established link t.mutex.RLock() @@ -1172,16 +1222,17 @@ func (t *Transport) handleLinkPacket(data []byte, iface common.NetworkInterface, } func (t *Transport) handleIncomingLinkRequest(pkt *packet.Packet, destIface interface{}, networkIface common.NetworkInterface) { - debug.Log(debug.DEBUG_TRACE, "Handling incoming link request") + startTime := time.Now() + debug.Log(debug.DEBUG_INFO, "Handling incoming link request", "interface", networkIface.GetName()) // The link ID is in the packet data linkID := pkt.Data if len(linkID) == 0 { - debug.Log(debug.DEBUG_INFO, "No link ID in link request packet") + debug.Log(debug.DEBUG_INFO, "No link ID in link request packet", "elapsed", time.Since(startTime).Seconds()) return } - debug.Log(debug.DEBUG_TRACE, "Link request with ID", "id", fmt.Sprintf("%x", linkID[:8])) + debug.Log(debug.DEBUG_INFO, "Link request with ID", "id", fmt.Sprintf("%x", linkID[:8]), "full_id", fmt.Sprintf("%x", linkID), "elapsed", time.Since(startTime).Seconds()) // Call the destination's HandleIncomingLinkRequest method destValue := reflect.ValueOf(destIface) @@ -1194,18 +1245,19 @@ func (t *Transport) handleIncomingLinkRequest(pkt *packet.Packet, destIface inte reflect.ValueOf(t), reflect.ValueOf(networkIface), } + callStartTime := time.Now() results := method.Call(args) if len(results) > 0 && !results[0].IsNil() { err := results[0].Interface().(error) - debug.Log(debug.DEBUG_ERROR, "Failed to handle incoming link request", "error", err) + debug.Log(debug.DEBUG_ERROR, "Failed to handle incoming link request", "error", err, "call_elapsed", time.Since(callStartTime).Seconds(), "total_elapsed", time.Since(startTime).Seconds()) } else { - debug.Log(debug.DEBUG_VERBOSE, "Link request handled successfully by destination") + debug.Log(debug.DEBUG_INFO, "Link request handled successfully by destination", "call_elapsed", time.Since(callStartTime).Seconds(), "total_elapsed", time.Since(startTime).Seconds()) } } else { - debug.Log(debug.DEBUG_INFO, "Destination does not have HandleIncomingLinkRequest method") + debug.Log(debug.DEBUG_ERROR, "Destination does not have HandleIncomingLinkRequest method", "elapsed", time.Since(startTime).Seconds()) } } else { - debug.Log(debug.DEBUG_INFO, "Invalid destination object") + debug.Log(debug.DEBUG_ERROR, "Invalid destination object", "elapsed", time.Since(startTime).Seconds()) } } @@ -1628,7 +1680,7 @@ type LinkInterface interface { SetPacketTimeout(packet interface{}, callback func(interface{}), timeout time.Duration) SetPacketDelivered(packet interface{}, callback func(interface{})) HandleInbound(pkt *packet.Packet) error - ValidateLinkProof(pkt *packet.Packet) error + ValidateLinkProof(pkt *packet.Packet, networkIface common.NetworkInterface) error } func (l *Link) GetRTT() float64 { @@ -1809,15 +1861,19 @@ func (t *Transport) handleProofPacket(pkt *packet.Packet, iface common.NetworkIn linkID = linkID[:16] } + debug.Log(debug.DEBUG_INFO, "Received link proof packet", "link_id", fmt.Sprintf("%x", linkID), "data_len", len(pkt.Data)) + t.mutex.RLock() link, exists := t.links[string(linkID)] t.mutex.RUnlock() if exists && link != nil { - if err := link.ValidateLinkProof(pkt); err != nil { - debug.Log(debug.DEBUG_ERROR, "Link proof validation failed", "error", err) + debug.Log(debug.DEBUG_INFO, "Found link for proof, validating", "link_id", fmt.Sprintf("%x", linkID), "interface", iface.GetName()) + startTime := time.Now() + if err := link.ValidateLinkProof(pkt, iface); err != nil { + debug.Log(debug.DEBUG_ERROR, "Link proof validation failed", "error", err, "link_id", fmt.Sprintf("%x", linkID), "elapsed", time.Since(startTime).Seconds()) } else { - debug.Log(debug.DEBUG_INFO, "Link proof validated successfully") + debug.Log(debug.DEBUG_INFO, "Link proof validated successfully", "link_id", fmt.Sprintf("%x", linkID), "elapsed", time.Since(startTime).Seconds()) } return }