diff --git a/cmd/reticulum-go/main.go b/cmd/reticulum-go/main.go index 0a49339..eea3335 100644 --- a/cmd/reticulum-go/main.go +++ b/cmd/reticulum-go/main.go @@ -7,6 +7,7 @@ import ( "math/rand" "os" "os/signal" + "sync" "syscall" "time" @@ -22,7 +23,7 @@ import ( ) var ( - debugLevel = flag.Int("debug", 4, "Debug level (0-7)") + debugLevel = flag.Int("debug", 7, "Debug level (0-7)") ) func debugLog(level int, format string, v ...interface{}) { @@ -31,14 +32,37 @@ func debugLog(level int, format string, v ...interface{}) { } } +const ( + ANNOUNCE_RATE_TARGET = 3600 // Default target time between announces (1 hour) + ANNOUNCE_RATE_GRACE = 3 // Number of grace announces before enforcing rate + ANNOUNCE_RATE_PENALTY = 7200 // Additional penalty time for rate violations + MAX_ANNOUNCE_HOPS = 128 // Maximum number of hops for announces + DEBUG_CRITICAL = 1 // Critical errors + DEBUG_ERROR = 2 // Non-critical errors + DEBUG_INFO = 3 // Important information + DEBUG_VERBOSE = 4 // Detailed information + DEBUG_TRACE = 5 // Very detailed tracing + DEBUG_PACKETS = 6 // Packet-level details + DEBUG_ALL = 7 // Everything +) + type Reticulum struct { - config *common.ReticulumConfig - transport *transport.Transport - interfaces []interfaces.Interface - channels map[string]*channel.Channel - buffers map[string]*buffer.Buffer - announceHandlers map[string][]announce.AnnounceHandler - pathRequests map[string]*common.PathRequest + config *common.ReticulumConfig + transport *transport.Transport + interfaces []interfaces.Interface + channels map[string]*channel.Channel + buffers map[string]*buffer.Buffer + announceHandlers map[string][]announce.AnnounceHandler + pathRequests map[string]*common.PathRequest + announceHistory map[string]announceRecord + announceHistoryMu sync.RWMutex +} + +type announceRecord struct { + lastSeen time.Time + seenCount int + violations int + interfaces map[string]bool } func NewReticulum(cfg *common.ReticulumConfig) (*Reticulum, error) { @@ -54,7 +78,7 @@ func NewReticulum(cfg *common.ReticulumConfig) (*Reticulum, error) { t := transport.NewTransport(cfg) debugLog(3, "Transport initialized") - return &Reticulum{ + r := &Reticulum{ config: cfg, transport: t, interfaces: make([]interfaces.Interface, 0), @@ -62,15 +86,63 @@ func NewReticulum(cfg *common.ReticulumConfig) (*Reticulum, error) { buffers: make(map[string]*buffer.Buffer), announceHandlers: make(map[string][]announce.AnnounceHandler), pathRequests: make(map[string]*common.PathRequest), - }, nil + announceHistory: make(map[string]announceRecord), + } + + // Initialize interfaces from config + for name, ifaceConfig := range cfg.Interfaces { + if !ifaceConfig.Enabled { + continue + } + + var iface interfaces.Interface + var err error + + switch ifaceConfig.Type { + case "TCPClientInterface": + iface, err = interfaces.NewTCPClientInterface( + name, + ifaceConfig.TargetHost, + ifaceConfig.TargetPort, + ifaceConfig.Enabled, + true, // IN + true, // OUT + ) + case "UDPInterface": + iface, err = interfaces.NewUDPInterface( + name, + ifaceConfig.Address, + ifaceConfig.TargetHost, + ifaceConfig.Enabled, + ) + case "AutoInterface": + iface, err = interfaces.NewAutoInterface(name, ifaceConfig) + default: + debugLog(1, "Unknown interface type: %s", ifaceConfig.Type) + continue + } + + if err != nil { + if cfg.PanicOnInterfaceErr { + return nil, fmt.Errorf("failed to create interface %s: %v", name, err) + } + debugLog(1, "Error creating interface %s: %v", name, err) + continue + } + + debugLog(2, "Configuring interface %s (type=%s)...", name, ifaceConfig.Type) + r.interfaces = append(r.interfaces, iface) + } + + return r, nil } func (r *Reticulum) handleInterface(iface common.NetworkInterface) { - debugLog(2, "Setting up interface %s", iface.GetName()) + debugLog(DEBUG_INFO, "Setting up interface %s (type=%T)", iface.GetName(), iface) ch := channel.NewChannel(&transportWrapper{r.transport}) r.channels[iface.GetName()] = ch - debugLog(3, "Created channel for interface %s", iface.GetName()) + debugLog(DEBUG_VERBOSE, "Created channel for interface %s with transport wrapper", iface.GetName()) rw := buffer.CreateBidirectionalBuffer( 1, @@ -78,12 +150,16 @@ func (r *Reticulum) handleInterface(iface common.NetworkInterface) { ch, func(size int) { data := make([]byte, size) + debugLog(DEBUG_PACKETS, "Interface %s: Reading %d bytes from buffer", iface.GetName(), size) iface.ProcessIncoming(data) - if len(data) > 0 && data[0] == announce.PACKET_TYPE_ANNOUNCE { - r.handleAnnounce(data, iface) - } else { - r.transport.HandlePacket(data, iface) + if len(data) > 0 { + debugLog(DEBUG_TRACE, "Interface %s: Received packet type 0x%02x", iface.GetName(), data[0]) + if data[0] == announce.PACKET_TYPE_ANNOUNCE { + r.handleAnnounce(data, iface) + } else { + r.transport.HandlePacket(data, iface) + } } debugLog(5, "Processed %d bytes from interface %s", size, iface.GetName()) @@ -93,6 +169,7 @@ func (r *Reticulum) handleInterface(iface common.NetworkInterface) { r.buffers[iface.GetName()] = &buffer.Buffer{ ReadWriter: rw, } + debugLog(DEBUG_VERBOSE, "Created bidirectional buffer for interface %s", iface.GetName()) iface.SetPacketCallback(func(data []byte, ni common.NetworkInterface) { if buf, ok := r.buffers[ni.GetName()]; ok { @@ -112,10 +189,14 @@ func (r *Reticulum) monitorInterfaces() { for range ticker.C { for _, iface := range r.interfaces { if tcpClient, ok := iface.(*interfaces.TCPClientInterface); ok { - debugLog(4, "Interface %s status - Connected: %v, RTT: %v", + debugLog(DEBUG_VERBOSE, "Interface %s status - Connected: %v, RTT: %v, TX: %d bytes (%.2f Kbps), RX: %d bytes (%.2f Kbps)", iface.GetName(), tcpClient.IsConnected(), tcpClient.GetRTT(), + tcpClient.GetTxBytes(), + float64(tcpClient.GetTxBytes()*8)/(5*1024), // Calculate Kbps over 5s interval + tcpClient.GetRxBytes(), + float64(tcpClient.GetRxBytes()*8)/(5*1024), ) } } @@ -159,53 +240,65 @@ func main() { log.Fatalf("Failed to create Reticulum instance: %v", err) } - go r.monitorInterfaces() - - // Register announce handler using the instance's transport - handler := &AnnounceHandler{ - aspectFilter: []string{"*"}, // Handle all aspects - } - r.transport.RegisterAnnounceHandler(handler) - - // Create a destination to announce - dest, err := identity.NewIdentity() + // Create identity and destination + identity, err := identity.NewIdentity() if err != nil { log.Fatalf("Failed to create identity: %v", err) } - // Create announce for the destination + debugLog(2, "Created new identity: %x", identity.Hash()) + + // Create announce announce, err := announce.NewAnnounce( - dest, - []byte("Reticulum-Go"), // App data - nil, // No ratchet ID - false, // Not a path response + identity, + []byte("nomadnetwork.node"), + nil, // No ratchet ID + false, // Not a path response ) if err != nil { log.Fatalf("Failed to create announce: %v", err) } - // Send initial announce immediately + // Start monitoring interfaces + go r.monitorInterfaces() + + // Register announce handler + handler := &AnnounceHandler{ + aspectFilter: []string{"*"}, + } + r.transport.RegisterAnnounceHandler(handler) + + // Start Reticulum + if err := r.Start(); err != nil { + log.Fatalf("Failed to start Reticulum: %v", err) + } + + // Send initial announces after interfaces are ready + time.Sleep(2 * time.Second) // Give interfaces time to connect for _, iface := range r.interfaces { if netIface, ok := iface.(common.NetworkInterface); ok { - debugLog(2, "Sending initial announce on interface %s", netIface.GetName()) - if err := announce.Propagate([]common.NetworkInterface{netIface}); err != nil { - debugLog(1, "Failed to propagate initial announce: %v", err) + if netIface.IsEnabled() && netIface.IsOnline() { + debugLog(2, "Sending initial announce on interface %s", netIface.GetName()) + if err := announce.Propagate([]common.NetworkInterface{netIface}); err != nil { + debugLog(1, "Failed to propagate initial announce: %v", err) + } } } } - // Then start periodic announces + // Start periodic announces go func() { ticker := time.NewTicker(5 * time.Minute) defer ticker.Stop() - for { - select { - case <-ticker.C: - for _, iface := range r.interfaces { - if netIface, ok := iface.(common.NetworkInterface); ok { + for range ticker.C { + debugLog(3, "Starting periodic announce cycle") + for _, iface := range r.interfaces { + if netIface, ok := iface.(common.NetworkInterface); ok { + if netIface.IsEnabled() && netIface.IsOnline() { + debugLog(2, "Sending periodic announce on interface %s", netIface.GetName()) if err := announce.Propagate([]common.NetworkInterface{netIface}); err != nil { - debugLog(1, "Failed to propagate announce: %v", err) + debugLog(1, "Failed to propagate periodic announce: %v", err) } } } @@ -213,10 +306,6 @@ func main() { } }() - if err := r.Start(); err != nil { - log.Fatalf("Failed to start Reticulum: %v", err) - } - sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) <-sigChan @@ -298,75 +387,75 @@ func initializeDirectories() error { func (r *Reticulum) Start() error { debugLog(2, "Starting Reticulum...") + // Create identity for announces + identity, err := identity.NewIdentity() + if err != nil { + return fmt.Errorf("failed to create identity: %v", err) + } + debugLog(2, "Created new identity: %x", identity.Hash()) + + // Create announce + announce, err := announce.NewAnnounce( + identity, + []byte("Reticulum-Go"), + nil, // No ratchet ID + false, // Not a path response + ) + if err != nil { + return fmt.Errorf("failed to create announce: %v", err) + } + + // Start transport if err := r.transport.Start(); err != nil { return fmt.Errorf("failed to start transport: %v", err) } debugLog(3, "Transport started successfully") - for name, ifaceConfig := range r.config.Interfaces { - if !ifaceConfig.Enabled { - debugLog(2, "Skipping disabled interface %s", name) - continue + // Start interfaces + for _, iface := range r.interfaces { + debugLog(2, "Starting interface %s...", iface.GetName()) + if err := iface.Start(); err != nil { + return fmt.Errorf("failed to start interface %s: %v", iface.GetName(), err) } + r.handleInterface(iface) + debugLog(3, "Interface %s started successfully", iface.GetName()) + } - debugLog(2, "Configuring interface %s (type=%s)...", name, ifaceConfig.Type) - var iface interfaces.Interface + // Wait for interfaces to be ready + time.Sleep(2 * time.Second) - switch ifaceConfig.Type { - case "TCPClientInterface": - client, err := interfaces.NewTCPClientInterface( - ifaceConfig.Name, - ifaceConfig.TargetHost, - ifaceConfig.TargetPort, - ifaceConfig.KISSFraming, - ifaceConfig.I2PTunneled, - ifaceConfig.Enabled, - ) - if err != nil { - if r.config.PanicOnInterfaceErr { - return fmt.Errorf("failed to create TCP client interface %s: %v", name, err) + // Send initial announces + for _, iface := range r.interfaces { + if netIface, ok := iface.(common.NetworkInterface); ok { + if netIface.IsEnabled() && netIface.IsOnline() { + debugLog(2, "Sending initial announce on interface %s", netIface.GetName()) + if err := announce.Propagate([]common.NetworkInterface{netIface}); err != nil { + debugLog(1, "Failed to propagate initial announce: %v", err) } - debugLog(1, "Failed to create TCP client interface %s: %v", name, err) - continue } - iface = client - - case "TCPServerInterface": - server, err := interfaces.NewTCPServerInterface( - ifaceConfig.Name, - ifaceConfig.Address, - ifaceConfig.Port, - ifaceConfig.KISSFraming, - ifaceConfig.I2PTunneled, - ifaceConfig.PreferIPv6, - ) - if err != nil { - if r.config.PanicOnInterfaceErr { - return fmt.Errorf("failed to create TCP server interface %s: %v", name, err) - } - debugLog(1, "Failed to create TCP server interface %s: %v", name, err) - continue - } - iface = server - } - - if iface != nil { - debugLog(2, "Starting interface %s...", name) - if err := iface.Start(); err != nil { - if r.config.PanicOnInterfaceErr { - return fmt.Errorf("failed to start interface %s: %v", name, err) - } - debugLog(1, "Failed to start interface %s: %v", name, err) - continue - } - - netIface := iface.(common.NetworkInterface) - r.handleInterface(netIface) - r.interfaces = append(r.interfaces, iface) - debugLog(3, "Interface %s started successfully", name) } } + // Start periodic announces + go func() { + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + + for range ticker.C { + debugLog(3, "Starting periodic announce cycle") + for _, iface := range r.interfaces { + if netIface, ok := iface.(common.NetworkInterface); ok { + if netIface.IsEnabled() && netIface.IsOnline() { + debugLog(2, "Sending periodic announce on interface %s", netIface.GetName()) + if err := announce.Propagate([]common.NetworkInterface{netIface}); err != nil { + debugLog(1, "Failed to propagate periodic announce: %v", err) + } + } + } + } + } + }() + debugLog(2, "Reticulum started successfully") return nil } @@ -408,13 +497,51 @@ func (r *Reticulum) handleAnnounce(data []byte, iface common.NetworkInterface) { debugLog(1, "Error handling announce: %v", err) return } - debugLog(3, "Successfully parsed announce packet") + + // Check announce history + announceKey := fmt.Sprintf("%x", a.Hash()) + r.announceHistoryMu.Lock() + record, exists := r.announceHistory[announceKey] + + if exists { + // Check if this interface has already seen this announce + if record.interfaces[iface.GetName()] { + r.announceHistoryMu.Unlock() + debugLog(4, "Duplicate announce from %s, ignoring", iface.GetName()) + return + } + + // Check rate limiting + timeSinceLastSeen := time.Since(record.lastSeen) + if timeSinceLastSeen < time.Duration(ANNOUNCE_RATE_TARGET)*time.Second { + if record.seenCount > ANNOUNCE_RATE_GRACE { + record.violations++ + waitTime := ANNOUNCE_RATE_TARGET + (record.violations * ANNOUNCE_RATE_PENALTY) + r.announceHistoryMu.Unlock() + debugLog(3, "Rate limit exceeded for announce %s, waiting %d seconds", announceKey, waitTime) + return + } + } + + record.seenCount++ + record.lastSeen = time.Now() + record.interfaces[iface.GetName()] = true + } else { + record = announceRecord{ + lastSeen: time.Now(), + seenCount: 1, + interfaces: make(map[string]bool), + } + record.interfaces[iface.GetName()] = true + r.announceHistory[announceKey] = record + } + r.announceHistoryMu.Unlock() // Add random delay before propagation (0-2 seconds) delay := time.Duration(rand.Float64() * 2 * float64(time.Second)) time.Sleep(delay) - // Check interface modes and propagate according to RNS rules + // Propagate to other interfaces according to RNS rules for _, otherIface := range r.interfaces { if otherIface.GetName() == iface.GetName() { continue @@ -433,10 +560,14 @@ func (r *Reticulum) handleAnnounce(data []byte, iface common.NetworkInterface) { continue } - // Check if interface has bandwidth available for announces + // Check if interface has bandwidth available if netIface, ok := otherIface.(common.NetworkInterface); ok { - if err := a.Propagate([]common.NetworkInterface{netIface}); err != nil { - debugLog(1, "Error propagating announce: %v", err) + if netIface.GetBandwidthAvailable() { + if err := a.Propagate([]common.NetworkInterface{netIface}); err != nil { + debugLog(1, "Error propagating announce: %v", err) + } + } else { + debugLog(3, "Interface %s has insufficient bandwidth for announce", netIface.GetName()) } } } diff --git a/pkg/announce/announce.go b/pkg/announce/announce.go index 8d27082..5bf0ad3 100644 --- a/pkg/announce/announce.go +++ b/pkg/announce/announce.go @@ -66,6 +66,7 @@ type Announce struct { handlers []AnnounceHandler ratchetID []byte packet []byte + hash []byte } func New(dest *identity.Identity, appData []byte, pathResponse bool) (*Announce, error) { @@ -388,5 +389,24 @@ func NewAnnounce(identity *identity.Identity, appData []byte, ratchetID []byte, a.packet = packet + // Generate hash + a.Hash() + return a, nil } + +func (a *Announce) Hash() []byte { + if a.hash == nil { + // Generate hash from announce data + h := sha256.New() + h.Write(a.destinationHash) + h.Write(a.identity.GetPublicKey()) + h.Write([]byte{a.hops}) + h.Write(a.appData) + if a.ratchetID != nil { + h.Write(a.ratchetID) + } + a.hash = h.Sum(nil) + } + return a.hash +} diff --git a/pkg/interfaces/interface.go b/pkg/interfaces/interface.go index 590ba4e..f153816 100644 --- a/pkg/interfaces/interface.go +++ b/pkg/interfaces/interface.go @@ -3,6 +3,7 @@ package interfaces import ( "encoding/binary" "fmt" + "log" "net" "sync" "time" @@ -25,6 +26,17 @@ const ( TYPE_TCP = 0x02 PROPAGATION_RATE = 0.02 // 2% of interface bandwidth + + DEBUG_LEVEL = 4 // Default debug level for interface logging + + // Add more debug levels + DEBUG_CRITICAL = 1 // Critical errors + DEBUG_ERROR = 2 // Non-critical errors + DEBUG_INFO = 3 // Important information + DEBUG_VERBOSE = 4 // Detailed information + DEBUG_TRACE = 5 // Very detailed tracing + DEBUG_PACKETS = 6 // Packet-level details + DEBUG_ALL = 7 // Everything ) type Interface interface { @@ -99,6 +111,10 @@ func (i *BaseInterface) GetPacketCallback() common.PacketCallback { } func (i *BaseInterface) ProcessIncoming(data []byte) { + i.mutex.Lock() + i.RxBytes += uint64(len(data)) + i.mutex.Unlock() + i.mutex.RLock() callback := i.packetCallback i.mutex.RUnlock() @@ -106,15 +122,21 @@ func (i *BaseInterface) ProcessIncoming(data []byte) { if callback != nil { callback(data, i) } - - i.RxBytes += uint64(len(data)) } func (i *BaseInterface) ProcessOutgoing(data []byte) error { if !i.Online || i.Detached { + log.Printf("[DEBUG-1] Interface %s: Cannot process outgoing packet - interface offline or detached", + i.Name) return fmt.Errorf("interface offline or detached") } + + i.mutex.Lock() i.TxBytes += uint64(len(data)) + i.mutex.Unlock() + + log.Printf("[DEBUG-%d] Interface %s: Processed outgoing packet of %d bytes, total TX: %d", + DEBUG_LEVEL, i.Name, len(data), i.TxBytes) return nil } @@ -163,8 +185,13 @@ func (i *BaseInterface) IsEnabled() bool { func (i *BaseInterface) Enable() { i.mutex.Lock() defer i.mutex.Unlock() + + prevState := i.Enabled i.Enabled = true i.Online = true + + log.Printf("[DEBUG-%d] Interface %s: State changed - Enabled: %v->%v, Online: %v->%v", + DEBUG_INFO, i.Name, prevState, i.Enabled, !i.Online, i.Online) } func (i *BaseInterface) Disable() { @@ -172,6 +199,7 @@ func (i *BaseInterface) Disable() { defer i.mutex.Unlock() i.Enabled = false i.Online = false + log.Printf("[DEBUG-2] Interface %s: Disabled and offline", i.Name) } func (i *BaseInterface) GetName() string { @@ -211,7 +239,18 @@ func (i *BaseInterface) Stop() error { } func (i *BaseInterface) Send(data []byte, address string) error { - return i.ProcessOutgoing(data) + log.Printf("[DEBUG-%d] Interface %s: Sending %d bytes to %s", + DEBUG_LEVEL, i.Name, len(data), address) + + err := i.ProcessOutgoing(data) + if err != nil { + log.Printf("[DEBUG-1] Interface %s: Failed to send data: %v", + i.Name, err) + return err + } + + i.updateBandwidthStats(uint64(len(data))) + return nil } func (i *BaseInterface) GetConn() net.Conn { @@ -222,18 +261,26 @@ func (i *BaseInterface) GetBandwidthAvailable() bool { i.mutex.RLock() defer i.mutex.RUnlock() - // If no transmission in last second, bandwidth is available - if time.Since(i.lastTx) > time.Second { + now := time.Now() + timeSinceLastTx := now.Sub(i.lastTx) + + // If no recent transmission, bandwidth is available + if timeSinceLastTx > time.Second { + log.Printf("[DEBUG-%d] Interface %s: Bandwidth available (idle for %.2fs)", + DEBUG_VERBOSE, i.Name, timeSinceLastTx.Seconds()) return true } - // Calculate current bandwidth usage - bytesPerSec := float64(i.TxBytes) / time.Since(i.lastTx).Seconds() - currentUsage := bytesPerSec * 8 // Convert to bits/sec - - // Check if usage is below threshold + // Calculate current usage over the last second + bytesPerSec := float64(i.TxBytes) / timeSinceLastTx.Seconds() + currentUsage := bytesPerSec * 8 // Convert to bits maxUsage := float64(i.Bitrate) * PROPAGATION_RATE - return currentUsage < maxUsage + + available := currentUsage < maxUsage + log.Printf("[DEBUG-%d] Interface %s: Bandwidth stats - Current: %.2f bps, Max: %.2f bps, Usage: %.1f%%, Available: %v", + DEBUG_VERBOSE, i.Name, currentUsage, maxUsage, (currentUsage/maxUsage)*100, available) + + return available } func (i *BaseInterface) updateBandwidthStats(bytes uint64) { @@ -242,4 +289,7 @@ func (i *BaseInterface) updateBandwidthStats(bytes uint64) { i.TxBytes += bytes i.lastTx = time.Now() + + log.Printf("[DEBUG-%d] Interface %s: Updated bandwidth stats - TX bytes: %d, Last TX: %v", + DEBUG_LEVEL, i.Name, i.TxBytes, i.lastTx) } diff --git a/pkg/interfaces/tcp.go b/pkg/interfaces/tcp.go index a04f1a1..4e87cae 100644 --- a/pkg/interfaces/tcp.go +++ b/pkg/interfaces/tcp.go @@ -2,6 +2,7 @@ package interfaces import ( "fmt" + "log" "net" "sync" "syscall" @@ -47,6 +48,11 @@ type TCPClientInterface struct { packetType byte mutex sync.RWMutex enabled bool + TxBytes uint64 + RxBytes uint64 + startTime time.Time + lastTx time.Time + lastRx time.Time } func NewTCPClientInterface(name string, targetHost string, targetPort int, kissFraming bool, i2pTunneled bool, enabled bool) (*TCPClientInterface, error) { @@ -120,6 +126,11 @@ func (tc *TCPClientInterface) readLoop() { return } + // Update RX bytes + tc.mutex.Lock() + tc.RxBytes += uint64(n) + tc.mutex.Unlock() + for i := 0; i < n; i++ { b := buffer[i] @@ -178,8 +189,15 @@ func (tc *TCPClientInterface) handlePacket(data []byte) { tc.mutex.Lock() tc.packetType = data[0] + tc.RxBytes += uint64(len(data)) + lastRx := time.Now() + tc.lastRx = lastRx tc.mutex.Unlock() + log.Printf("[DEBUG-5] Interface %s RX: %d bytes, total: %d, rate: %.2f Kbps", + tc.GetName(), len(data), tc.RxBytes, + float64(tc.RxBytes*8)/(time.Since(tc.startTime).Seconds()*1000)) + payload := data[1:] switch tc.packetType { @@ -219,13 +237,18 @@ func (tc *TCPClientInterface) ProcessOutgoing(data []byte) error { frame = append(frame, HDLC_FLAG) } - if _, err := tc.conn.Write(frame); err != nil { - tc.teardown() - return fmt.Errorf("write failed: %v", err) - } + tc.mutex.Lock() + tc.TxBytes += uint64(len(frame)) + lastTx := time.Now() + tc.lastTx = lastTx + tc.mutex.Unlock() - tc.BaseInterface.ProcessOutgoing(data) - return nil + log.Printf("[DEBUG-5] Interface %s TX: %d bytes, total: %d, rate: %.2f Kbps", + tc.GetName(), len(frame), tc.TxBytes, + float64(tc.TxBytes*8)/(time.Since(tc.startTime).Seconds()*1000)) + + _, err := tc.conn.Write(frame) + return err } func (tc *TCPClientInterface) teardown() { @@ -415,6 +438,42 @@ func (tc *TCPClientInterface) GetRTT() time.Duration { return 0 } +func (tc *TCPClientInterface) GetTxBytes() uint64 { + tc.mutex.RLock() + defer tc.mutex.RUnlock() + return tc.TxBytes +} + +func (tc *TCPClientInterface) GetRxBytes() uint64 { + tc.mutex.RLock() + defer tc.mutex.RUnlock() + return tc.RxBytes +} + +func (tc *TCPClientInterface) UpdateStats(bytes uint64, isRx bool) { + tc.mutex.Lock() + defer tc.mutex.Unlock() + + now := time.Now() + if isRx { + tc.RxBytes += bytes + tc.lastRx = now + log.Printf("[DEBUG-5] Interface %s RX stats: bytes=%d total=%d last=%v", + tc.Name, bytes, tc.RxBytes, tc.lastRx) + } else { + tc.TxBytes += bytes + tc.lastTx = now + log.Printf("[DEBUG-5] Interface %s TX stats: bytes=%d total=%d last=%v", + tc.Name, bytes, tc.TxBytes, tc.lastTx) + } +} + +func (tc *TCPClientInterface) GetStats() (tx uint64, rx uint64, lastTx time.Time, lastRx time.Time) { + tc.mutex.RLock() + defer tc.mutex.RUnlock() + return tc.TxBytes, tc.RxBytes, tc.lastTx, tc.lastRx +} + type TCPServerInterface struct { BaseInterface connections map[string]net.Conn @@ -425,6 +484,8 @@ type TCPServerInterface struct { kissFraming bool i2pTunneled bool packetCallback common.PacketCallback + TxBytes uint64 + RxBytes uint64 } func NewTCPServerInterface(name string, bindAddr string, bindPort int, kissFraming bool, i2pTunneled bool, preferIPv6 bool) (*TCPServerInterface, error) { @@ -521,3 +582,15 @@ func (ts *TCPServerInterface) Stop() error { ts.Online = false return nil } + +func (ts *TCPServerInterface) GetTxBytes() uint64 { + ts.mutex.RLock() + defer ts.mutex.RUnlock() + return ts.TxBytes +} + +func (ts *TCPServerInterface) GetRxBytes() uint64 { + ts.mutex.RLock() + defer ts.mutex.RUnlock() + return ts.RxBytes +} diff --git a/pkg/transport/transport.go b/pkg/transport/transport.go index d5ee7e0..7ec9170 100644 --- a/pkg/transport/transport.go +++ b/pkg/transport/transport.go @@ -13,6 +13,7 @@ import ( "github.com/Sudo-Ivan/reticulum-go/pkg/announce" "github.com/Sudo-Ivan/reticulum-go/pkg/common" + "github.com/Sudo-Ivan/reticulum-go/pkg/interfaces" "github.com/Sudo-Ivan/reticulum-go/pkg/packet" "github.com/Sudo-Ivan/reticulum-go/pkg/pathfinder" "github.com/Sudo-Ivan/reticulum-go/pkg/rate" @@ -535,61 +536,75 @@ func (t *Transport) HandlePacket(data []byte, iface common.NetworkInterface) { } packetType := data[0] + log.Printf("[DEBUG-4] Transport handling packet type 0x%02x from interface %s, size: %d bytes", + packetType, iface.GetName(), len(data)) + + // Update interface stats before processing + if tcpIface, ok := iface.(*interfaces.TCPClientInterface); ok { + tcpIface.UpdateStats(uint64(len(data)), true) // true for RX + } + switch packetType { - case 0x01: // Path Request - t.handlePathRequest(data[1:], iface) - case 0x02: // Link Packet - t.handleLinkPacket(data[1:], iface) - case 0x03: // Path Response - t.handlePathResponse(data[1:], iface) - case 0x04: // Announce + case 0x01: // Announce packet t.handleAnnouncePacket(data[1:], iface) + case 0x02: // Link packet + t.handleLinkPacket(data[1:], iface) + case 0x03: // Path response + t.handlePathResponse(data[1:], iface) + case 0x04: // Transport packet + t.handleTransportPacket(data[1:], iface) + default: + log.Printf("[DEBUG-3] Unknown packet type 0x%02x from %s", packetType, iface.GetName()) } } -func (t *Transport) handlePathRequest(data []byte, iface common.NetworkInterface) { - if len(data) < 33 { // 32 bytes hash + 1 byte TTL minimum +func (t *Transport) handleAnnouncePacket(data []byte, iface common.NetworkInterface) { + announceHash := sha256.Sum256(data) + log.Printf("[DEBUG-3] Processing announce %x from interface %s", + announceHash[:8], iface.GetName()) + + if t.seenAnnounces[string(announceHash[:])] { + log.Printf("[DEBUG-4] Ignoring duplicate announce %x", announceHash[:8]) return } - destHash := data[:32] - ttl := data[32] - var tag []byte - recursive := false + // Record this announce + t.seenAnnounces[string(announceHash[:])] = true - if len(data) > 33 { - tag = data[33 : len(data)-1] - recursive = data[len(data)-1] == 0x01 + // Extract announce fields + if len(data) < 53 { // Minimum size for announce packet + return } - // Check if we have a path to the destination - if t.HasPath(destHash) { - // Create and send path response - hops := t.HopsTo(destHash) - nextHop := t.NextHop(destHash) + // Don't forward if max hops reached + if data[0] >= MAX_HOPS { + return + } - response := make([]byte, 0, 64) - response = append(response, 0x03) // Path Response type - response = append(response, destHash...) - response = append(response, byte(hops)) - response = append(response, nextHop...) - if len(tag) > 0 { - response = append(response, tag...) - } + // Add random delay before retransmission (0-2 seconds) + delay := time.Duration(rand.Float64() * 2 * float64(time.Second)) + time.Sleep(delay) - iface.Send(response, "") - } else if recursive && ttl > 0 { - // Forward path request to other interfaces - newData := make([]byte, len(data)) - copy(newData, data) - newData[32] = ttl - 1 // Decrease TTL + // Check bandwidth allocation for announces + if !t.announceRate.Allow() { + return + } - for name, otherIface := range t.interfaces { - if name != iface.GetName() && otherIface.IsEnabled() { - otherIface.Send(newData, "") - } + // Increment hop count and retransmit + data[0]++ + t.broadcastAnnouncePacket(data) +} + +func (t *Transport) broadcastAnnouncePacket(data []byte) error { + t.mutex.RLock() + defer t.mutex.RUnlock() + + for _, iface := range t.interfaces { + if err := iface.Send(data, ""); err != nil { + return fmt.Errorf("failed to broadcast announce: %w", err) } } + return nil } func (t *Transport) handleLinkPacket(data []byte, iface common.NetworkInterface) { @@ -647,58 +662,8 @@ func (t *Transport) handlePathResponse(data []byte, iface common.NetworkInterfac } } -func (t *Transport) handleAnnouncePacket(data []byte, iface common.NetworkInterface) { - if len(data) < 32 { - return - } - - p := &packet.Packet{ - Data: data, - Header: [2]byte{ - 0x04, // Announce packet type - 0x00, // Initial hop count - }, - } - - announceHash := sha256.Sum256(data) - if t.seenAnnounces[string(announceHash[:])] { - return - } - - // Record this announce - t.seenAnnounces[string(announceHash[:])] = true - - // Process the announce - if err := t.handleAnnounce(p); err != nil { - log.Printf("Error handling announce: %v", err) - return - } - - // Broadcast to other interfaces based on interface mode - t.mutex.RLock() - for name, otherIface := range t.interfaces { - // Skip the interface we received from - if name == iface.GetName() { - continue - } - - // Check interface modes for propagation rules - srcMode := iface.GetMode() - dstMode := otherIface.GetMode() - - // Skip propagation based on interface modes - if srcMode == common.IF_MODE_ACCESS_POINT && dstMode != common.IF_MODE_FULL { - continue - } - if srcMode == common.IF_MODE_ROAMING && dstMode == common.IF_MODE_ACCESS_POINT { - continue - } - - if err := otherIface.Send(p.Data, ""); err != nil { - log.Printf("Error broadcasting announce to %s: %v", name, err) - } - } - t.mutex.RUnlock() +func (t *Transport) handleTransportPacket(data []byte, iface common.NetworkInterface) { + // Handle transport packet } func (t *Transport) findLink(dest []byte) *Link { @@ -900,49 +865,3 @@ func (l *Link) GetStatus() int { defer l.mutex.RUnlock() return l.status } - -func (t *Transport) handleAnnounce(p *packet.Packet) error { - // Skip if we've seen this announce before - announceHash := sha256.Sum256(p.Data) - if t.seenAnnounces[string(announceHash[:])] { - return nil - } - - // Record this announce - t.seenAnnounces[string(announceHash[:])] = true - - // Extract announce fields - if len(p.Data) < 53 { // Minimum size for announce packet - return errors.New("invalid announce packet size") - } - - // Don't forward if max hops reached - if p.Header[1] >= MAX_HOPS { - return nil - } - - // Add random delay before retransmission (0-2 seconds) - delay := time.Duration(rand.Float64() * 2 * float64(time.Second)) - time.Sleep(delay) - - // Check bandwidth allocation for announces - if !t.announceRate.Allow() { - return nil - } - - // Increment hop count and retransmit - p.Header[1]++ - return t.broadcastAnnouncePacket(p) -} - -func (t *Transport) broadcastAnnouncePacket(p *packet.Packet) error { - t.mutex.RLock() - defer t.mutex.RUnlock() - - for _, iface := range t.interfaces { - if err := iface.Send(p.Data, ""); err != nil { - return fmt.Errorf("failed to broadcast announce: %w", err) - } - } - return nil -}