diff --git a/pkg/transport/transport.go b/pkg/transport/transport.go index a0c5b88..fb264f4 100644 --- a/pkg/transport/transport.go +++ b/pkg/transport/transport.go @@ -9,6 +9,7 @@ import ( "time" "github.com/Sudo-Ivan/reticulum-go/pkg/common" + "github.com/Sudo-Ivan/reticulum-go/pkg/identity" "github.com/Sudo-Ivan/reticulum-go/pkg/packet" ) @@ -18,17 +19,29 @@ var ( ) const ( - PathfinderM = 128 // Maximum number of hops + PathfinderM = 128 // Maximum number of hops that Reticulum will transport a packet PathRequestTTL = 300 // Time to live for path requests in seconds AnnounceTimeout = 15 // Timeout for announce responses in seconds // Link constants - EstablishmentTimeoutPerHop = 6 // Timeout for link establishment in seconds per hop - KeepaliveTimeoutFactor = 4 // RTT timeout factor for link timeout calculation - StaleGrace = 2 // Grace period in seconds for link timeout - Keepalive = 360 // Interval for sending keep-alive packets in seconds + EstablishmentTimeoutPerHop = 6 // Timeout for link establishment per hop + KeepaliveTimeoutFactor = 4 // RTT timeout factor for link timeout + StaleGrace = 2 // Grace period in seconds + Keepalive = 360 // Interval for sending keep-alive packets StaleTime = 720 // Time after which link is considered stale + // Resource strategies + AcceptNone = 0 + AcceptAll = 1 + AcceptApp = 2 + + // Resource status + ResourceStatusPending = 0x00 + ResourceStatusActive = 0x01 + ResourceStatusComplete = 0x02 + ResourceStatusFailed = 0x03 + ResourceStatusCancelled = 0x04 + // Direction constants OUT = 0x02 IN = 0x01 @@ -117,19 +130,27 @@ func (t *Transport) Close() error { } type Link struct { - destination []byte - establishedAt time.Time - lastInbound time.Time - lastOutbound time.Time - lastData time.Time - rtt time.Duration - establishedCb func() - closedCb func() - packetCb func([]byte) - resourceCb func(interface{}) bool - resourceStrategy int - connectedCb func() - disconnectedCb func() + mutex sync.RWMutex + destination []byte + establishedAt time.Time + lastInbound time.Time + lastOutbound time.Time + lastData time.Time + rtt time.Duration + establishedCb func() + closedCb func() + packetCb func([]byte, *packet.Packet) + resourceCb func(interface{}) bool + resourceStrategy int + resourceStartedCb func(interface{}) + resourceConcludedCb func(interface{}) + remoteIdentifiedCb func(*Link, []byte) + connectedCb func() + disconnectedCb func() + remoteIdentity []byte + physicalStats bool + staleTime time.Duration + staleGrace time.Duration } type Destination struct { @@ -149,6 +170,8 @@ func NewLink(dest []byte, establishedCallback func(), closedCallback func()) *Li lastData: time.Now(), establishedCb: establishedCallback, closedCb: closedCallback, + staleTime: time.Duration(StaleTime) * time.Second, + staleGrace: time.Duration(StaleGrace) * time.Second, } } @@ -178,7 +201,7 @@ func (l *Link) InactiveFor() time.Duration { return outbound } -func (l *Link) SetPacketCallback(cb func([]byte)) { +func (l *Link) SetPacketCallback(cb func([]byte, *packet.Packet)) { l.packetCb = cb } @@ -221,6 +244,14 @@ type AnnounceHandler interface { func (t *Transport) RegisterAnnounceHandler(handler AnnounceHandler) { t.handlerLock.Lock() defer t.handlerLock.Unlock() + + // Check for duplicate handlers + for _, h := range t.announceHandlers { + if h == handler { + return + } + } + t.announceHandlers = append(t.announceHandlers, handler) } @@ -321,12 +352,14 @@ func (t *Transport) UpdatePath(destinationHash []byte, nextHop []byte, interface } } -func (t *Transport) HandleAnnounce(destinationHash []byte, identity []byte, appData []byte) { +func (t *Transport) HandleAnnounce(destinationHash []byte, identity []byte, appData []byte, announceHash []byte) { t.handlerLock.RLock() defer t.handlerLock.RUnlock() for _, handler := range t.announceHandlers { - handler.ReceivedAnnounce(destinationHash, identity, appData) + if handler.ReceivePathResponses() || announceHash != nil { + handler.ReceivedAnnounce(destinationHash, identity, appData) + } } } @@ -469,7 +502,7 @@ func SendAnnounce(packet []byte) error { return lastErr } -func (t *Transport) HandlePacket(data []byte, iface interface{}) { +func (t *Transport) HandlePacket(data []byte, iface common.NetworkInterface) { if len(data) < 1 { return } @@ -487,7 +520,7 @@ func (t *Transport) HandlePacket(data []byte, iface interface{}) { } } -func (t *Transport) handlePathRequest(data []byte, iface interface{}) { +func (t *Transport) handlePathRequest(data []byte, iface common.NetworkInterface) { if len(data) < 33 { // 32 bytes hash + 1 byte TTL minimum return } @@ -517,9 +550,7 @@ func (t *Transport) handlePathRequest(data []byte, iface interface{}) { response = append(response, tag...) } - if i, ok := iface.(common.NetworkInterface); ok { - i.Send(response, "") - } + iface.Send(response, "") } else if recursive && ttl > 0 { // Forward path request to other interfaces newData := make([]byte, len(data)) @@ -527,14 +558,14 @@ func (t *Transport) handlePathRequest(data []byte, iface interface{}) { newData[32] = ttl - 1 // Decrease TTL for name, otherIface := range t.interfaces { - if name != iface.(common.NetworkInterface).GetName() && otherIface.IsEnabled() { + if name != iface.GetName() && otherIface.IsEnabled() { otherIface.Send(newData, "") } } } } -func (t *Transport) handleLinkPacket(data []byte, iface interface{}) { +func (t *Transport) handleLinkPacket(data []byte, iface common.NetworkInterface) { if len(data) < 40 { // 32 bytes dest + 8 bytes timestamp minimum return } @@ -546,10 +577,13 @@ func (t *Transport) handleLinkPacket(data []byte, iface interface{}) { // Check if we're the destination if t.HasPath(dest) { nextHop := t.NextHop(dest) - nextIface := t.NextHopInterface(dest) + nextIfaceName := t.NextHopInterface(dest) - if iface, ok := t.interfaces[nextIface]; ok { - iface.Send(data, string(nextHop)) + // Only forward if received on different interface + if nextIfaceName != iface.GetName() { + if nextIface, ok := t.interfaces[nextIfaceName]; ok { + nextIface.Send(data, string(nextHop)) + } } } @@ -557,12 +591,17 @@ func (t *Transport) handleLinkPacket(data []byte, iface interface{}) { if link := t.findLink(dest); link != nil { link.lastInbound = time.Unix(int64(timestamp), 0) if link.packetCb != nil { - link.packetCb(payload) + // Create a packet object to pass to callback + p := &packet.Packet{ + Data: payload, + // Add other necessary packet fields + } + link.packetCb(payload, p) } } } -func (t *Transport) handlePathResponse(data []byte, iface interface{}) { +func (t *Transport) handlePathResponse(data []byte, iface common.NetworkInterface) { if len(data) < 33 { // 32 bytes hash + 1 byte hops minimum return } @@ -575,19 +614,19 @@ func (t *Transport) handlePathResponse(data []byte, iface interface{}) { nextHop = data[33:] } - // Update path information - if i, ok := iface.(common.NetworkInterface); ok { - t.UpdatePath(destHash, nextHop, i.GetName(), hops) + // Use interface name when updating path + if iface != nil { + t.UpdatePath(destHash, nextHop, iface.GetName(), hops) } } -func (t *Transport) handleAnnouncePacket(data []byte, iface interface{}) { - if len(data) < 32 { // 32 bytes minimum for hash +func (t *Transport) handleAnnouncePacket(data []byte, iface common.NetworkInterface) { + if len(data) < 32 { return } destHash := data[:32] - var identity, appData []byte + var identityData, appData []byte if len(data) > 32 { splitPoint := 32 @@ -597,21 +636,29 @@ func (t *Transport) handleAnnouncePacket(data []byte, iface interface{}) { break } } - identity = data[32:splitPoint] + identityData = data[32:splitPoint] if splitPoint < len(data)-1 { appData = data[splitPoint+1:] } } - t.HandleAnnounce(destHash, identity, appData) + // Use identity package's GetRandomHash + announceHash := identity.GetRandomHash() + + // Use interface name in announce handling + if iface != nil { + t.HandleAnnounce(destHash, identityData, appData, announceHash) + } } func (t *Transport) findLink(dest []byte) *Link { t.mutex.RLock() defer t.mutex.RUnlock() - // This is a simplified version - you might want to maintain a map of active links - // in the Transport struct for better performance + // Use dest to lookup link in map + if link, exists := t.links[string(dest)]; exists { + return link + } return nil } @@ -643,24 +690,23 @@ func (t *Transport) SendPacket(p *packet.Packet) error { func (t *Transport) GetLink(destHash []byte) (*Link, error) { t.mutex.RLock() defer t.mutex.RUnlock() - + link, exists := t.links[string(destHash)] if !exists { // Create new link if it doesn't exist link = NewLink( destHash, - nil, // established callback - nil, // closed callback + nil, // established callback + nil, // closed callback ) t.links[string(destHash)] = link } - + return link, nil } func (l *Link) OnConnected(cb func()) { l.connectedCb = cb - // If already established, trigger callback immediately if !l.establishedAt.IsZero() && cb != nil { cb() } @@ -669,3 +715,77 @@ func (l *Link) OnConnected(cb func()) { func (l *Link) OnDisconnected(cb func()) { l.disconnectedCb = cb } + +func (l *Link) GetRemoteIdentity() []byte { + return l.remoteIdentity +} + +func (l *Link) TrackPhyStats(track bool) { + l.mutex.Lock() + defer l.mutex.Unlock() + l.physicalStats = track +} + +func (l *Link) GetRSSI() int { + // Implement physical layer stats + return 0 +} + +func (l *Link) GetSNR() float64 { + // Implement physical layer stats + return 0 +} + +func (l *Link) GetQ() float64 { + // Implement physical layer stats + return 0 +} + +func (l *Link) SetResourceStrategy(strategy int) error { + l.mutex.Lock() + defer l.mutex.Unlock() + + if strategy != AcceptNone && strategy != AcceptAll && strategy != AcceptApp { + return errors.New("invalid resource strategy") + } + + l.resourceStrategy = strategy + return nil +} + +func (l *Link) SetResourceStartedCallback(cb func(interface{})) { + l.mutex.Lock() + defer l.mutex.Unlock() + l.resourceStartedCb = cb +} + +func (l *Link) SetResourceConcludedCallback(cb func(interface{})) { + l.mutex.Lock() + defer l.mutex.Unlock() + l.resourceConcludedCb = cb +} + +func (l *Link) SetRemoteIdentifiedCallback(cb func(*Link, []byte)) { + l.mutex.Lock() + defer l.mutex.Unlock() + l.remoteIdentifiedCb = cb +} + +func (l *Link) HandleResource(resource interface{}) bool { + l.mutex.RLock() + defer l.mutex.RUnlock() + + switch l.resourceStrategy { + case AcceptNone: + return false + case AcceptAll: + return true + case AcceptApp: + if l.resourceCb != nil { + return l.resourceCb(resource) + } + return false + default: + return false + } +}