Enhance Transport struct with path request handling and cleanup mechanisms; introduce DiscoveryPathRequest and PathAnnounceEntry types for improved path management, and implement maintenance jobs for expired paths, discovery requests, and announces.

This commit is contained in:
2025-12-01 20:30:53 -06:00
parent 1d83e7f539
commit b489135c5b

View File

@@ -14,6 +14,7 @@ import (
"github.com/Sudo-Ivan/reticulum-go/pkg/announce" "github.com/Sudo-Ivan/reticulum-go/pkg/announce"
"github.com/Sudo-Ivan/reticulum-go/pkg/common" "github.com/Sudo-Ivan/reticulum-go/pkg/common"
"github.com/Sudo-Ivan/reticulum-go/pkg/debug" "github.com/Sudo-Ivan/reticulum-go/pkg/debug"
"github.com/Sudo-Ivan/reticulum-go/pkg/destination"
"github.com/Sudo-Ivan/reticulum-go/pkg/identity" "github.com/Sudo-Ivan/reticulum-go/pkg/identity"
"github.com/Sudo-Ivan/reticulum-go/pkg/interfaces" "github.com/Sudo-Ivan/reticulum-go/pkg/interfaces"
"github.com/Sudo-Ivan/reticulum-go/pkg/packet" "github.com/Sudo-Ivan/reticulum-go/pkg/packet"
@@ -104,20 +105,52 @@ type PathInfo struct {
} }
type Transport struct { type Transport struct {
mutex sync.RWMutex mutex sync.RWMutex
config *common.ReticulumConfig config *common.ReticulumConfig
interfaces map[string]common.NetworkInterface interfaces map[string]common.NetworkInterface
links map[string]*Link links map[string]LinkInterface
destinations map[string]interface{} destinations map[string]interface{}
announceRate *rate.Limiter announceRate *rate.Limiter
seenAnnounces map[string]bool seenAnnounces map[string]bool
pathfinder *pathfinder.PathFinder pathfinder *pathfinder.PathFinder
announceHandlers []announce.Handler announceHandlers []announce.Handler
paths map[string]*common.Path paths map[string]*common.Path
receipts []*packet.PacketReceipt receipts []*packet.PacketReceipt
receiptsMutex sync.RWMutex receiptsMutex sync.RWMutex
pathRequests map[string]time.Time
pathStates map[string]byte
discoveryPathRequests map[string]*DiscoveryPathRequest
discoveryPRTags map[string]bool
announceTable map[string]*PathAnnounceEntry
heldAnnounces map[string]*PathAnnounceEntry
transportIdentity *identity.Identity
pathRequestDest interface{}
} }
type DiscoveryPathRequest struct {
DestinationHash []byte
Timeout time.Time
RequestingIface common.NetworkInterface
}
type PathAnnounceEntry struct {
CreatedAt time.Time
RetransmitTimeout time.Time
Retries int
ReceivedFrom common.NetworkInterface
AnnounceHops byte
Packet *packet.Packet
LocalRebroadcasts int
BlockRebroadcasts bool
AttachedInterface common.NetworkInterface
}
const (
STATE_UNKNOWN = 0x00
STATE_UNRESPONSIVE = 0x01
STATE_RESPONSIVE = 0x02
)
type Path struct { type Path struct {
NextHop []byte NextHop []byte
Interface common.NetworkInterface Interface common.NetworkInterface
@@ -126,21 +159,135 @@ type Path struct {
func NewTransport(cfg *common.ReticulumConfig) *Transport { func NewTransport(cfg *common.ReticulumConfig) *Transport {
t := &Transport{ t := &Transport{
interfaces: make(map[string]common.NetworkInterface), interfaces: make(map[string]common.NetworkInterface),
paths: make(map[string]*common.Path), paths: make(map[string]*common.Path),
seenAnnounces: make(map[string]bool), seenAnnounces: make(map[string]bool),
announceRate: rate.NewLimiter(PROPAGATION_RATE, 1), announceRate: rate.NewLimiter(PROPAGATION_RATE, 1),
mutex: sync.RWMutex{}, mutex: sync.RWMutex{},
config: cfg, config: cfg,
links: make(map[string]*Link), links: make(map[string]LinkInterface),
destinations: make(map[string]interface{}), destinations: make(map[string]interface{}),
pathfinder: pathfinder.NewPathFinder(), pathfinder: pathfinder.NewPathFinder(),
receipts: make([]*packet.PacketReceipt, 0), receipts: make([]*packet.PacketReceipt, 0),
receiptsMutex: sync.RWMutex{}, receiptsMutex: sync.RWMutex{},
pathRequests: make(map[string]time.Time),
pathStates: make(map[string]byte),
discoveryPathRequests: make(map[string]*DiscoveryPathRequest),
discoveryPRTags: make(map[string]bool),
announceTable: make(map[string]*PathAnnounceEntry),
heldAnnounces: make(map[string]*PathAnnounceEntry),
} }
transportIdent, err := identity.LoadOrCreateTransportIdentity()
if err == nil {
t.transportIdentity = transportIdent
}
go t.startMaintenanceJobs()
return t return t
} }
func (t *Transport) startMaintenanceJobs() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for range ticker.C {
t.cleanupExpiredPaths()
t.cleanupExpiredDiscoveryRequests()
t.cleanupExpiredAnnounces()
t.cleanupExpiredReceipts()
}
}
func (t *Transport) cleanupExpiredPaths() {
t.mutex.Lock()
defer t.mutex.Unlock()
now := time.Now()
pathExpiry := 7 * 24 * time.Hour
for destHash, path := range t.paths {
if now.Sub(path.LastUpdated) > pathExpiry {
delete(t.paths, destHash)
delete(t.pathStates, destHash)
debug.Log(debug.DEBUG_VERBOSE, "Expired path", "dest_hash", fmt.Sprintf("%x", destHash[:8]))
}
}
}
func (t *Transport) cleanupExpiredDiscoveryRequests() {
t.mutex.Lock()
defer t.mutex.Unlock()
now := time.Now()
for destHash, req := range t.discoveryPathRequests {
if now.After(req.Timeout) {
delete(t.discoveryPathRequests, destHash)
debug.Log(debug.DEBUG_VERBOSE, "Expired discovery path request", "dest_hash", fmt.Sprintf("%x", destHash[:8]))
}
}
}
func (t *Transport) cleanupExpiredAnnounces() {
t.mutex.Lock()
defer t.mutex.Unlock()
announceExpiry := 24 * time.Hour
for destHash, entry := range t.announceTable {
if entry != nil && time.Since(entry.CreatedAt) > announceExpiry {
delete(t.announceTable, destHash)
debug.Log(debug.DEBUG_VERBOSE, "Expired announce entry", "dest_hash", fmt.Sprintf("%x", destHash[:8]))
}
}
for destHash, entry := range t.heldAnnounces {
if entry != nil && time.Since(entry.CreatedAt) > announceExpiry {
delete(t.heldAnnounces, destHash)
}
}
}
func (t *Transport) cleanupExpiredReceipts() {
t.receiptsMutex.Lock()
defer t.receiptsMutex.Unlock()
validReceipts := make([]*packet.PacketReceipt, 0)
for _, receipt := range t.receipts {
if receipt != nil && !receipt.IsTimedOut() {
status := receipt.GetStatus()
if status == packet.RECEIPT_SENT || status == packet.RECEIPT_DELIVERED {
validReceipts = append(validReceipts, receipt)
}
}
}
if len(validReceipts) < len(t.receipts) {
t.receipts = validReceipts
debug.Log(debug.DEBUG_VERBOSE, "Cleaned up expired receipts", "remaining", len(validReceipts))
}
}
func (t *Transport) MarkPathUnresponsive(destHash []byte) {
t.mutex.Lock()
defer t.mutex.Unlock()
t.pathStates[string(destHash)] = STATE_UNRESPONSIVE
}
func (t *Transport) MarkPathResponsive(destHash []byte) {
t.mutex.Lock()
defer t.mutex.Unlock()
t.pathStates[string(destHash)] = STATE_RESPONSIVE
}
func (t *Transport) PathIsUnresponsive(destHash []byte) bool {
t.mutex.RLock()
defer t.mutex.RUnlock()
state, exists := t.pathStates[string(destHash)]
return exists && state == STATE_UNRESPONSIVE
}
// RegisterDestination registers a destination to receive incoming link requests // RegisterDestination registers a destination to receive incoming link requests
func (t *Transport) RegisterDestination(hash []byte, dest interface{}) { func (t *Transport) RegisterDestination(hash []byte, dest interface{}) {
t.mutex.Lock() t.mutex.Lock()
@@ -478,7 +625,7 @@ func (t *Transport) HandleAnnounce(data []byte, sourceIface common.NetworkInterf
debug.Log(debug.DEBUG_ALL, "Failed to generate random delay", "error", err) debug.Log(debug.DEBUG_ALL, "Failed to generate random delay", "error", err)
delay = time.Duration(0) // Default to no delay on error delay = time.Duration(0) // Default to no delay on error
} else { } else {
delay = time.Duration(binary.BigEndian.Uint64(b)%2000) * time.Millisecond // #nosec G115 delay = time.Duration(binary.BigEndian.Uint64(b)%2000) * time.Millisecond // #nosec G115
} }
time.Sleep(delay) time.Sleep(delay)
@@ -700,7 +847,7 @@ func (t *Transport) HandlePacket(data []byte, iface common.NetworkInterface) {
t.handleLinkPacket(data[1:], iface, 0x00) t.handleLinkPacket(data[1:], iface, 0x00)
} else { } else {
debug.Log(debug.DEBUG_ERROR, "Processing data packet (type 0x00)", "packet_size", len(data), "dest_type", destType, "header_type", headerType) debug.Log(debug.DEBUG_ERROR, "Processing data packet (type 0x00)", "packet_size", len(data), "dest_type", destType, "header_type", headerType)
t.handleTransportPacket(data[1:], iface) t.handleTransportPacket(data[1:], iface)
} }
default: default:
debug.Log(debug.DEBUG_INFO, "Unknown packet type", "type", fmt.Sprintf("0x%02x", packetType), "source", iface.GetName()) debug.Log(debug.DEBUG_INFO, "Unknown packet type", "type", fmt.Sprintf("0x%02x", packetType), "source", iface.GetName())
@@ -751,7 +898,7 @@ func (t *Transport) handleAnnouncePacket(data []byte, iface common.NetworkInterf
var destinationHash []byte var destinationHash []byte
var context byte var context byte
var payload []byte var payload []byte
if headerType == 0 { if headerType == 0 {
// HEADER_TYPE_1: Header(2) + DestHash(16) + Context(1) + Data // HEADER_TYPE_1: Header(2) + DestHash(16) + Context(1) + Data
destinationHash = data[startIdx : startIdx+16] destinationHash = data[startIdx : startIdx+16]
@@ -849,8 +996,8 @@ func (t *Transport) handleAnnouncePacket(data []byte, iface common.NetworkInterf
// Validate destination hash according to RNS spec: // Validate destination hash according to RNS spec:
// expected_hash = SHA256(name_hash + identity_hash)[:16] // expected_hash = SHA256(name_hash + identity_hash)[:16]
hashMaterial := make([]byte, 0) hashMaterial := make([]byte, 0)
hashMaterial = append(hashMaterial, nameHash...) // Name hash (10 bytes) first hashMaterial = append(hashMaterial, nameHash...) // Name hash (10 bytes) first
hashMaterial = append(hashMaterial, id.Hash()...) // Identity hash (16 bytes) second hashMaterial = append(hashMaterial, id.Hash()...) // Identity hash (16 bytes) second
expectedHashFull := sha256.Sum256(hashMaterial) expectedHashFull := sha256.Sum256(hashMaterial)
expectedHash := expectedHashFull[:16] expectedHash := expectedHashFull[:16]
@@ -950,39 +1097,39 @@ func (t *Transport) handleAnnouncePacket(data []byte, iface common.NetworkInterf
func (t *Transport) handleLinkPacket(data []byte, iface common.NetworkInterface, packetType byte) { func (t *Transport) handleLinkPacket(data []byte, iface common.NetworkInterface, packetType byte) {
debug.Log(debug.DEBUG_ERROR, "Handling link packet", "bytes", len(data), "packet_type", fmt.Sprintf("0x%02x", packetType), "PACKET_TYPE_LINK", fmt.Sprintf("0x%02x", PACKET_TYPE_LINK)) debug.Log(debug.DEBUG_ERROR, "Handling link packet", "bytes", len(data), "packet_type", fmt.Sprintf("0x%02x", packetType), "PACKET_TYPE_LINK", fmt.Sprintf("0x%02x", PACKET_TYPE_LINK))
pkt := &packet.Packet{} pkt := &packet.Packet{}
// If this is a LINKREQUEST packet (type=0x02), handle it as link establishment // If this is a LINKREQUEST packet (type=0x02), handle it as link establishment
if packetType == PACKET_TYPE_LINK { if packetType == PACKET_TYPE_LINK {
debug.Log(debug.DEBUG_ERROR, "Processing LINKREQUEST (type=0x02)") debug.Log(debug.DEBUG_ERROR, "Processing LINKREQUEST (type=0x02)")
// Parse as LINKREQUEST packet - prepend the packet type // Parse as LINKREQUEST packet - prepend the packet type
pkt.Raw = append([]byte{PACKET_TYPE_LINK}, data...) pkt.Raw = append([]byte{PACKET_TYPE_LINK}, data...)
if err := pkt.Unpack(); err != nil { if err := pkt.Unpack(); err != nil {
debug.Log(debug.DEBUG_ERROR, "Failed to unpack link request", "error", err) debug.Log(debug.DEBUG_ERROR, "Failed to unpack link request", "error", err)
return return
} }
destHash := pkt.DestinationHash
if len(destHash) > 16 {
destHash = destHash[:16]
}
destHash := pkt.DestinationHash
if len(destHash) > 16 {
destHash = destHash[:16]
}
debug.Log(debug.DEBUG_ERROR, "Link request for destination", "hash", fmt.Sprintf("%x", destHash)) debug.Log(debug.DEBUG_ERROR, "Link request for destination", "hash", fmt.Sprintf("%x", destHash))
// Look up the destination // Look up the destination
t.mutex.RLock() t.mutex.RLock()
destIface, exists := t.destinations[string(destHash)] destIface, exists := t.destinations[string(destHash)]
t.mutex.RUnlock() t.mutex.RUnlock()
if !exists { if !exists {
debug.Log(debug.DEBUG_ERROR, "No destination registered for hash", "hash", fmt.Sprintf("%x", destHash)) debug.Log(debug.DEBUG_ERROR, "No destination registered for hash", "hash", fmt.Sprintf("%x", destHash))
return return
} }
debug.Log(debug.DEBUG_ERROR, "Found registered destination", "hash", fmt.Sprintf("%x", destHash)) debug.Log(debug.DEBUG_ERROR, "Found registered destination", "hash", fmt.Sprintf("%x", destHash))
// Handle the incoming link request // Handle the incoming link request
t.handleIncomingLinkRequest(pkt, destIface, iface) t.handleIncomingLinkRequest(pkt, destIface, iface)
return return
@@ -990,36 +1137,31 @@ func (t *Transport) handleLinkPacket(data []byte, iface common.NetworkInterface,
// Otherwise, this is a data packet for an established link (destType=2, packetType=0x00) // Otherwise, this is a data packet for an established link (destType=2, packetType=0x00)
debug.Log(debug.DEBUG_ERROR, "Processing link data packet") debug.Log(debug.DEBUG_ERROR, "Processing link data packet")
// Parse as data packet - prepend packet type 0x00 // Parse as data packet - prepend packet type 0x00
pkt.Raw = append([]byte{0x00}, data...) pkt.Raw = append([]byte{0x00}, data...)
if err := pkt.Unpack(); err != nil { if err := pkt.Unpack(); err != nil {
debug.Log(debug.DEBUG_ERROR, "Failed to unpack link data packet", "error", err) debug.Log(debug.DEBUG_ERROR, "Failed to unpack link data packet", "error", err)
return return
} }
// For link data packets, the destination hash is actually the link ID // For link data packets, the destination hash is actually the link ID
linkID := pkt.DestinationHash linkID := pkt.DestinationHash
if len(linkID) > 16 { if len(linkID) > 16 {
linkID = linkID[:16] linkID = linkID[:16]
} }
debug.Log(debug.DEBUG_ERROR, "Link data for link ID", "link_id", fmt.Sprintf("%x", linkID)) debug.Log(debug.DEBUG_ERROR, "Link data for link ID", "link_id", fmt.Sprintf("%x", linkID))
// Find the established link // Find the established link
t.mutex.RLock() t.mutex.RLock()
link, exists := t.links[string(linkID)] linkObj, exists := t.links[string(linkID)]
t.mutex.RUnlock() t.mutex.RUnlock()
if exists && link != nil { if exists && linkObj != nil {
debug.Log(debug.DEBUG_VERBOSE, "Routing packet to established link") debug.Log(debug.DEBUG_VERBOSE, "Routing packet to established link")
if linkObj, ok := link.(interface{ HandleInbound(*packet.Packet) error }); ok { if err := linkObj.HandleInbound(pkt); err != nil {
if err := linkObj.HandleInbound(pkt); err != nil { debug.Log(debug.DEBUG_ERROR, "Error handling inbound packet", "error", err)
debug.Log(debug.DEBUG_ERROR, "Error handling inbound packet", "error", err)
}
} else if link.packetCb != nil {
debug.Log(debug.DEBUG_VERBOSE, "Executing packet callback", "bytes", len(pkt.Data))
link.packetCb(pkt.Data, pkt)
} }
} else { } else {
debug.Log(debug.DEBUG_INFO, "No established link found for link ID", "link_id", fmt.Sprintf("%x", linkID)) debug.Log(debug.DEBUG_INFO, "No established link found for link ID", "link_id", fmt.Sprintf("%x", linkID))
@@ -1028,16 +1170,16 @@ func (t *Transport) handleLinkPacket(data []byte, iface common.NetworkInterface,
func (t *Transport) handleIncomingLinkRequest(pkt *packet.Packet, destIface interface{}, networkIface common.NetworkInterface) { func (t *Transport) handleIncomingLinkRequest(pkt *packet.Packet, destIface interface{}, networkIface common.NetworkInterface) {
debug.Log(debug.DEBUG_TRACE, "Handling incoming link request") debug.Log(debug.DEBUG_TRACE, "Handling incoming link request")
// The link ID is in the packet data // The link ID is in the packet data
linkID := pkt.Data linkID := pkt.Data
if len(linkID) == 0 { if len(linkID) == 0 {
debug.Log(debug.DEBUG_INFO, "No link ID in link request packet") debug.Log(debug.DEBUG_INFO, "No link ID in link request packet")
return return
} }
debug.Log(debug.DEBUG_TRACE, "Link request with ID", "id", fmt.Sprintf("%x", linkID[:8])) debug.Log(debug.DEBUG_TRACE, "Link request with ID", "id", fmt.Sprintf("%x", linkID[:8]))
// Call the destination's HandleIncomingLinkRequest method // Call the destination's HandleIncomingLinkRequest method
destValue := reflect.ValueOf(destIface) destValue := reflect.ValueOf(destIface)
if destValue.IsValid() && !destValue.IsNil() { if destValue.IsValid() && !destValue.IsNil() {
@@ -1084,20 +1226,190 @@ func (t *Transport) handlePathResponse(data []byte, iface common.NetworkInterfac
} }
func (t *Transport) handleTransportPacket(data []byte, iface common.NetworkInterface) { func (t *Transport) handleTransportPacket(data []byte, iface common.NetworkInterface) {
// Handle transport packet if len(data) < 2 {
return
}
headerByte := data[0]
packetType := headerByte & 0x03
destType := (headerByte & 0x0C) >> 2
if packetType == packet.PacketTypeData && destType == DEST_TYPE_PLAIN {
if len(data) < 19 {
return
}
context := data[18]
if context == packet.ContextPathResponse {
t.handlePathResponse(data[19:], iface)
}
}
} }
func (t *Transport) findLink(dest []byte) *Link { func (t *Transport) InitializePathRequestHandler() error {
t.mutex.RLock() if t.transportIdentity == nil {
defer t.mutex.RUnlock() return errors.New("transport identity not initialized")
// Use dest to lookup link in map
if link, exists := t.links[string(dest)]; exists {
return link
} }
pathRequestDest, err := destination.New(t.transportIdentity, destination.IN, destination.PLAIN, "rnstransport", t, "path", "request")
if err != nil {
return fmt.Errorf("failed to create path request destination: %w", err)
}
pathRequestDest.SetPacketCallback(func(data []byte, iface common.NetworkInterface) {
t.handlePathRequest(data, iface)
})
pathRequestDest.AcceptsLinks(true)
t.pathRequestDest = pathRequestDest
t.RegisterDestination(pathRequestDest.GetHash(), pathRequestDest)
debug.Log(debug.DEBUG_INFO, "Path request handler initialized")
return nil return nil
} }
func (t *Transport) handlePathRequest(data []byte, iface common.NetworkInterface) {
if len(data) < identity.TRUNCATED_HASHLENGTH/8 {
debug.Log(debug.DEBUG_INFO, "Path request too short")
return
}
destHash := data[:identity.TRUNCATED_HASHLENGTH/8]
var requestorTransportID []byte
var tag []byte
if len(data) > identity.TRUNCATED_HASHLENGTH/8*2 {
requestorTransportID = data[identity.TRUNCATED_HASHLENGTH/8 : identity.TRUNCATED_HASHLENGTH/8*2]
tag = data[identity.TRUNCATED_HASHLENGTH/8*2:]
if len(tag) > identity.TRUNCATED_HASHLENGTH/8 {
tag = tag[:identity.TRUNCATED_HASHLENGTH/8]
}
} else if len(data) > identity.TRUNCATED_HASHLENGTH/8 {
tag = data[identity.TRUNCATED_HASHLENGTH/8:]
if len(tag) > identity.TRUNCATED_HASHLENGTH/8 {
tag = tag[:identity.TRUNCATED_HASHLENGTH/8]
}
}
if tag == nil {
debug.Log(debug.DEBUG_INFO, "Ignoring tagless path request", "dest_hash", fmt.Sprintf("%x", destHash))
return
}
uniqueTag := append(destHash, tag...)
tagStr := string(uniqueTag)
t.mutex.Lock()
if t.discoveryPRTags[tagStr] {
t.mutex.Unlock()
debug.Log(debug.DEBUG_INFO, "Ignoring duplicate path request", "dest_hash", fmt.Sprintf("%x", destHash), "tag", fmt.Sprintf("%x", tag))
return
}
t.discoveryPRTags[tagStr] = true
if len(t.discoveryPRTags) > 32000 {
t.discoveryPRTags = make(map[string]bool)
}
t.mutex.Unlock()
t.processPathRequest(destHash, iface, requestorTransportID, tag)
}
func (t *Transport) processPathRequest(destHash []byte, attachedIface common.NetworkInterface, requestorTransportID []byte, tag []byte) {
destHashStr := string(destHash)
debug.Log(debug.DEBUG_INFO, "Processing path request", "dest_hash", fmt.Sprintf("%x", destHash))
t.mutex.RLock()
localDest, isLocal := t.destinations[destHashStr]
path, hasPath := t.paths[destHashStr]
t.mutex.RUnlock()
if isLocal {
if dest, ok := localDest.(*destination.Destination); ok {
debug.Log(debug.DEBUG_INFO, "Answering path request for local destination", "dest_hash", fmt.Sprintf("%x", destHash))
dest.Announce(true, tag, attachedIface)
}
return
}
if hasPath {
nextHop := path.NextHop
if requestorTransportID != nil && string(nextHop) == string(requestorTransportID) {
debug.Log(debug.DEBUG_INFO, "Not answering path request, next hop is requestor", "dest_hash", fmt.Sprintf("%x", destHash))
return
}
debug.Log(debug.DEBUG_INFO, "Answering path request with known path", "dest_hash", fmt.Sprintf("%x", destHash), "hops", path.HopCount)
t.mutex.RLock()
announceEntry, hasAnnounce := t.announceTable[destHashStr]
t.mutex.RUnlock()
if hasAnnounce && announceEntry != nil {
now := time.Now()
retries := 1
localRebroadcasts := 0
blockRebroadcasts := true
announceHops := path.HopCount
retransmitTimeout := now.Add(time.Duration(400) * time.Millisecond)
entry := &PathAnnounceEntry{
CreatedAt: now,
RetransmitTimeout: retransmitTimeout,
Retries: retries,
ReceivedFrom: path.Interface,
AnnounceHops: announceHops,
Packet: announceEntry.Packet,
LocalRebroadcasts: localRebroadcasts,
BlockRebroadcasts: blockRebroadcasts,
AttachedInterface: attachedIface,
}
t.mutex.Lock()
if _, held := t.announceTable[destHashStr]; held {
t.heldAnnounces[destHashStr] = t.announceTable[destHashStr]
}
t.announceTable[destHashStr] = entry
t.mutex.Unlock()
}
return
}
if attachedIface != nil {
debug.Log(debug.DEBUG_INFO, "Attempting to discover unknown path", "dest_hash", fmt.Sprintf("%x", destHash))
t.mutex.Lock()
if _, exists := t.discoveryPathRequests[destHashStr]; exists {
t.mutex.Unlock()
debug.Log(debug.DEBUG_INFO, "Path request already pending", "dest_hash", fmt.Sprintf("%x", destHash))
return
}
prEntry := &DiscoveryPathRequest{
DestinationHash: destHash,
Timeout: time.Now().Add(15 * time.Second),
RequestingIface: attachedIface,
}
t.discoveryPathRequests[destHashStr] = prEntry
t.mutex.Unlock()
for name, iface := range t.interfaces {
if iface != attachedIface && iface.IsEnabled() {
req := &PathRequest{
DestinationHash: destHash,
Tag: tag,
TTL: 15,
Recursive: true,
}
t.sendPathRequest(req, name)
}
}
} else {
debug.Log(debug.DEBUG_INFO, "Ignoring path request, no path known", "dest_hash", fmt.Sprintf("%x", destHash))
}
}
func (t *Transport) SendPacket(p *packet.Packet) error { func (t *Transport) SendPacket(p *packet.Packet) error {
t.mutex.RLock() t.mutex.RLock()
defer t.mutex.RUnlock() defer t.mutex.RUnlock()
@@ -1144,39 +1456,22 @@ func (t *Transport) SendPacket(p *packet.Packet) error {
return nil return nil
} }
func (t *Transport) GetLink(destHash []byte) (*Link, error) { func (t *Transport) RegisterLink(linkID []byte, linkObj LinkInterface) {
t.mutex.RLock()
defer t.mutex.RUnlock()
link, exists := t.links[string(destHash)]
if !exists {
// Create new link if it doesn't exist
link = NewLink(
destHash,
nil, // established callback
nil, // closed callback
)
t.links[string(destHash)] = link
}
return link, nil
}
func (t *Transport) RegisterLink(linkID []byte, link interface{}) {
t.mutex.Lock() t.mutex.Lock()
defer t.mutex.Unlock() defer t.mutex.Unlock()
if len(linkID) > 16 { if len(linkID) > 16 {
linkID = linkID[:16] linkID = linkID[:16]
} }
t.links[string(linkID)] = link
t.links[string(linkID)] = linkObj
debug.Log(debug.DEBUG_VERBOSE, "Registered link", "link_id", fmt.Sprintf("%x", linkID)) debug.Log(debug.DEBUG_VERBOSE, "Registered link", "link_id", fmt.Sprintf("%x", linkID))
} }
func (t *Transport) UnregisterLink(linkID []byte) { func (t *Transport) UnregisterLink(linkID []byte) {
t.mutex.Lock() t.mutex.Lock()
defer t.mutex.Unlock() defer t.mutex.Unlock()
if len(linkID) > 16 { if len(linkID) > 16 {
linkID = linkID[:16] linkID = linkID[:16]
} }
@@ -1277,13 +1572,16 @@ func (t *Transport) Start() error {
// LinkInterface defines the methods required by Channel // LinkInterface defines the methods required by Channel
type LinkInterface interface { type LinkInterface interface {
GetStatus() int GetStatus() byte
GetRTT() float64 GetRTT() float64
RTT() float64 RTT() float64
GetLinkID() []byte
Send(data []byte) interface{} Send(data []byte) interface{}
Resend(packet interface{}) error Resend(packet interface{}) error
SetPacketTimeout(packet interface{}, callback func(interface{}), timeout time.Duration) SetPacketTimeout(packet interface{}, callback func(interface{}), timeout time.Duration)
SetPacketDelivered(packet interface{}, callback func(interface{})) SetPacketDelivered(packet interface{}, callback func(interface{}))
HandleInbound(pkt *packet.Packet) error
ValidateLinkProof(pkt *packet.Packet) error
} }
func (l *Link) GetRTT() float64 { func (l *Link) GetRTT() float64 {
@@ -1445,7 +1743,7 @@ func (t *Transport) RegisterReceipt(receipt *packet.PacketReceipt) {
func (t *Transport) UnregisterReceipt(receipt *packet.PacketReceipt) { func (t *Transport) UnregisterReceipt(receipt *packet.PacketReceipt) {
t.receiptsMutex.Lock() t.receiptsMutex.Lock()
defer t.receiptsMutex.Unlock() defer t.receiptsMutex.Unlock()
for i, r := range t.receipts { for i, r := range t.receipts {
if r == receipt { if r == receipt {
t.receipts = append(t.receipts[:i], t.receipts[i+1:]...) t.receipts = append(t.receipts[:i], t.receipts[i+1:]...)
@@ -1457,31 +1755,29 @@ func (t *Transport) UnregisterReceipt(receipt *packet.PacketReceipt) {
func (t *Transport) handleProofPacket(pkt *packet.Packet, iface common.NetworkInterface) { func (t *Transport) handleProofPacket(pkt *packet.Packet, iface common.NetworkInterface) {
debug.Log(debug.DEBUG_PACKETS, "Processing proof packet", "size", len(pkt.Data), "context", fmt.Sprintf("0x%02x", pkt.Context)) debug.Log(debug.DEBUG_PACKETS, "Processing proof packet", "size", len(pkt.Data), "context", fmt.Sprintf("0x%02x", pkt.Context))
if pkt.Context == packet.ContextLRProof { if pkt.Context == packet.ContextLRProof {
linkID := pkt.DestinationHash linkID := pkt.DestinationHash
if len(linkID) > 16 { if len(linkID) > 16 {
linkID = linkID[:16] linkID = linkID[:16]
} }
t.mutex.RLock() t.mutex.RLock()
link, exists := t.links[string(linkID)] link, exists := t.links[string(linkID)]
t.mutex.RUnlock() t.mutex.RUnlock()
if exists && link != nil { if exists && link != nil {
if linkObj, ok := link.(interface{ ValidateLinkProof(*packet.Packet) error }); ok { if err := link.ValidateLinkProof(pkt); err != nil {
if err := linkObj.ValidateLinkProof(pkt); err != nil { debug.Log(debug.DEBUG_ERROR, "Link proof validation failed", "error", err)
debug.Log(debug.DEBUG_ERROR, "Link proof validation failed", "error", err) } else {
} else { debug.Log(debug.DEBUG_INFO, "Link proof validated successfully")
debug.Log(debug.DEBUG_INFO, "Link proof validated successfully")
}
return
} }
return
} }
debug.Log(debug.DEBUG_INFO, "No link found for proof packet", "link_id", fmt.Sprintf("%x", linkID)) debug.Log(debug.DEBUG_INFO, "No link found for proof packet", "link_id", fmt.Sprintf("%x", linkID))
return return
} }
var proofHash []byte var proofHash []byte
if len(pkt.Data) == packet.EXPL_LENGTH { if len(pkt.Data) == packet.EXPL_LENGTH {
proofHash = pkt.Data[:identity.HASHLENGTH/8] proofHash = pkt.Data[:identity.HASHLENGTH/8]
@@ -1489,15 +1785,15 @@ func (t *Transport) handleProofPacket(pkt *packet.Packet, iface common.NetworkIn
} else { } else {
debug.Log(debug.DEBUG_PACKETS, "Implicit proof") debug.Log(debug.DEBUG_PACKETS, "Implicit proof")
} }
t.receiptsMutex.RLock() t.receiptsMutex.RLock()
receipts := make([]*packet.PacketReceipt, len(t.receipts)) receipts := make([]*packet.PacketReceipt, len(t.receipts))
copy(receipts, t.receipts) copy(receipts, t.receipts)
t.receiptsMutex.RUnlock() t.receiptsMutex.RUnlock()
for _, receipt := range receipts { for _, receipt := range receipts {
receiptValidated := false receiptValidated := false
if proofHash != nil { if proofHash != nil {
receiptHash := receipt.GetHash() receiptHash := receipt.GetHash()
if string(receiptHash) == string(proofHash) { if string(receiptHash) == string(proofHash) {
@@ -1506,13 +1802,13 @@ func (t *Transport) handleProofPacket(pkt *packet.Packet, iface common.NetworkIn
} else { } else {
receiptValidated = receipt.ValidateProofPacket(pkt) receiptValidated = receipt.ValidateProofPacket(pkt)
} }
if receiptValidated { if receiptValidated {
debug.Log(debug.DEBUG_PACKETS, "Proof validated for receipt") debug.Log(debug.DEBUG_PACKETS, "Proof validated for receipt")
t.UnregisterReceipt(receipt) t.UnregisterReceipt(receipt)
return return
} }
} }
debug.Log(debug.DEBUG_PACKETS, "No matching receipt for proof") debug.Log(debug.DEBUG_PACKETS, "No matching receipt for proof")
} }