transport: update constants, functions

This commit is contained in:
Sudo-Ivan
2024-12-31 11:21:55 -06:00
parent 59cef5e56a
commit f2c146b7c5

View File

@@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/Sudo-Ivan/reticulum-go/pkg/common" "github.com/Sudo-Ivan/reticulum-go/pkg/common"
"github.com/Sudo-Ivan/reticulum-go/pkg/identity"
"github.com/Sudo-Ivan/reticulum-go/pkg/packet" "github.com/Sudo-Ivan/reticulum-go/pkg/packet"
) )
@@ -18,17 +19,29 @@ var (
) )
const ( const (
PathfinderM = 128 // Maximum number of hops PathfinderM = 128 // Maximum number of hops that Reticulum will transport a packet
PathRequestTTL = 300 // Time to live for path requests in seconds PathRequestTTL = 300 // Time to live for path requests in seconds
AnnounceTimeout = 15 // Timeout for announce responses in seconds AnnounceTimeout = 15 // Timeout for announce responses in seconds
// Link constants // Link constants
EstablishmentTimeoutPerHop = 6 // Timeout for link establishment in seconds per hop EstablishmentTimeoutPerHop = 6 // Timeout for link establishment per hop
KeepaliveTimeoutFactor = 4 // RTT timeout factor for link timeout calculation KeepaliveTimeoutFactor = 4 // RTT timeout factor for link timeout
StaleGrace = 2 // Grace period in seconds for link timeout StaleGrace = 2 // Grace period in seconds
Keepalive = 360 // Interval for sending keep-alive packets in seconds Keepalive = 360 // Interval for sending keep-alive packets
StaleTime = 720 // Time after which link is considered stale StaleTime = 720 // Time after which link is considered stale
// Resource strategies
AcceptNone = 0
AcceptAll = 1
AcceptApp = 2
// Resource status
ResourceStatusPending = 0x00
ResourceStatusActive = 0x01
ResourceStatusComplete = 0x02
ResourceStatusFailed = 0x03
ResourceStatusCancelled = 0x04
// Direction constants // Direction constants
OUT = 0x02 OUT = 0x02
IN = 0x01 IN = 0x01
@@ -117,6 +130,7 @@ func (t *Transport) Close() error {
} }
type Link struct { type Link struct {
mutex sync.RWMutex
destination []byte destination []byte
establishedAt time.Time establishedAt time.Time
lastInbound time.Time lastInbound time.Time
@@ -125,11 +139,18 @@ type Link struct {
rtt time.Duration rtt time.Duration
establishedCb func() establishedCb func()
closedCb func() closedCb func()
packetCb func([]byte) packetCb func([]byte, *packet.Packet)
resourceCb func(interface{}) bool resourceCb func(interface{}) bool
resourceStrategy int resourceStrategy int
resourceStartedCb func(interface{})
resourceConcludedCb func(interface{})
remoteIdentifiedCb func(*Link, []byte)
connectedCb func() connectedCb func()
disconnectedCb func() disconnectedCb func()
remoteIdentity []byte
physicalStats bool
staleTime time.Duration
staleGrace time.Duration
} }
type Destination struct { type Destination struct {
@@ -149,6 +170,8 @@ func NewLink(dest []byte, establishedCallback func(), closedCallback func()) *Li
lastData: time.Now(), lastData: time.Now(),
establishedCb: establishedCallback, establishedCb: establishedCallback,
closedCb: closedCallback, closedCb: closedCallback,
staleTime: time.Duration(StaleTime) * time.Second,
staleGrace: time.Duration(StaleGrace) * time.Second,
} }
} }
@@ -178,7 +201,7 @@ func (l *Link) InactiveFor() time.Duration {
return outbound return outbound
} }
func (l *Link) SetPacketCallback(cb func([]byte)) { func (l *Link) SetPacketCallback(cb func([]byte, *packet.Packet)) {
l.packetCb = cb l.packetCb = cb
} }
@@ -221,6 +244,14 @@ type AnnounceHandler interface {
func (t *Transport) RegisterAnnounceHandler(handler AnnounceHandler) { func (t *Transport) RegisterAnnounceHandler(handler AnnounceHandler) {
t.handlerLock.Lock() t.handlerLock.Lock()
defer t.handlerLock.Unlock() defer t.handlerLock.Unlock()
// Check for duplicate handlers
for _, h := range t.announceHandlers {
if h == handler {
return
}
}
t.announceHandlers = append(t.announceHandlers, handler) t.announceHandlers = append(t.announceHandlers, handler)
} }
@@ -321,14 +352,16 @@ func (t *Transport) UpdatePath(destinationHash []byte, nextHop []byte, interface
} }
} }
func (t *Transport) HandleAnnounce(destinationHash []byte, identity []byte, appData []byte) { func (t *Transport) HandleAnnounce(destinationHash []byte, identity []byte, appData []byte, announceHash []byte) {
t.handlerLock.RLock() t.handlerLock.RLock()
defer t.handlerLock.RUnlock() defer t.handlerLock.RUnlock()
for _, handler := range t.announceHandlers { for _, handler := range t.announceHandlers {
if handler.ReceivePathResponses() || announceHash != nil {
handler.ReceivedAnnounce(destinationHash, identity, appData) handler.ReceivedAnnounce(destinationHash, identity, appData)
} }
} }
}
func (t *Transport) NewDestination(identity interface{}, direction int, destType int, appName string, aspects ...string) *Destination { func (t *Transport) NewDestination(identity interface{}, direction int, destType int, appName string, aspects ...string) *Destination {
return &Destination{ return &Destination{
@@ -469,7 +502,7 @@ func SendAnnounce(packet []byte) error {
return lastErr return lastErr
} }
func (t *Transport) HandlePacket(data []byte, iface interface{}) { func (t *Transport) HandlePacket(data []byte, iface common.NetworkInterface) {
if len(data) < 1 { if len(data) < 1 {
return return
} }
@@ -487,7 +520,7 @@ func (t *Transport) HandlePacket(data []byte, iface interface{}) {
} }
} }
func (t *Transport) handlePathRequest(data []byte, iface interface{}) { func (t *Transport) handlePathRequest(data []byte, iface common.NetworkInterface) {
if len(data) < 33 { // 32 bytes hash + 1 byte TTL minimum if len(data) < 33 { // 32 bytes hash + 1 byte TTL minimum
return return
} }
@@ -517,9 +550,7 @@ func (t *Transport) handlePathRequest(data []byte, iface interface{}) {
response = append(response, tag...) response = append(response, tag...)
} }
if i, ok := iface.(common.NetworkInterface); ok { iface.Send(response, "")
i.Send(response, "")
}
} else if recursive && ttl > 0 { } else if recursive && ttl > 0 {
// Forward path request to other interfaces // Forward path request to other interfaces
newData := make([]byte, len(data)) newData := make([]byte, len(data))
@@ -527,14 +558,14 @@ func (t *Transport) handlePathRequest(data []byte, iface interface{}) {
newData[32] = ttl - 1 // Decrease TTL newData[32] = ttl - 1 // Decrease TTL
for name, otherIface := range t.interfaces { for name, otherIface := range t.interfaces {
if name != iface.(common.NetworkInterface).GetName() && otherIface.IsEnabled() { if name != iface.GetName() && otherIface.IsEnabled() {
otherIface.Send(newData, "") otherIface.Send(newData, "")
} }
} }
} }
} }
func (t *Transport) handleLinkPacket(data []byte, iface interface{}) { func (t *Transport) handleLinkPacket(data []byte, iface common.NetworkInterface) {
if len(data) < 40 { // 32 bytes dest + 8 bytes timestamp minimum if len(data) < 40 { // 32 bytes dest + 8 bytes timestamp minimum
return return
} }
@@ -546,10 +577,13 @@ func (t *Transport) handleLinkPacket(data []byte, iface interface{}) {
// Check if we're the destination // Check if we're the destination
if t.HasPath(dest) { if t.HasPath(dest) {
nextHop := t.NextHop(dest) nextHop := t.NextHop(dest)
nextIface := t.NextHopInterface(dest) nextIfaceName := t.NextHopInterface(dest)
if iface, ok := t.interfaces[nextIface]; ok { // Only forward if received on different interface
iface.Send(data, string(nextHop)) if nextIfaceName != iface.GetName() {
if nextIface, ok := t.interfaces[nextIfaceName]; ok {
nextIface.Send(data, string(nextHop))
}
} }
} }
@@ -557,12 +591,17 @@ func (t *Transport) handleLinkPacket(data []byte, iface interface{}) {
if link := t.findLink(dest); link != nil { if link := t.findLink(dest); link != nil {
link.lastInbound = time.Unix(int64(timestamp), 0) link.lastInbound = time.Unix(int64(timestamp), 0)
if link.packetCb != nil { if link.packetCb != nil {
link.packetCb(payload) // Create a packet object to pass to callback
p := &packet.Packet{
Data: payload,
// Add other necessary packet fields
}
link.packetCb(payload, p)
} }
} }
} }
func (t *Transport) handlePathResponse(data []byte, iface interface{}) { func (t *Transport) handlePathResponse(data []byte, iface common.NetworkInterface) {
if len(data) < 33 { // 32 bytes hash + 1 byte hops minimum if len(data) < 33 { // 32 bytes hash + 1 byte hops minimum
return return
} }
@@ -575,19 +614,19 @@ func (t *Transport) handlePathResponse(data []byte, iface interface{}) {
nextHop = data[33:] nextHop = data[33:]
} }
// Update path information // Use interface name when updating path
if i, ok := iface.(common.NetworkInterface); ok { if iface != nil {
t.UpdatePath(destHash, nextHop, i.GetName(), hops) t.UpdatePath(destHash, nextHop, iface.GetName(), hops)
} }
} }
func (t *Transport) handleAnnouncePacket(data []byte, iface interface{}) { func (t *Transport) handleAnnouncePacket(data []byte, iface common.NetworkInterface) {
if len(data) < 32 { // 32 bytes minimum for hash if len(data) < 32 {
return return
} }
destHash := data[:32] destHash := data[:32]
var identity, appData []byte var identityData, appData []byte
if len(data) > 32 { if len(data) > 32 {
splitPoint := 32 splitPoint := 32
@@ -597,21 +636,29 @@ func (t *Transport) handleAnnouncePacket(data []byte, iface interface{}) {
break break
} }
} }
identity = data[32:splitPoint] identityData = data[32:splitPoint]
if splitPoint < len(data)-1 { if splitPoint < len(data)-1 {
appData = data[splitPoint+1:] appData = data[splitPoint+1:]
} }
} }
t.HandleAnnounce(destHash, identity, appData) // Use identity package's GetRandomHash
announceHash := identity.GetRandomHash()
// Use interface name in announce handling
if iface != nil {
t.HandleAnnounce(destHash, identityData, appData, announceHash)
}
} }
func (t *Transport) findLink(dest []byte) *Link { func (t *Transport) findLink(dest []byte) *Link {
t.mutex.RLock() t.mutex.RLock()
defer t.mutex.RUnlock() defer t.mutex.RUnlock()
// This is a simplified version - you might want to maintain a map of active links // Use dest to lookup link in map
// in the Transport struct for better performance if link, exists := t.links[string(dest)]; exists {
return link
}
return nil return nil
} }
@@ -660,7 +707,6 @@ func (t *Transport) GetLink(destHash []byte) (*Link, error) {
func (l *Link) OnConnected(cb func()) { func (l *Link) OnConnected(cb func()) {
l.connectedCb = cb l.connectedCb = cb
// If already established, trigger callback immediately
if !l.establishedAt.IsZero() && cb != nil { if !l.establishedAt.IsZero() && cb != nil {
cb() cb()
} }
@@ -669,3 +715,77 @@ func (l *Link) OnConnected(cb func()) {
func (l *Link) OnDisconnected(cb func()) { func (l *Link) OnDisconnected(cb func()) {
l.disconnectedCb = cb l.disconnectedCb = cb
} }
func (l *Link) GetRemoteIdentity() []byte {
return l.remoteIdentity
}
func (l *Link) TrackPhyStats(track bool) {
l.mutex.Lock()
defer l.mutex.Unlock()
l.physicalStats = track
}
func (l *Link) GetRSSI() int {
// Implement physical layer stats
return 0
}
func (l *Link) GetSNR() float64 {
// Implement physical layer stats
return 0
}
func (l *Link) GetQ() float64 {
// Implement physical layer stats
return 0
}
func (l *Link) SetResourceStrategy(strategy int) error {
l.mutex.Lock()
defer l.mutex.Unlock()
if strategy != AcceptNone && strategy != AcceptAll && strategy != AcceptApp {
return errors.New("invalid resource strategy")
}
l.resourceStrategy = strategy
return nil
}
func (l *Link) SetResourceStartedCallback(cb func(interface{})) {
l.mutex.Lock()
defer l.mutex.Unlock()
l.resourceStartedCb = cb
}
func (l *Link) SetResourceConcludedCallback(cb func(interface{})) {
l.mutex.Lock()
defer l.mutex.Unlock()
l.resourceConcludedCb = cb
}
func (l *Link) SetRemoteIdentifiedCallback(cb func(*Link, []byte)) {
l.mutex.Lock()
defer l.mutex.Unlock()
l.remoteIdentifiedCb = cb
}
func (l *Link) HandleResource(resource interface{}) bool {
l.mutex.RLock()
defer l.mutex.RUnlock()
switch l.resourceStrategy {
case AcceptNone:
return false
case AcceptAll:
return true
case AcceptApp:
if l.resourceCb != nil {
return l.resourceCb(resource)
}
return false
default:
return false
}
}