Add destination registration and link handling in Transport

This commit is contained in:
2025-10-07 22:43:45 -05:00
parent cc10830df3
commit 005e2566aa

View File

@@ -8,6 +8,7 @@ import (
"fmt"
"log"
"net"
"reflect"
"sync"
"time"
@@ -107,6 +108,7 @@ type Transport struct {
config *common.ReticulumConfig
interfaces map[string]common.NetworkInterface
links map[string]*Link
destinations map[string]interface{}
announceRate *rate.Limiter
seenAnnounces map[string]bool
pathfinder *pathfinder.PathFinder
@@ -129,11 +131,30 @@ func NewTransport(cfg *common.ReticulumConfig) *Transport {
mutex: sync.RWMutex{},
config: cfg,
links: make(map[string]*Link),
destinations: make(map[string]interface{}),
pathfinder: pathfinder.NewPathFinder(),
}
return t
}
// RegisterDestination registers a destination to receive incoming link requests
func (t *Transport) RegisterDestination(hash []byte, dest interface{}) {
t.mutex.Lock()
defer t.mutex.Unlock()
t.destinations[string(hash)] = dest
log.Printf("[DEBUG-5] Registered destination %x with transport", hash)
}
// CreateIncomingLink creates a link object for an incoming link request
// This avoids circular import issues by having transport create the link
func (t *Transport) CreateIncomingLink(dest interface{}, networkIface common.NetworkInterface) interface{} {
// This function signature uses interface{} to avoid importing link package
// The actual implementation will be in the application code
// For now, return nil to indicate links aren't fully implemented
log.Printf("[DEBUG-5] CreateIncomingLink called (not yet fully implemented)")
return nil
}
// Add GetTransportInstance function
func GetTransportInstance() *Transport {
transportMutex.Lock()
@@ -391,12 +412,12 @@ func (t *Transport) RequestPath(destinationHash []byte, onInterface string, tag
return t.broadcastPathRequest(packet)
}
func (t *Transport) UpdatePath(destinationHash []byte, nextHop []byte, interfaceName string, hops uint8) {
t.mutex.Lock()
defer t.mutex.Unlock()
iface, err := t.GetInterface(interfaceName)
if err != nil {
// updatePathUnlocked updates path without acquiring mutex (caller must hold lock)
func (t *Transport) updatePathUnlocked(destinationHash []byte, nextHop []byte, interfaceName string, hops uint8) {
// Direct access to interfaces map since caller holds the lock
iface, exists := t.interfaces[interfaceName]
if !exists {
log.Printf("[DEBUG-3] Interface %s not found", interfaceName)
return
}
@@ -408,6 +429,12 @@ func (t *Transport) UpdatePath(destinationHash []byte, nextHop []byte, interface
}
}
func (t *Transport) UpdatePath(destinationHash []byte, nextHop []byte, interfaceName string, hops uint8) {
t.mutex.Lock()
defer t.mutex.Unlock()
t.updatePathUnlocked(destinationHash, nextHop, interfaceName, hops)
}
func (t *Transport) HandleAnnounce(data []byte, sourceIface common.NetworkInterface) error {
if len(data) < 53 { // Minimum size for announce packet
return fmt.Errorf("announce packet too small: %d bytes", len(data))
@@ -823,6 +850,9 @@ func (t *Transport) handleAnnouncePacket(data []byte, iface common.NetworkInterf
log.Printf("[DEBUG-3] Accepted announce app_data: %x (%q)", appData, string(appData))
}
// Store the identity for later recall
identity.Remember(data, destinationHash, pubKey, appData)
// Generate announce hash to check for duplicates
announceHash := sha256.Sum256(data)
hashStr := string(announceHash[:])
@@ -840,6 +870,16 @@ func (t *Transport) handleAnnouncePacket(data []byte, iface common.NetworkInterf
log.Printf("[DEBUG-3] Processing new announce")
// Register the path from this announce
// The destination is reachable via the interface that received this announce
if iface != nil {
// Use unlocked version since we may be called in a locked context
t.mutex.Lock()
t.updatePathUnlocked(destinationHash, nil, iface.GetName(), hopCount)
t.mutex.Unlock()
log.Printf("[DEBUG-3] Registered path to %x via %s (%d hops)", destinationHash, iface.GetName(), hopCount)
}
// Notify handlers first, regardless of forwarding limits
log.Printf("[DEBUG-3] Notifying announce handlers: destHash=%x, appDataLen=%d", addresses[:16], len(appData))
t.notifyAnnounceHandlers(addresses[:16], id, appData)
@@ -892,44 +932,94 @@ func (t *Transport) handleAnnouncePacket(data []byte, iface common.NetworkInterf
}
func (t *Transport) handleLinkPacket(data []byte, iface common.NetworkInterface) {
if len(data) < 40 {
log.Printf("[DEBUG-3] Dropping link packet: insufficient length (%d bytes)", len(data))
log.Printf("[DEBUG-5] Handling link packet (%d bytes)", len(data))
// Parse the packet - need to prepend the packet type byte that was stripped
fullData := append([]byte{PACKET_TYPE_LINK}, data...)
pkt := &packet.Packet{Raw: fullData}
if err := pkt.Unpack(); err != nil {
log.Printf("[DEBUG-3] Failed to unpack link packet: %v", err)
return
}
dest := data[:32]
timestamp := binary.BigEndian.Uint64(data[32:40])
payload := data[40:]
destHash := pkt.DestinationHash
if len(destHash) > 16 {
destHash = destHash[:16]
}
log.Printf("[DEBUG-5] Link packet for destination: %x, context: 0x%02x", destHash, pkt.Context)
log.Printf("[DEBUG-5] Link packet - Destination: %x, Timestamp: %d, Payload: %d bytes",
dest, timestamp, len(payload))
if t.HasPath(dest) {
nextHop := t.NextHop(dest)
nextIfaceName := t.NextHopInterface(dest)
log.Printf("[DEBUG-6] Found path - Next hop: %x, Interface: %s", nextHop, nextIfaceName)
if nextIfaceName != iface.GetName() {
if nextIface, ok := t.interfaces[nextIfaceName]; ok {
log.Printf("[DEBUG-7] Forwarding link packet to %s", nextIfaceName)
if err := nextIface.Send(data, string(nextHop)); err != nil { // #nosec G104
log.Printf("[DEBUG-7] Failed to forward link packet: %v", err)
}
}
// Check if this is a link request (initial link establishment)
if pkt.Context == packet.ContextLinkIdentify {
log.Printf("[DEBUG-4] Received link request for destination %x", destHash)
// Look up the destination
t.mutex.RLock()
destIface, exists := t.destinations[string(destHash)]
t.mutex.RUnlock()
if !exists {
log.Printf("[DEBUG-3] No destination registered for hash %x", destHash)
return
}
log.Printf("[DEBUG-5] Found registered destination for %x", destHash)
// Handle the incoming link request
t.handleIncomingLinkRequest(pkt, destIface, iface)
return
}
if link := t.findLink(dest); link != nil {
log.Printf("[DEBUG-6] Updating link timing - Last inbound: %v", time.Unix(int64(timestamp), 0)) // #nosec G115
link.lastInbound = time.Unix(int64(timestamp), 0) // #nosec G115
// Handle regular link packets (for established links)
if link := t.findLink(destHash); link != nil {
log.Printf("[DEBUG-6] Routing packet to established link")
if link.packetCb != nil {
log.Printf("[DEBUG-7] Executing packet callback with %d bytes", len(payload))
p := &packet.Packet{Data: payload}
link.packetCb(payload, p)
log.Printf("[DEBUG-7] Executing packet callback with %d bytes", len(pkt.Data))
link.packetCb(pkt.Data, pkt)
}
} else {
log.Printf("[DEBUG-5] No established link found for destination %x", destHash)
}
}
func (t *Transport) handleIncomingLinkRequest(pkt *packet.Packet, destIface interface{}, networkIface common.NetworkInterface) {
log.Printf("[DEBUG-5] Handling incoming link request")
// The link ID is in the packet data
linkID := pkt.Data
if len(linkID) == 0 {
log.Printf("[DEBUG-3] No link ID in link request packet")
return
}
log.Printf("[DEBUG-5] Link request with ID %x", linkID[:8])
// Call the destination's link established callback directly
// Use reflection to call the method if it exists
destValue := reflect.ValueOf(destIface)
if destValue.IsValid() && !destValue.IsNil() {
// Try to call GetLinkCallback method
method := destValue.MethodByName("GetLinkCallback")
if method.IsValid() {
results := method.Call(nil)
if len(results) > 0 && !results[0].IsNil() {
// The callback is of type common.LinkEstablishedCallback which is func(interface{})
callback := results[0].Interface().(common.LinkEstablishedCallback)
log.Printf("[DEBUG-4] Calling destination's link established callback")
callback(linkID)
} else {
log.Printf("[DEBUG-5] No link established callback set on destination")
}
} else {
log.Printf("[DEBUG-3] Destination does not have GetLinkCallback method")
}
} else {
log.Printf("[DEBUG-3] Invalid destination object")
}
log.Printf("[DEBUG-4] Link request handled successfully")
}
func (t *Transport) handlePathResponse(data []byte, iface common.NetworkInterface) {
if len(data) < 33 { // 32 bytes hash + 1 byte hops minimum
return
@@ -977,7 +1067,11 @@ func (t *Transport) SendPacket(p *packet.Packet) error {
}
log.Printf("[DEBUG-5] Serialized packet size: %d bytes", len(data))
destHash := p.Addresses[:packet.AddressSize]
// Use the DestinationHash field directly for path lookup
destHash := p.DestinationHash
if len(destHash) > 16 {
destHash = destHash[:16]
}
log.Printf("[DEBUG-6] Destination hash: %x", destHash)
path, exists := t.paths[string(destHash)]