refactor: improve path request handling and link packet processing with enhanced logging and error management
This commit is contained in:
@@ -551,18 +551,60 @@ func (t *Transport) NextHopInterface(destinationHash []byte) string {
|
||||
}
|
||||
|
||||
func (t *Transport) RequestPath(destinationHash []byte, onInterface string, tag []byte, recursive bool) error {
|
||||
packet := &PathRequest{
|
||||
DestinationHash: destinationHash,
|
||||
Tag: tag,
|
||||
TTL: PathRequestTTL,
|
||||
Recursive: recursive,
|
||||
if tag == nil {
|
||||
tag = make([]byte, common.SIZE_16)
|
||||
rand.Read(tag)
|
||||
}
|
||||
|
||||
var pathRequestData []byte
|
||||
if t.identity != nil {
|
||||
pathRequestData = append(destinationHash, t.identity.Hash()...)
|
||||
pathRequestData = append(pathRequestData, tag...)
|
||||
} else {
|
||||
pathRequestData = append(destinationHash, tag...)
|
||||
}
|
||||
|
||||
pathRequestDest := destination.NewDestination(
|
||||
nil,
|
||||
destination.OUT,
|
||||
destination.PLAIN,
|
||||
"rnstransport",
|
||||
"path",
|
||||
"request",
|
||||
)
|
||||
|
||||
pkt := packet.NewPacket(
|
||||
pathRequestDest,
|
||||
pathRequestData,
|
||||
common.DATA,
|
||||
)
|
||||
pkt.SetTransportType(common.BROADCAST)
|
||||
pkt.SetHeaderType(common.HEADER_1)
|
||||
|
||||
if err := pkt.Pack(); err != nil {
|
||||
return fmt.Errorf("failed to pack path request: %w", err)
|
||||
}
|
||||
|
||||
debug.Log(debug.DEBUG_INFO, "Sending path request", "dest_hash", fmt.Sprintf("%x", destinationHash), "data_len", len(pathRequestData))
|
||||
|
||||
if onInterface != "" {
|
||||
return t.sendPathRequest(packet, onInterface)
|
||||
iface, ok := t.interfaces[onInterface]
|
||||
if !ok {
|
||||
return fmt.Errorf("interface not found: %s", onInterface)
|
||||
}
|
||||
return iface.Send(pkt.Raw, "")
|
||||
}
|
||||
|
||||
return t.broadcastPathRequest(packet)
|
||||
for _, iface := range t.interfaces {
|
||||
if !iface.IsEnabled() {
|
||||
continue
|
||||
}
|
||||
if err := iface.Send(pkt.Raw, ""); err != nil {
|
||||
debug.Log(debug.DEBUG_ERROR, "Failed to send path request on interface", "interface", iface.GetName(), "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// updatePathUnlocked updates path without acquiring mutex (caller must hold lock)
|
||||
@@ -825,36 +867,41 @@ func (t *Transport) HandlePacket(data []byte, iface common.NetworkInterface) {
|
||||
debug.Log(debug.DEBUG_PACKETS, "Updated TCP interface stats", "rx_bytes", len(data))
|
||||
}
|
||||
|
||||
switch packetType {
|
||||
case PACKET_TYPE_ANNOUNCE:
|
||||
debug.Log(debug.DEBUG_VERBOSE, "Processing announce packet")
|
||||
if err := t.handleAnnouncePacket(data, iface); err != nil {
|
||||
debug.Log(debug.DEBUG_INFO, "Announce handling failed", "error", err)
|
||||
dataCopy := make([]byte, len(data))
|
||||
copy(dataCopy, data)
|
||||
|
||||
go func() {
|
||||
switch packetType {
|
||||
case PACKET_TYPE_ANNOUNCE:
|
||||
debug.Log(debug.DEBUG_VERBOSE, "Processing announce packet")
|
||||
if err := t.handleAnnouncePacket(dataCopy, iface); err != nil {
|
||||
debug.Log(debug.DEBUG_INFO, "Announce handling failed", "error", err)
|
||||
}
|
||||
case PACKET_TYPE_LINK:
|
||||
debug.Log(debug.DEBUG_ERROR, "Processing link packet (type=0x02)", "packet_size", len(dataCopy))
|
||||
t.handleLinkPacket(dataCopy[common.ONE:], iface, PACKET_TYPE_LINK)
|
||||
case packet.PacketTypeProof:
|
||||
debug.Log(debug.DEBUG_VERBOSE, "Processing proof packet")
|
||||
fullData := append([]byte{packet.PacketTypeProof}, dataCopy[common.ONE:]...)
|
||||
pkt := &packet.Packet{Raw: fullData}
|
||||
if err := pkt.Unpack(); err != nil {
|
||||
debug.Log(debug.DEBUG_INFO, "Failed to unpack proof packet", "error", err)
|
||||
return
|
||||
}
|
||||
t.handleProofPacket(pkt, iface)
|
||||
case common.ZERO:
|
||||
// Data packets addressed to link destinations carry link traffic
|
||||
if destType == DEST_TYPE_LINK {
|
||||
debug.Log(debug.DEBUG_ERROR, "Processing link data packet (dest_type=3)", "packet_size", len(dataCopy))
|
||||
t.handleLinkPacket(dataCopy[common.ONE:], iface, common.ZERO)
|
||||
} else {
|
||||
debug.Log(debug.DEBUG_ERROR, "Processing data packet (type 0x00)", "packet_size", len(dataCopy), "dest_type", destType, "header_type", headerType)
|
||||
t.handleTransportPacket(dataCopy[common.ONE:], iface)
|
||||
}
|
||||
default:
|
||||
debug.Log(debug.DEBUG_INFO, "Unknown packet type", "type", fmt.Sprintf(common.STR_FMT_HEX, packetType), "source", iface.GetName())
|
||||
}
|
||||
case PACKET_TYPE_LINK:
|
||||
debug.Log(debug.DEBUG_ERROR, "Processing link packet (type=0x02)", "packet_size", len(data))
|
||||
t.handleLinkPacket(data[common.ONE:], iface, PACKET_TYPE_LINK)
|
||||
case packet.PacketTypeProof:
|
||||
debug.Log(debug.DEBUG_VERBOSE, "Processing proof packet")
|
||||
fullData := append([]byte{packet.PacketTypeProof}, data[common.ONE:]...)
|
||||
pkt := &packet.Packet{Raw: fullData}
|
||||
if err := pkt.Unpack(); err != nil {
|
||||
debug.Log(debug.DEBUG_INFO, "Failed to unpack proof packet", "error", err)
|
||||
return
|
||||
}
|
||||
t.handleProofPacket(pkt, iface)
|
||||
case common.ZERO:
|
||||
// Data packets addressed to link destinations carry link traffic
|
||||
if destType == DEST_TYPE_LINK {
|
||||
debug.Log(debug.DEBUG_ERROR, "Processing link data packet (dest_type=3)", "packet_size", len(data))
|
||||
t.handleLinkPacket(data[common.ONE:], iface, common.ZERO)
|
||||
} else {
|
||||
debug.Log(debug.DEBUG_ERROR, "Processing data packet (type 0x00)", "packet_size", len(data), "dest_type", destType, "header_type", headerType)
|
||||
t.handleTransportPacket(data[common.ONE:], iface)
|
||||
}
|
||||
default:
|
||||
debug.Log(debug.DEBUG_INFO, "Unknown packet type", "type", fmt.Sprintf(common.STR_FMT_HEX, packetType), "source", iface.GetName())
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (t *Transport) handleAnnouncePacket(data []byte, iface common.NetworkInterface) error {
|
||||
@@ -1099,18 +1146,19 @@ func (t *Transport) handleAnnouncePacket(data []byte, iface common.NetworkInterf
|
||||
}
|
||||
|
||||
func (t *Transport) handleLinkPacket(data []byte, iface common.NetworkInterface, packetType byte) {
|
||||
debug.Log(debug.DEBUG_ERROR, "Handling link packet", "bytes", len(data), "packet_type", fmt.Sprintf("0x%02x", packetType), "PACKET_TYPE_LINK", fmt.Sprintf("0x%02x", PACKET_TYPE_LINK))
|
||||
startTime := time.Now()
|
||||
debug.Log(debug.DEBUG_INFO, "Handling link packet", "bytes", len(data), "packet_type", fmt.Sprintf("0x%02x", packetType), "interface", iface.GetName())
|
||||
|
||||
pkt := &packet.Packet{}
|
||||
|
||||
// If this is a LINKREQUEST packet (type=0x02), handle it as link establishment
|
||||
if packetType == PACKET_TYPE_LINK {
|
||||
debug.Log(debug.DEBUG_ERROR, "Processing LINKREQUEST (type=0x02)")
|
||||
debug.Log(debug.DEBUG_INFO, "Processing LINKREQUEST (type=0x02)", "interface", iface.GetName())
|
||||
|
||||
// Parse as LINKREQUEST packet - prepend the packet type
|
||||
pkt.Raw = append([]byte{PACKET_TYPE_LINK}, data...)
|
||||
if err := pkt.Unpack(); err != nil {
|
||||
debug.Log(debug.DEBUG_ERROR, "Failed to unpack link request", "error", err)
|
||||
debug.Log(debug.DEBUG_ERROR, "Failed to unpack link request", "error", err, "elapsed", time.Since(startTime).Seconds())
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1119,7 +1167,7 @@ func (t *Transport) handleLinkPacket(data []byte, iface common.NetworkInterface,
|
||||
destHash = destHash[:16]
|
||||
}
|
||||
|
||||
debug.Log(debug.DEBUG_ERROR, "Link request for destination", "hash", fmt.Sprintf("%x", destHash))
|
||||
debug.Log(debug.DEBUG_INFO, "Link request for destination", "hash", fmt.Sprintf("%x", destHash), "interface", iface.GetName())
|
||||
|
||||
// Look up the destination
|
||||
t.mutex.RLock()
|
||||
@@ -1127,24 +1175,26 @@ func (t *Transport) handleLinkPacket(data []byte, iface common.NetworkInterface,
|
||||
t.mutex.RUnlock()
|
||||
|
||||
if !exists {
|
||||
debug.Log(debug.DEBUG_ERROR, "No destination registered for hash", "hash", fmt.Sprintf("%x", destHash))
|
||||
debug.Log(debug.DEBUG_ERROR, "No destination registered for hash", "hash", fmt.Sprintf("%x", destHash), "elapsed", time.Since(startTime).Seconds())
|
||||
return
|
||||
}
|
||||
|
||||
debug.Log(debug.DEBUG_ERROR, "Found registered destination", "hash", fmt.Sprintf("%x", destHash))
|
||||
debug.Log(debug.DEBUG_INFO, "Found registered destination", "hash", fmt.Sprintf("%x", destHash), "elapsed", time.Since(startTime).Seconds())
|
||||
|
||||
// Handle the incoming link request
|
||||
reqStartTime := time.Now()
|
||||
t.handleIncomingLinkRequest(pkt, destIface, iface)
|
||||
debug.Log(debug.DEBUG_INFO, "Link request handling completed", "elapsed", time.Since(reqStartTime).Seconds(), "total_elapsed", time.Since(startTime).Seconds())
|
||||
return
|
||||
}
|
||||
|
||||
// Otherwise, this is a data packet for an established link
|
||||
debug.Log(debug.DEBUG_ERROR, "Processing link data packet")
|
||||
debug.Log(debug.DEBUG_INFO, "Processing link data packet", "interface", iface.GetName())
|
||||
|
||||
// Parse as data packet - prepend packet type 0x00
|
||||
pkt.Raw = append([]byte{0x00}, data...)
|
||||
if err := pkt.Unpack(); err != nil {
|
||||
debug.Log(debug.DEBUG_ERROR, "Failed to unpack link data packet", "error", err)
|
||||
debug.Log(debug.DEBUG_ERROR, "Failed to unpack link data packet", "error", err, "interface", iface.GetName())
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1154,7 +1204,7 @@ func (t *Transport) handleLinkPacket(data []byte, iface common.NetworkInterface,
|
||||
linkID = linkID[:16]
|
||||
}
|
||||
|
||||
debug.Log(debug.DEBUG_ERROR, "Link data for link ID", "link_id", fmt.Sprintf("%x", linkID))
|
||||
debug.Log(debug.DEBUG_INFO, "Link data for link ID", "link_id", fmt.Sprintf("%x", linkID), "context", fmt.Sprintf("0x%02x", pkt.Context), "packet_type", fmt.Sprintf("0x%02x", pkt.PacketType), "interface", iface.GetName())
|
||||
|
||||
// Find the established link
|
||||
t.mutex.RLock()
|
||||
@@ -1172,16 +1222,17 @@ func (t *Transport) handleLinkPacket(data []byte, iface common.NetworkInterface,
|
||||
}
|
||||
|
||||
func (t *Transport) handleIncomingLinkRequest(pkt *packet.Packet, destIface interface{}, networkIface common.NetworkInterface) {
|
||||
debug.Log(debug.DEBUG_TRACE, "Handling incoming link request")
|
||||
startTime := time.Now()
|
||||
debug.Log(debug.DEBUG_INFO, "Handling incoming link request", "interface", networkIface.GetName())
|
||||
|
||||
// The link ID is in the packet data
|
||||
linkID := pkt.Data
|
||||
if len(linkID) == 0 {
|
||||
debug.Log(debug.DEBUG_INFO, "No link ID in link request packet")
|
||||
debug.Log(debug.DEBUG_INFO, "No link ID in link request packet", "elapsed", time.Since(startTime).Seconds())
|
||||
return
|
||||
}
|
||||
|
||||
debug.Log(debug.DEBUG_TRACE, "Link request with ID", "id", fmt.Sprintf("%x", linkID[:8]))
|
||||
debug.Log(debug.DEBUG_INFO, "Link request with ID", "id", fmt.Sprintf("%x", linkID[:8]), "full_id", fmt.Sprintf("%x", linkID), "elapsed", time.Since(startTime).Seconds())
|
||||
|
||||
// Call the destination's HandleIncomingLinkRequest method
|
||||
destValue := reflect.ValueOf(destIface)
|
||||
@@ -1194,18 +1245,19 @@ func (t *Transport) handleIncomingLinkRequest(pkt *packet.Packet, destIface inte
|
||||
reflect.ValueOf(t),
|
||||
reflect.ValueOf(networkIface),
|
||||
}
|
||||
callStartTime := time.Now()
|
||||
results := method.Call(args)
|
||||
if len(results) > 0 && !results[0].IsNil() {
|
||||
err := results[0].Interface().(error)
|
||||
debug.Log(debug.DEBUG_ERROR, "Failed to handle incoming link request", "error", err)
|
||||
debug.Log(debug.DEBUG_ERROR, "Failed to handle incoming link request", "error", err, "call_elapsed", time.Since(callStartTime).Seconds(), "total_elapsed", time.Since(startTime).Seconds())
|
||||
} else {
|
||||
debug.Log(debug.DEBUG_VERBOSE, "Link request handled successfully by destination")
|
||||
debug.Log(debug.DEBUG_INFO, "Link request handled successfully by destination", "call_elapsed", time.Since(callStartTime).Seconds(), "total_elapsed", time.Since(startTime).Seconds())
|
||||
}
|
||||
} else {
|
||||
debug.Log(debug.DEBUG_INFO, "Destination does not have HandleIncomingLinkRequest method")
|
||||
debug.Log(debug.DEBUG_ERROR, "Destination does not have HandleIncomingLinkRequest method", "elapsed", time.Since(startTime).Seconds())
|
||||
}
|
||||
} else {
|
||||
debug.Log(debug.DEBUG_INFO, "Invalid destination object")
|
||||
debug.Log(debug.DEBUG_ERROR, "Invalid destination object", "elapsed", time.Since(startTime).Seconds())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1628,7 +1680,7 @@ type LinkInterface interface {
|
||||
SetPacketTimeout(packet interface{}, callback func(interface{}), timeout time.Duration)
|
||||
SetPacketDelivered(packet interface{}, callback func(interface{}))
|
||||
HandleInbound(pkt *packet.Packet) error
|
||||
ValidateLinkProof(pkt *packet.Packet) error
|
||||
ValidateLinkProof(pkt *packet.Packet, networkIface common.NetworkInterface) error
|
||||
}
|
||||
|
||||
func (l *Link) GetRTT() float64 {
|
||||
@@ -1809,15 +1861,19 @@ func (t *Transport) handleProofPacket(pkt *packet.Packet, iface common.NetworkIn
|
||||
linkID = linkID[:16]
|
||||
}
|
||||
|
||||
debug.Log(debug.DEBUG_INFO, "Received link proof packet", "link_id", fmt.Sprintf("%x", linkID), "data_len", len(pkt.Data))
|
||||
|
||||
t.mutex.RLock()
|
||||
link, exists := t.links[string(linkID)]
|
||||
t.mutex.RUnlock()
|
||||
|
||||
if exists && link != nil {
|
||||
if err := link.ValidateLinkProof(pkt); err != nil {
|
||||
debug.Log(debug.DEBUG_ERROR, "Link proof validation failed", "error", err)
|
||||
debug.Log(debug.DEBUG_INFO, "Found link for proof, validating", "link_id", fmt.Sprintf("%x", linkID), "interface", iface.GetName())
|
||||
startTime := time.Now()
|
||||
if err := link.ValidateLinkProof(pkt, iface); err != nil {
|
||||
debug.Log(debug.DEBUG_ERROR, "Link proof validation failed", "error", err, "link_id", fmt.Sprintf("%x", linkID), "elapsed", time.Since(startTime).Seconds())
|
||||
} else {
|
||||
debug.Log(debug.DEBUG_INFO, "Link proof validated successfully")
|
||||
debug.Log(debug.DEBUG_INFO, "Link proof validated successfully", "link_id", fmt.Sprintf("%x", linkID), "elapsed", time.Since(startTime).Seconds())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user