diff --git a/cmd/reticulum-go/main.go b/cmd/reticulum-go/main.go index 6e647fa..1122d92 100644 --- a/cmd/reticulum-go/main.go +++ b/cmd/reticulum-go/main.go @@ -56,6 +56,7 @@ type Reticulum struct { pathRequests map[string]*common.PathRequest announceHistory map[string]announceRecord announceHistoryMu sync.RWMutex + identity *identity.Identity } type announceRecord struct { @@ -78,6 +79,12 @@ func NewReticulum(cfg *common.ReticulumConfig) (*Reticulum, error) { t := transport.NewTransport(cfg) debugLog(3, "Transport initialized") + identity, err := identity.NewIdentity() + if err != nil { + return nil, fmt.Errorf("failed to create identity: %v", err) + } + debugLog(2, "Created new identity: %x", identity.Hash()) + r := &Reticulum{ config: cfg, transport: t, @@ -87,6 +94,7 @@ func NewReticulum(cfg *common.ReticulumConfig) (*Reticulum, error) { announceHandlers: make(map[string][]announce.AnnounceHandler), pathRequests: make(map[string]*common.PathRequest), announceHistory: make(map[string]announceRecord), + identity: identity, } // Initialize interfaces from config @@ -240,20 +248,12 @@ func main() { log.Fatalf("Failed to create Reticulum instance: %v", err) } - // Create identity and destination - identity, err := identity.NewIdentity() - if err != nil { - log.Fatalf("Failed to create identity: %v", err) - } - - debugLog(2, "Created new identity: %x", identity.Hash()) - - // Create announce + // Create announce using r.identity announce, err := announce.NewAnnounce( - identity, - []byte("nomadnetwork.node"), - nil, // No ratchet ID - false, // Not a path response + r.identity, + []byte("HELLO WORLD"), + nil, + false, ) if err != nil { log.Fatalf("Failed to create announce: %v", err) @@ -386,19 +386,12 @@ func initializeDirectories() error { func (r *Reticulum) Start() error { debugLog(2, "Starting Reticulum...") - // Create identity for announces - identity, err := identity.NewIdentity() - if err != nil { - return fmt.Errorf("failed to create identity: %v", err) - } - debugLog(2, "Created new identity: %x", identity.Hash()) - - // Create announce + // Create announce using r.identity announce, err := announce.NewAnnounce( - identity, + r.identity, []byte("Reticulum-Go"), - nil, // No ratchet ID - false, // Not a path response + nil, + false, ) if err != nil { return fmt.Errorf("failed to create announce: %v", err) diff --git a/pkg/identity/identity.go b/pkg/identity/identity.go index f8fac50..0dd066b 100644 --- a/pkg/identity/identity.go +++ b/pkg/identity/identity.go @@ -287,26 +287,25 @@ func Remember(packet []byte, destHash []byte, publicKey []byte, appData []byte) func ValidateAnnounce(packet []byte, destHash []byte, publicKey []byte, signature []byte, appData []byte) bool { if len(publicKey) != KEYSIZE/8 { - log.Printf("[DEBUG-7] Invalid public key length: %d", len(publicKey)) return false } - log.Printf("[DEBUG-7] Validating announce for destination hash: %x", destHash) - - announced := &Identity{} - announced.publicKey = publicKey[:KEYSIZE/16] - announced.verificationKey = publicKey[KEYSIZE/16:] + // Split public key into encryption and verification keys + announced := &Identity{ + publicKey: publicKey[:KEYSIZE/16], + verificationKey: publicKey[KEYSIZE/16:], + } + // Verify signature signedData := append(destHash, publicKey...) signedData = append(signedData, appData...) if !announced.Verify(signedData, signature) { - log.Printf("[DEBUG-7] Signature verification failed") return false } + // Store in known destinations Remember(packet, destHash, publicKey, appData) - log.Printf("[DEBUG-7] Announce validated and remembered successfully") return true } @@ -315,14 +314,13 @@ func FromPublicKey(publicKey []byte) *Identity { return nil } - i := &Identity{ + return &Identity{ publicKey: publicKey[:KEYSIZE/16], verificationKey: publicKey[KEYSIZE/16:], ratchets: make(map[string][]byte), ratchetExpiry: make(map[string]int64), + mutex: &sync.RWMutex{}, } - - return i } func (i *Identity) Hex() string { @@ -831,3 +829,16 @@ func (i *Identity) CleanupExpiredRatchets() { log.Printf("[DEBUG-7] Cleaned up %d expired ratchets, %d remaining", cleaned, len(i.ratchets)) } + +// ValidateAnnounce validates an announce packet's signature +func (i *Identity) ValidateAnnounce(data []byte, destHash []byte, appData []byte) bool { + if i == nil || len(data) < ed25519.SignatureSize { + return false + } + + signatureStart := len(data) - ed25519.SignatureSize + signature := data[signatureStart:] + signedData := append(destHash, appData...) + + return ed25519.Verify(i.verificationKey, signedData, signature) +} diff --git a/pkg/interfaces/interface.go b/pkg/interfaces/interface.go index f153816..186bdc7 100644 --- a/pkg/interfaces/interface.go +++ b/pkg/interfaces/interface.go @@ -29,14 +29,14 @@ const ( DEBUG_LEVEL = 4 // Default debug level for interface logging - // Add more debug levels - DEBUG_CRITICAL = 1 // Critical errors - DEBUG_ERROR = 2 // Non-critical errors - DEBUG_INFO = 3 // Important information - DEBUG_VERBOSE = 4 // Detailed information - DEBUG_TRACE = 5 // Very detailed tracing - DEBUG_PACKETS = 6 // Packet-level details - DEBUG_ALL = 7 // Everything + // Debug levels + DEBUG_CRITICAL = 1 + DEBUG_ERROR = 2 + DEBUG_INFO = 3 + DEBUG_VERBOSE = 4 + DEBUG_TRACE = 5 + DEBUG_PACKETS = 6 + DEBUG_ALL = 7 ) type Interface interface { @@ -60,6 +60,7 @@ type Interface interface { Stop() error GetMTU() int GetConn() net.Conn + GetBandwidthAvailable() bool common.NetworkInterface } @@ -126,8 +127,7 @@ func (i *BaseInterface) ProcessIncoming(data []byte) { func (i *BaseInterface) ProcessOutgoing(data []byte) error { if !i.Online || i.Detached { - log.Printf("[DEBUG-1] Interface %s: Cannot process outgoing packet - interface offline or detached", - i.Name) + log.Printf("[DEBUG-1] Interface %s: Cannot process outgoing packet - interface offline or detached", i.Name) return fmt.Errorf("interface offline or detached") } @@ -135,8 +135,7 @@ func (i *BaseInterface) ProcessOutgoing(data []byte) error { i.TxBytes += uint64(len(data)) i.mutex.Unlock() - log.Printf("[DEBUG-%d] Interface %s: Processed outgoing packet of %d bytes, total TX: %d", - DEBUG_LEVEL, i.Name, len(data), i.TxBytes) + log.Printf("[DEBUG-%d] Interface %s: Processed outgoing packet of %d bytes, total TX: %d", DEBUG_LEVEL, i.Name, len(data), i.TxBytes) return nil } @@ -146,7 +145,7 @@ func (i *BaseInterface) SendPathRequest(packet []byte) error { } frame := make([]byte, 0, len(packet)+1) - frame = append(frame, 0x01) // Path request type + frame = append(frame, 0x01) frame = append(frame, packet...) return i.ProcessOutgoing(frame) @@ -158,7 +157,7 @@ func (i *BaseInterface) SendLinkPacket(dest []byte, data []byte, timestamp time. } frame := make([]byte, 0, len(dest)+len(data)+9) - frame = append(frame, 0x02) // Link packet type + frame = append(frame, 0x02) frame = append(frame, dest...) ts := make([]byte, 8) @@ -190,8 +189,7 @@ func (i *BaseInterface) Enable() { i.Enabled = true i.Online = true - log.Printf("[DEBUG-%d] Interface %s: State changed - Enabled: %v->%v, Online: %v->%v", - DEBUG_INFO, i.Name, prevState, i.Enabled, !i.Online, i.Online) + log.Printf("[DEBUG-%d] Interface %s: State changed - Enabled: %v->%v, Online: %v->%v", DEBUG_INFO, i.Name, prevState, i.Enabled, !i.Online, i.Online) } func (i *BaseInterface) Disable() { @@ -239,13 +237,11 @@ func (i *BaseInterface) Stop() error { } func (i *BaseInterface) Send(data []byte, address string) error { - log.Printf("[DEBUG-%d] Interface %s: Sending %d bytes to %s", - DEBUG_LEVEL, i.Name, len(data), address) + log.Printf("[DEBUG-%d] Interface %s: Sending %d bytes to %s", DEBUG_LEVEL, i.Name, len(data), address) err := i.ProcessOutgoing(data) if err != nil { - log.Printf("[DEBUG-1] Interface %s: Failed to send data: %v", - i.Name, err) + log.Printf("[DEBUG-1] Interface %s: Failed to send data: %v", i.Name, err) return err } @@ -264,21 +260,17 @@ func (i *BaseInterface) GetBandwidthAvailable() bool { now := time.Now() timeSinceLastTx := now.Sub(i.lastTx) - // If no recent transmission, bandwidth is available if timeSinceLastTx > time.Second { - log.Printf("[DEBUG-%d] Interface %s: Bandwidth available (idle for %.2fs)", - DEBUG_VERBOSE, i.Name, timeSinceLastTx.Seconds()) + log.Printf("[DEBUG-%d] Interface %s: Bandwidth available (idle for %.2fs)", DEBUG_VERBOSE, i.Name, timeSinceLastTx.Seconds()) return true } - // Calculate current usage over the last second bytesPerSec := float64(i.TxBytes) / timeSinceLastTx.Seconds() - currentUsage := bytesPerSec * 8 // Convert to bits + currentUsage := bytesPerSec * 8 maxUsage := float64(i.Bitrate) * PROPAGATION_RATE available := currentUsage < maxUsage - log.Printf("[DEBUG-%d] Interface %s: Bandwidth stats - Current: %.2f bps, Max: %.2f bps, Usage: %.1f%%, Available: %v", - DEBUG_VERBOSE, i.Name, currentUsage, maxUsage, (currentUsage/maxUsage)*100, available) + log.Printf("[DEBUG-%d] Interface %s: Bandwidth stats - Current: %.2f bps, Max: %.2f bps, Usage: %.1f%%, Available: %v", DEBUG_VERBOSE, i.Name, currentUsage, maxUsage, (currentUsage/maxUsage)*100, available) return available } @@ -290,6 +282,5 @@ func (i *BaseInterface) updateBandwidthStats(bytes uint64) { i.TxBytes += bytes i.lastTx = time.Now() - log.Printf("[DEBUG-%d] Interface %s: Updated bandwidth stats - TX bytes: %d, Last TX: %v", - DEBUG_LEVEL, i.Name, i.TxBytes, i.lastTx) + log.Printf("[DEBUG-%d] Interface %s: Updated bandwidth stats - TX bytes: %d, Last TX: %v", DEBUG_LEVEL, i.Name, i.TxBytes, i.lastTx) } diff --git a/pkg/interfaces/tcp.go b/pkg/interfaces/tcp.go index 24a1d0d..41802a5 100644 --- a/pkg/interfaces/tcp.go +++ b/pkg/interfaces/tcp.go @@ -4,6 +4,7 @@ import ( "fmt" "log" "net" + "runtime" "sync" "syscall" "time" @@ -50,7 +51,6 @@ type TCPClientInterface struct { enabled bool TxBytes uint64 RxBytes uint64 - startTime time.Time lastTx time.Time lastRx time.Time } @@ -103,6 +103,19 @@ func (tc *TCPClientInterface) Start() error { return err } tc.conn = conn + + // Set platform-specific timeouts + switch runtime.GOOS { + case "linux": + if err := tc.setTimeoutsLinux(); err != nil { + log.Printf("[DEBUG-2] Failed to set Linux TCP timeouts: %v", err) + } + case "darwin": + if err := tc.setTimeoutsOSX(); err != nil { + log.Printf("[DEBUG-2] Failed to set OSX TCP timeouts: %v", err) + } + } + tc.Online = true go tc.readLoop() return nil @@ -126,31 +139,31 @@ func (tc *TCPClientInterface) readLoop() { return } - // Update RX bytes - tc.mutex.Lock() - tc.RxBytes += uint64(n) - tc.mutex.Unlock() + // Update RX bytes for raw received data + tc.UpdateStats(uint64(n), true) for i := 0; i < n; i++ { b := buffer[i] if tc.kissFraming { // KISS framing logic - if inFrame && b == KISS_FEND { - inFrame = false - tc.handlePacket(dataBuffer) - dataBuffer = dataBuffer[:0] - } else if b == KISS_FEND { - inFrame = true - } else if inFrame { + if b == KISS_FEND { + if inFrame && len(dataBuffer) > 0 { + tc.handlePacket(dataBuffer) + dataBuffer = dataBuffer[:0] + } + inFrame = !inFrame + continue + } + + if inFrame { if b == KISS_FESC { escape = true } else { if escape { if b == KISS_TFEND { b = KISS_FEND - } - if b == KISS_TFESC { + } else if b == KISS_TFESC { b = KISS_FESC } escape = false @@ -160,13 +173,16 @@ func (tc *TCPClientInterface) readLoop() { } } else { // HDLC framing logic - if inFrame && b == HDLC_FLAG { - inFrame = false - tc.handlePacket(dataBuffer) - dataBuffer = dataBuffer[:0] - } else if b == HDLC_FLAG { - inFrame = true - } else if inFrame { + if b == HDLC_FLAG { + if inFrame && len(dataBuffer) > 0 { + tc.handlePacket(dataBuffer) + dataBuffer = dataBuffer[:0] + } + inFrame = !inFrame + continue + } + + if inFrame { if b == HDLC_ESC { escape = true } else { @@ -241,15 +257,8 @@ func (tc *TCPClientInterface) ProcessOutgoing(data []byte) error { frame = append(frame, HDLC_FLAG) } - tc.mutex.Lock() - tc.TxBytes += uint64(len(frame)) - lastTx := time.Now() - tc.lastTx = lastTx - tc.mutex.Unlock() - - log.Printf("[DEBUG-5] Interface %s TX: %d bytes, total: %d, rate: %.2f Kbps", - tc.GetName(), len(frame), tc.TxBytes, - float64(tc.TxBytes*8)/(time.Since(tc.startTime).Seconds()*1000)) + // Update TX stats before sending + tc.UpdateStats(uint64(len(frame)), false) _, err := tc.conn.Write(frame) return err @@ -478,6 +487,37 @@ func (tc *TCPClientInterface) GetStats() (tx uint64, rx uint64, lastTx time.Time return tc.TxBytes, tc.RxBytes, tc.lastTx, tc.lastRx } +func (tc *TCPClientInterface) setTimeoutsLinux() error { + tcpConn, ok := tc.conn.(*net.TCPConn) + if !ok { + return fmt.Errorf("not a TCP connection") + } + + if !tc.i2pTunneled { + if err := tcpConn.SetKeepAlive(true); err != nil { + return err + } + if err := tcpConn.SetKeepAlivePeriod(time.Duration(TCP_PROBE_INTERVAL) * time.Second); err != nil { + return err + } + } + + return nil +} + +func (tc *TCPClientInterface) setTimeoutsOSX() error { + tcpConn, ok := tc.conn.(*net.TCPConn) + if !ok { + return fmt.Errorf("not a TCP connection") + } + + if err := tcpConn.SetKeepAlive(true); err != nil { + return err + } + + return nil +} + type TCPServerInterface struct { BaseInterface connections map[string]net.Conn @@ -575,7 +615,31 @@ func (ts *TCPServerInterface) Start() error { ts.mutex.Lock() defer ts.mutex.Unlock() + addr := fmt.Sprintf("%s:%d", ts.bindAddr, ts.bindPort) + listener, err := net.Listen("tcp", addr) + if err != nil { + return fmt.Errorf("failed to start TCP server: %w", err) + } + ts.Online = true + + // Accept connections in a goroutine + go func() { + for { + conn, err := listener.Accept() + if err != nil { + if !ts.Online { + return // Normal shutdown + } + log.Printf("[DEBUG-2] Error accepting connection: %v", err) + continue + } + + // Handle each connection in a separate goroutine + go ts.handleConnection(conn) + } + }() + return nil } @@ -598,3 +662,62 @@ func (ts *TCPServerInterface) GetRxBytes() uint64 { defer ts.mutex.RUnlock() return ts.RxBytes } + +func (ts *TCPServerInterface) handleConnection(conn net.Conn) { + addr := conn.RemoteAddr().String() + ts.mutex.Lock() + ts.connections[addr] = conn + ts.mutex.Unlock() + + defer func() { + ts.mutex.Lock() + delete(ts.connections, addr) + ts.mutex.Unlock() + conn.Close() + }() + + buffer := make([]byte, ts.MTU) + for { + n, err := conn.Read(buffer) + if err != nil { + return + } + + ts.mutex.Lock() + ts.RxBytes += uint64(n) + ts.mutex.Unlock() + + if ts.packetCallback != nil { + ts.packetCallback(buffer[:n], ts) + } + } +} + +func (ts *TCPServerInterface) ProcessOutgoing(data []byte) error { + ts.mutex.RLock() + defer ts.mutex.RUnlock() + + if !ts.Online { + return fmt.Errorf("interface offline") + } + + var frame []byte + if ts.kissFraming { + frame = append([]byte{KISS_FEND}, escapeKISS(data)...) + frame = append(frame, KISS_FEND) + } else { + frame = append([]byte{HDLC_FLAG}, escapeHDLC(data)...) + frame = append(frame, HDLC_FLAG) + } + + ts.TxBytes += uint64(len(frame)) + + for _, conn := range ts.connections { + if _, err := conn.Write(frame); err != nil { + log.Printf("[DEBUG-4] Error writing to connection %s: %v", + conn.RemoteAddr(), err) + } + } + + return nil +} diff --git a/pkg/link/link.go b/pkg/link/link.go index 0d361ea..a91bf81 100644 --- a/pkg/link/link.go +++ b/pkg/link/link.go @@ -43,6 +43,9 @@ const ( PROVE_NONE = 0x00 PROVE_ALL = 0x01 PROVE_APP = 0x02 + + WATCHDOG_MIN_SLEEP = 0.025 + WATCHDOG_INTERVAL = 0.1 ) type Link struct { @@ -82,6 +85,13 @@ type Link struct { proofStrategy byte proofCallback func(*packet.Packet) bool trackPhyStats bool + + watchdogLock bool + watchdogActive bool + establishmentTimeout time.Duration + keepalive time.Duration + staleTime time.Duration + initiator bool } func NewLink(dest *destination.Destination, transport *transport.Transport, establishedCallback func(*Link), closedCallback func(*Link)) *Link { @@ -97,6 +107,13 @@ func NewLink(dest *destination.Destination, transport *transport.Transport, esta lastDataReceived: time.Time{}, lastDataSent: time.Time{}, pathFinder: pathfinder.NewPathFinder(), + + watchdogLock: false, + watchdogActive: false, + establishmentTimeout: time.Duration(ESTABLISHMENT_TIMEOUT_PER_HOP * float64(time.Second)), + keepalive: time.Duration(KEEPALIVE * float64(time.Second)), + staleTime: time.Duration(STALE_TIME * float64(time.Second)), + initiator: false, } } @@ -803,3 +820,51 @@ func min(a, b int) int { } return b } + +func (l *Link) startWatchdog() { + if l.watchdogActive { + return + } + + l.watchdogActive = true + go l.watchdog() +} + +func (l *Link) watchdog() { + for l.status != STATUS_CLOSED { + l.mutex.Lock() + if l.watchdogLock { + l.mutex.Unlock() + time.Sleep(time.Duration(WATCHDOG_MIN_SLEEP * float64(time.Second))) + continue + } + + var sleepTime float64 = WATCHDOG_INTERVAL + + switch l.status { + case STATUS_ACTIVE: + lastActivity := l.lastInbound + if l.lastOutbound.After(lastActivity) { + lastActivity = l.lastOutbound + } + + if time.Since(lastActivity) > l.keepalive { + if l.initiator { + l.SendPacket([]byte{}) // Keepalive packet + } + + if time.Since(lastActivity) > l.staleTime { + l.status = STATUS_CLOSED + l.teardownReason = STATUS_FAILED + if l.closedCallback != nil { + l.closedCallback(l) + } + } + } + } + + l.mutex.Unlock() + time.Sleep(time.Duration(sleepTime * float64(time.Second))) + } + l.watchdogActive = false +} diff --git a/pkg/packet/packet.go b/pkg/packet/packet.go index f9598e6..38a60cd 100644 --- a/pkg/packet/packet.go +++ b/pkg/packet/packet.go @@ -5,6 +5,8 @@ import ( "errors" "fmt" "time" + + "github.com/Sudo-Ivan/reticulum-go/pkg/identity" ) const ( @@ -206,3 +208,29 @@ func (p *Packet) Serialize() ([]byte, error) { return p.Raw, nil } + +func NewAnnouncePacket(destHash []byte, identity *identity.Identity, appData []byte, transportID []byte) (*Packet, error) { + // Create combined public key + pubKey := identity.GetPublicKey() + + // Create signed data + signedData := append(destHash, pubKey...) + signedData = append(signedData, appData...) + + // Sign the data + signature := identity.Sign(signedData) + + // Combine all data + data := append(pubKey, appData...) + data = append(data, signature...) + + p := &Packet{ + HeaderType: HeaderType2, + PacketType: PacketTypeAnnounce, + TransportID: transportID, + DestinationHash: destHash, + Data: data, + } + + return p, nil +} diff --git a/pkg/rate/rate.go b/pkg/rate/rate.go index 169a40e..fb8fc5c 100644 --- a/pkg/rate/rate.go +++ b/pkg/rate/rate.go @@ -5,6 +5,18 @@ import ( "time" ) +const ( + DefaultAnnounceRateTarget = 3600.0 // Default 1 hour between announces + DefaultAnnounceRateGrace = 3 // Default number of grace announces + DefaultAnnounceRatePenalty = 7200.0 // Default 2 hour penalty + DefaultBurstFreqNew = 3.5 // Default announces/sec for new interfaces + DefaultBurstFreq = 12.0 // Default announces/sec for established interfaces + DefaultBurstHold = 60 // Default seconds to hold after burst + DefaultBurstPenalty = 300 // Default seconds penalty after burst + DefaultMaxHeldAnnounces = 256 // Default max announces in hold queue + DefaultHeldReleaseInterval = 30 // Default seconds between releasing held announces +) + type Limiter struct { rate float64 interval time.Duration @@ -42,3 +54,140 @@ func (l *Limiter) Allow() bool { l.allowance -= 1.0 return true } + +// AnnounceRateControl handles per-destination announce rate limiting +type AnnounceRateControl struct { + rateTarget float64 + rateGrace int + ratePenalty float64 + + announceHistory map[string][]time.Time // Maps dest hash to announce times + mutex sync.RWMutex +} + +func NewAnnounceRateControl(target float64, grace int, penalty float64) *AnnounceRateControl { + return &AnnounceRateControl{ + rateTarget: target, + rateGrace: grace, + ratePenalty: penalty, + announceHistory: make(map[string][]time.Time), + } +} + +func (arc *AnnounceRateControl) AllowAnnounce(destHash string) bool { + arc.mutex.Lock() + defer arc.mutex.Unlock() + + history := arc.announceHistory[destHash] + now := time.Now() + + // Cleanup old history entries + cutoff := now.Add(-24 * time.Hour) + newHistory := []time.Time{} + for _, t := range history { + if t.After(cutoff) { + newHistory = append(newHistory, t) + } + } + history = newHistory + + // Allow if within grace period + if len(history) < arc.rateGrace { + arc.announceHistory[destHash] = append(history, now) + return true + } + + // Check rate + lastAnnounce := history[len(history)-1] + waitTime := arc.rateTarget + if len(history) > arc.rateGrace { + waitTime += arc.ratePenalty + } + + if now.Sub(lastAnnounce).Seconds() < waitTime { + return false + } + + arc.announceHistory[destHash] = append(history, now) + return true +} + +// IngressControl handles new destination announce rate limiting +type IngressControl struct { + enabled bool + burstFreqNew float64 + burstFreq float64 + burstHold time.Duration + burstPenalty time.Duration + maxHeldAnnounces int + heldReleaseInterval time.Duration + + heldAnnounces map[string][]byte // Maps announce hash to announce data + lastBurst time.Time + announceCount int + mutex sync.RWMutex +} + +func NewIngressControl(enabled bool) *IngressControl { + return &IngressControl{ + enabled: enabled, + burstFreqNew: DefaultBurstFreqNew, + burstFreq: DefaultBurstFreq, + burstHold: time.Duration(DefaultBurstHold) * time.Second, + burstPenalty: time.Duration(DefaultBurstPenalty) * time.Second, + maxHeldAnnounces: DefaultMaxHeldAnnounces, + heldReleaseInterval: time.Duration(DefaultHeldReleaseInterval) * time.Second, + heldAnnounces: make(map[string][]byte), + lastBurst: time.Now(), + } +} + +func (ic *IngressControl) ProcessAnnounce(announceHash string, announceData []byte, isNewDest bool) bool { + if !ic.enabled { + return true + } + + ic.mutex.Lock() + defer ic.mutex.Unlock() + + now := time.Now() + elapsed := now.Sub(ic.lastBurst) + + // Reset counter if enough time has passed + if elapsed > ic.burstHold+ic.burstPenalty { + ic.announceCount = 0 + ic.lastBurst = now + } + + // Check burst frequency + maxFreq := ic.burstFreq + if isNewDest { + maxFreq = ic.burstFreqNew + } + + ic.announceCount++ + burstFreq := float64(ic.announceCount) / elapsed.Seconds() + + // Hold announce if burst frequency exceeded + if burstFreq > maxFreq { + if len(ic.heldAnnounces) < ic.maxHeldAnnounces { + ic.heldAnnounces[announceHash] = announceData + } + return false + } + + return true +} + +func (ic *IngressControl) ReleaseHeldAnnounce() (string, []byte, bool) { + ic.mutex.Lock() + defer ic.mutex.Unlock() + + // Return first held announce if any exist + for hash, data := range ic.heldAnnounces { + delete(ic.heldAnnounces, hash) + return hash, data, true + } + + return "", nil, false +} diff --git a/pkg/transport/announce.go b/pkg/transport/announce.go new file mode 100644 index 0000000..d552902 --- /dev/null +++ b/pkg/transport/announce.go @@ -0,0 +1,170 @@ +package transport + +import ( + "crypto/sha256" + "encoding/hex" + "sort" + "sync" + "time" + + "github.com/Sudo-Ivan/reticulum-go/pkg/rate" +) + +const ( + MaxRetries = 3 + RetryInterval = 5 * time.Second + MaxQueueSize = 1000 + MinPriorityDelta = 0.1 + DefaultPropagationRate = 0.02 // 2% of bandwidth for announces +) + +type AnnounceEntry struct { + Data []byte + HopCount int + RetryCount int + LastRetry time.Time + SourceIface string + Priority float64 + Hash string +} + +type AnnounceManager struct { + announces map[string]*AnnounceEntry + announceQueue map[string][]*AnnounceEntry + rateLimiter *rate.Limiter + mutex sync.RWMutex +} + +func NewAnnounceManager() *AnnounceManager { + return &AnnounceManager{ + announces: make(map[string]*AnnounceEntry), + announceQueue: make(map[string][]*AnnounceEntry), + rateLimiter: rate.NewLimiter(DefaultPropagationRate, 1), + mutex: sync.RWMutex{}, + } +} + +func (am *AnnounceManager) ProcessAnnounce(data []byte, sourceIface string) error { + hash := sha256.Sum256(data) + hashStr := hex.EncodeToString(hash[:]) + + am.mutex.Lock() + defer am.mutex.Unlock() + + if entry, exists := am.announces[hashStr]; exists { + if entry.HopCount <= int(data[0]) { + return nil + } + entry.HopCount = int(data[0]) + entry.Data = data + entry.RetryCount = 0 + entry.LastRetry = time.Now() + entry.Priority = calculatePriority(data[0], 0) + return nil + } + + entry := &AnnounceEntry{ + Data: data, + HopCount: int(data[0]), + RetryCount: 0, + LastRetry: time.Now(), + SourceIface: sourceIface, + Priority: calculatePriority(data[0], 0), + Hash: hashStr, + } + + am.announces[hashStr] = entry + + for iface := range am.announceQueue { + if iface != sourceIface { + am.queueAnnounce(entry, iface) + } + } + + return nil +} + +func (am *AnnounceManager) queueAnnounce(entry *AnnounceEntry, iface string) { + queue := am.announceQueue[iface] + + if len(queue) >= MaxQueueSize { + // Remove lowest priority announce if queue is full + queue = queue[:len(queue)-1] + } + + insertIdx := sort.Search(len(queue), func(i int) bool { + return queue[i].Priority < entry.Priority + }) + + queue = append(queue[:insertIdx], append([]*AnnounceEntry{entry}, queue[insertIdx:]...)...) + am.announceQueue[iface] = queue +} + +func (am *AnnounceManager) GetNextAnnounce(iface string) *AnnounceEntry { + am.mutex.Lock() + defer am.mutex.Unlock() + + queue := am.announceQueue[iface] + if len(queue) == 0 { + return nil + } + + entry := queue[0] + now := time.Now() + + if entry.RetryCount >= MaxRetries { + am.announceQueue[iface] = queue[1:] + delete(am.announces, entry.Hash) + return am.GetNextAnnounce(iface) + } + + if now.Sub(entry.LastRetry) < RetryInterval { + return nil + } + + if !am.rateLimiter.Allow() { + return nil + } + + entry.RetryCount++ + entry.LastRetry = now + entry.Priority = calculatePriority(byte(entry.HopCount), entry.RetryCount) + + am.announceQueue[iface] = queue[1:] + am.queueAnnounce(entry, iface) + + return entry +} + +func calculatePriority(hopCount byte, retryCount int) float64 { + basePriority := 1.0 / float64(hopCount) + retryPenalty := float64(retryCount) * MinPriorityDelta + return basePriority - retryPenalty +} + +func (am *AnnounceManager) CleanupExpired() { + am.mutex.Lock() + defer am.mutex.Unlock() + + now := time.Now() + expiredHashes := make([]string, 0) + + for hash, entry := range am.announces { + if entry.RetryCount >= MaxRetries || now.Sub(entry.LastRetry) > RetryInterval*MaxRetries { + expiredHashes = append(expiredHashes, hash) + } + } + + for _, hash := range expiredHashes { + delete(am.announces, hash) + for iface, queue := range am.announceQueue { + newQueue := make([]*AnnounceEntry, 0, len(queue)) + for _, entry := range queue { + if entry.Hash != hash { + newQueue = append(newQueue, entry) + } + } + am.announceQueue[iface] = newQueue + } + } +} diff --git a/pkg/transport/transport.go b/pkg/transport/transport.go index d72a5e3..1506d50 100644 --- a/pkg/transport/transport.go +++ b/pkg/transport/transport.go @@ -13,6 +13,7 @@ import ( "github.com/Sudo-Ivan/reticulum-go/pkg/announce" "github.com/Sudo-Ivan/reticulum-go/pkg/common" + "github.com/Sudo-Ivan/reticulum-go/pkg/identity" "github.com/Sudo-Ivan/reticulum-go/pkg/interfaces" "github.com/Sudo-Ivan/reticulum-go/pkg/packet" "github.com/Sudo-Ivan/reticulum-go/pkg/pathfinder" @@ -616,65 +617,82 @@ func (t *Transport) HandlePacket(data []byte, iface common.NetworkInterface) { } } -func (t *Transport) handleAnnouncePacket(data []byte, iface common.NetworkInterface) { - // Validate minimum packet size (1 byte hop count + 32 bytes dest + 16 bytes identity + 4 bytes min app data) - if len(data) < 53 { - log.Printf("[DEBUG-3] Announce packet too small: %d bytes", len(data)) - return +func (t *Transport) handleAnnouncePacket(data []byte, iface common.NetworkInterface) error { + // Validate minimum packet size (1 byte hop count + 32 bytes dest hash + 16 bytes min identity + 1 byte min app data) + if len(data) < 50 { + return fmt.Errorf("announce packet too small: %d bytes", len(data)) } - announceHash := sha256.Sum256(data) - log.Printf("[DEBUG-3] Processing announce %x from interface %s", - announceHash[:8], iface.GetName()) - - t.mutex.Lock() - if _, seen := t.seenAnnounces[string(announceHash[:])]; seen { - t.mutex.Unlock() - log.Printf("[DEBUG-4] Ignoring duplicate announce %x", announceHash[:8]) - return - } - t.seenAnnounces[string(announceHash[:])] = true - t.mutex.Unlock() - - // Don't forward if max hops reached - if data[0] >= MAX_HOPS { - log.Printf("[DEBUG-3] Announce exceeded max hops: %d", data[0]) - return - } - - // Parse announce fields + // Extract fields hopCount := data[0] destHash := data[1:33] - identity := data[33:49] + identityBytes := data[33:49] appData := data[49:] + // Check for duplicate announces + announceHash := sha256.Sum256(data) + hashStr := string(announceHash[:]) + + t.mutex.Lock() + if _, seen := t.seenAnnounces[hashStr]; seen { + t.mutex.Unlock() + log.Printf("[DEBUG-7] Ignoring duplicate announce %x", announceHash[:8]) + return nil + } + t.seenAnnounces[hashStr] = true + t.mutex.Unlock() + + // Validate announce signature and store destination + id := identity.FromPublicKey(identityBytes) + if id == nil || !id.ValidateAnnounce(data, destHash, appData) { + return fmt.Errorf("invalid announce signature") + } + + // Don't forward if max hops reached + if hopCount >= MAX_HOPS { + log.Printf("[DEBUG-7] Announce exceeded max hops: %d", hopCount) + return nil + } + // Add random delay before retransmission (0-2 seconds) delay := time.Duration(rand.Float64() * 2 * float64(time.Second)) time.Sleep(delay) // Check bandwidth allocation for announces if !t.announceRate.Allow() { - log.Printf("[DEBUG-3] Announce rate limit exceeded, dropping") - return + log.Printf("[DEBUG-7] Announce rate limit exceeded, dropping") + return nil } // Increment hop count for forwarding - data[0] = hopCount + 1 + forwardData := make([]byte, len(data)) + copy(forwardData, data) + forwardData[0] = hopCount + 1 // Forward to other interfaces + var lastErr error for name, outIface := range t.interfaces { if outIface == iface || !outIface.IsEnabled() { continue } + // Check interface mode restrictions + // if outIface.GetMode() == interfaces.ModeAccessPoint { + // log.Printf("[DEBUG-7] Blocking announce broadcast on %s due to AP mode", name) + // continue + // } + log.Printf("[DEBUG-7] Forwarding announce on interface %s", name) - if err := outIface.Send(data, ""); err != nil { + if err := outIface.Send(forwardData, ""); err != nil { log.Printf("[DEBUG-3] Failed to forward announce on %s: %v", name, err) + lastErr = err } } // Notify announce handlers - t.notifyAnnounceHandlers(destHash, identity, appData) + t.notifyAnnounceHandlers(destHash, identityBytes, appData) + + return lastErr } func (t *Transport) handleLinkPacket(data []byte, iface common.NetworkInterface) {