From ae40d2879c02d8b94ef2b8aaa3b04bb5b6d6d651 Mon Sep 17 00:00:00 2001 From: Sudo-Ivan Date: Wed, 1 Jan 2025 00:58:37 -0600 Subject: [PATCH] update and format --- cmd/reticulum-go/main.go | 27 ++++++++++++++---- pkg/announce/announce.go | 48 +++++++++++++++---------------- pkg/buffer/buffer.go | 2 +- pkg/channel/channel.go | 10 +++---- pkg/common/interfaces.go | 39 ++++++++++++++++++++------ pkg/interfaces/interface.go | 46 ++++++++++++++++++++++++------ pkg/packet/packet.go | 16 +++++------ pkg/rate/rate.go | 56 ++++++++++++++++++------------------- 8 files changed, 154 insertions(+), 90 deletions(-) diff --git a/cmd/reticulum-go/main.go b/cmd/reticulum-go/main.go index 695cce9..0a49339 100644 --- a/cmd/reticulum-go/main.go +++ b/cmd/reticulum-go/main.go @@ -15,10 +15,10 @@ import ( "github.com/Sudo-Ivan/reticulum-go/pkg/buffer" "github.com/Sudo-Ivan/reticulum-go/pkg/channel" "github.com/Sudo-Ivan/reticulum-go/pkg/common" + "github.com/Sudo-Ivan/reticulum-go/pkg/identity" "github.com/Sudo-Ivan/reticulum-go/pkg/interfaces" "github.com/Sudo-Ivan/reticulum-go/pkg/packet" "github.com/Sudo-Ivan/reticulum-go/pkg/transport" - "github.com/Sudo-Ivan/reticulum-go/pkg/identity" ) var ( @@ -177,14 +177,24 @@ func main() { announce, err := announce.NewAnnounce( dest, []byte("Reticulum-Go"), // App data - nil, // No ratchet ID - false, // Not a path response + nil, // No ratchet ID + false, // Not a path response ) if err != nil { log.Fatalf("Failed to create announce: %v", err) } - // Propagate announce to all interfaces periodically + // Send initial announce immediately + for _, iface := range r.interfaces { + if netIface, ok := iface.(common.NetworkInterface); ok { + debugLog(2, "Sending initial announce on interface %s", netIface.GetName()) + if err := announce.Propagate([]common.NetworkInterface{netIface}); err != nil { + debugLog(1, "Failed to propagate initial announce: %v", err) + } + } + } + + // Then start periodic announces go func() { ticker := time.NewTicker(5 * time.Minute) defer ticker.Stop() @@ -391,11 +401,14 @@ func (r *Reticulum) Stop() error { } func (r *Reticulum) handleAnnounce(data []byte, iface common.NetworkInterface) { + debugLog(2, "Received announce packet on interface %s (%d bytes)", iface.GetName(), len(data)) + a := &announce.Announce{} if err := a.HandleAnnounce(data); err != nil { debugLog(1, "Error handling announce: %v", err) return } + debugLog(3, "Successfully parsed announce packet") // Add random delay before propagation (0-2 seconds) delay := time.Duration(rand.Float64() * 2 * float64(time.Second)) @@ -421,8 +434,10 @@ func (r *Reticulum) handleAnnounce(data []byte, iface common.NetworkInterface) { } // Check if interface has bandwidth available for announces - if err := a.Propagate([]common.NetworkInterface{otherIface}); err != nil { - debugLog(1, "Error propagating announce: %v", err) + if netIface, ok := otherIface.(common.NetworkInterface); ok { + if err := a.Propagate([]common.NetworkInterface{netIface}); err != nil { + debugLog(1, "Error propagating announce: %v", err) + } } } } diff --git a/pkg/announce/announce.go b/pkg/announce/announce.go index 6a8a7fb..8d27082 100644 --- a/pkg/announce/announce.go +++ b/pkg/announce/announce.go @@ -5,7 +5,7 @@ import ( "crypto/sha256" "encoding/binary" "errors" - "log" + "fmt" "sync" "time" @@ -74,14 +74,14 @@ func New(dest *identity.Identity, appData []byte, pathResponse bool) (*Announce, } a := &Announce{ - mutex: &sync.RWMutex{}, - identity: dest, - appData: appData, - hops: 0, - timestamp: time.Now().Unix(), - pathResponse: pathResponse, - retries: 0, - handlers: make([]AnnounceHandler, 0), + mutex: &sync.RWMutex{}, + identity: dest, + appData: appData, + hops: 0, + timestamp: time.Now().Unix(), + pathResponse: pathResponse, + retries: 0, + handlers: make([]AnnounceHandler, 0), } // Generate truncated hash from public key @@ -106,27 +106,25 @@ func New(dest *identity.Identity, appData []byte, pathResponse bool) (*Announce, } func (a *Announce) Propagate(interfaces []common.NetworkInterface) error { - packet := a.CreatePacket() + a.mutex.RLock() + defer a.mutex.RUnlock() - // Enhanced logging - log.Printf("Creating announce packet:") - log.Printf(" Destination Hash: %x", a.destinationHash) - log.Printf(" Identity Public Key: %x", a.identity.GetPublicKey()) - log.Printf(" App Data: %s", string(a.appData)) - log.Printf(" Signature: %x", a.signature) - log.Printf(" Total Packet Size: %d bytes", len(packet)) - log.Printf(" Raw Packet: %x", packet) + // Use cached packet if available, otherwise create new one + var packet []byte + if a.packet != nil { + packet = a.packet + } else { + packet = a.CreatePacket() + a.packet = packet + } - // Propagate to interfaces for _, iface := range interfaces { - log.Printf("Propagating on interface %s:", iface.GetName()) - log.Printf(" Interface Type: %d", iface.GetType()) - log.Printf(" MTU: %d bytes", iface.GetMTU()) + if !iface.IsEnabled() || !iface.GetBandwidthAvailable() { + continue + } if err := iface.Send(packet, ""); err != nil { - log.Printf(" Failed to propagate: %v", err) - } else { - log.Printf(" Successfully propagated") + return fmt.Errorf("failed to propagate on interface %s: %w", iface.GetName(), err) } } diff --git a/pkg/buffer/buffer.go b/pkg/buffer/buffer.go index 86ccbc2..cfe9bb4 100644 --- a/pkg/buffer/buffer.go +++ b/pkg/buffer/buffer.go @@ -74,7 +74,7 @@ func NewRawChannelReader(streamID int, ch *channel.Channel) *RawChannelReader { buffer: bytes.NewBuffer(nil), callbacks: make([]func(int), 0), } - + ch.AddMessageHandler(reader.HandleMessage) return reader } diff --git a/pkg/channel/channel.go b/pkg/channel/channel.go index 8c1a1a1..c83924e 100644 --- a/pkg/channel/channel.go +++ b/pkg/channel/channel.go @@ -84,11 +84,11 @@ func NewChannel(link transport.LinkInterface) *Channel { return &Channel{ link: link, messageHandlers: make([]func(MessageBase) bool, 0), - mutex: sync.RWMutex{}, - windowMax: WindowMaxSlow, - windowMin: WindowMinSlow, - window: WindowInitial, - maxTries: 3, + mutex: sync.RWMutex{}, + windowMax: WindowMaxSlow, + windowMin: WindowMinSlow, + window: WindowInitial, + maxTries: 3, } } diff --git a/pkg/common/interfaces.go b/pkg/common/interfaces.go index 22a3d9f..8c2f519 100644 --- a/pkg/common/interfaces.go +++ b/pkg/common/interfaces.go @@ -1,10 +1,10 @@ package common import ( + "encoding/binary" "net" "sync" "time" - "encoding/binary" ) // NetworkInterface defines the interface for all network communication methods @@ -15,20 +15,21 @@ type NetworkInterface interface { Enable() Disable() Detach() - + // Network operations Send(data []byte, address string) error GetConn() net.Conn GetMTU() int GetName() string - + // Interface properties GetType() InterfaceType GetMode() InterfaceMode IsEnabled() bool IsOnline() bool IsDetached() bool - + GetBandwidthAvailable() bool + // Packet handling ProcessIncoming([]byte) ProcessOutgoing([]byte) error @@ -55,6 +56,7 @@ type BaseInterface struct { TxBytes uint64 RxBytes uint64 + lastTx time.Time Mutex sync.RWMutex Owner interface{} @@ -70,6 +72,7 @@ func NewBaseInterface(name string, ifaceType InterfaceType, enabled bool) BaseIn Enabled: enabled, MTU: DEFAULT_MTU, Bitrate: BITRATE_MINIMUM, + lastTx: time.Now(), } } @@ -175,16 +178,34 @@ func (i *BaseInterface) SendPathRequest(data []byte) error { func (i *BaseInterface) SendLinkPacket(dest []byte, data []byte, timestamp time.Time) error { // Create link packet packet := make([]byte, 0, len(dest)+len(data)+9) // 1 byte type + dest + 8 byte timestamp - packet = append(packet, 0x02) // Link packet type + packet = append(packet, 0x02) // Link packet type packet = append(packet, dest...) - + // Add timestamp ts := make([]byte, 8) binary.BigEndian.PutUint64(ts, uint64(timestamp.Unix())) packet = append(packet, ts...) - + // Add data packet = append(packet, data...) - + return i.Send(packet, "") -} \ No newline at end of file +} + +func (i *BaseInterface) GetBandwidthAvailable() bool { + i.Mutex.RLock() + defer i.Mutex.RUnlock() + + // If no transmission in last second, bandwidth is available + if time.Since(i.lastTx) > time.Second { + return true + } + + // Calculate current bandwidth usage + bytesPerSec := float64(i.TxBytes) / time.Since(i.lastTx).Seconds() + currentUsage := bytesPerSec * 8 // Convert to bits/sec + + // Check if usage is below threshold (2% of total bitrate) + maxUsage := float64(i.Bitrate) * 0.02 // 2% propagation rate + return currentUsage < maxUsage +} diff --git a/pkg/interfaces/interface.go b/pkg/interfaces/interface.go index f58193d..590ba4e 100644 --- a/pkg/interfaces/interface.go +++ b/pkg/interfaces/interface.go @@ -11,25 +11,27 @@ import ( ) const ( - BITRATE_MINIMUM = 5 // Minimum required bitrate in bits/sec + BITRATE_MINIMUM = 1200 // Minimum bitrate in bits/second MODE_FULL = 0x01 - + // Interface modes MODE_GATEWAY = 0x02 MODE_ACCESS_POINT = 0x03 - MODE_ROAMING = 0x04 - MODE_BOUNDARY = 0x05 + MODE_ROAMING = 0x04 + MODE_BOUNDARY = 0x05 // Interface types TYPE_UDP = 0x01 TYPE_TCP = 0x02 + + PROPAGATION_RATE = 0.02 // 2% of interface bandwidth ) type Interface interface { GetName() string GetType() common.InterfaceType GetMode() common.InterfaceMode - IsOnline() bool + IsOnline() bool IsDetached() bool IsEnabled() bool Detach() @@ -46,6 +48,7 @@ type Interface interface { Stop() error GetMTU() int GetConn() net.Conn + common.NetworkInterface } type BaseInterface struct { @@ -61,7 +64,8 @@ type BaseInterface struct { Bitrate int64 TxBytes uint64 RxBytes uint64 - + lastTx time.Time + mutex sync.RWMutex packetCallback common.PacketCallback } @@ -78,6 +82,7 @@ func NewBaseInterface(name string, ifType common.InterfaceType, enabled bool) Ba OUT: false, MTU: common.DEFAULT_MTU, Bitrate: BITRATE_MINIMUM, + lastTx: time.Now(), } } @@ -150,7 +155,7 @@ func (i *BaseInterface) Detach() { } func (i *BaseInterface) IsEnabled() bool { - i.mutex.RLock() + i.mutex.RLock() defer i.mutex.RUnlock() return i.Enabled && i.Online && !i.Detached } @@ -197,7 +202,6 @@ func (i *BaseInterface) IsDetached() bool { return i.Detached } -// Default implementations that should be overridden by specific interfaces func (i *BaseInterface) Start() error { return nil } @@ -213,3 +217,29 @@ func (i *BaseInterface) Send(data []byte, address string) error { func (i *BaseInterface) GetConn() net.Conn { return nil } + +func (i *BaseInterface) GetBandwidthAvailable() bool { + i.mutex.RLock() + defer i.mutex.RUnlock() + + // If no transmission in last second, bandwidth is available + if time.Since(i.lastTx) > time.Second { + return true + } + + // Calculate current bandwidth usage + bytesPerSec := float64(i.TxBytes) / time.Since(i.lastTx).Seconds() + currentUsage := bytesPerSec * 8 // Convert to bits/sec + + // Check if usage is below threshold + maxUsage := float64(i.Bitrate) * PROPAGATION_RATE + return currentUsage < maxUsage +} + +func (i *BaseInterface) updateBandwidthStats(bytes uint64) { + i.mutex.Lock() + defer i.mutex.Unlock() + + i.TxBytes += bytes + i.lastTx = time.Now() +} diff --git a/pkg/packet/packet.go b/pkg/packet/packet.go index d43af17..1eb561d 100644 --- a/pkg/packet/packet.go +++ b/pkg/packet/packet.go @@ -105,12 +105,12 @@ func (p *Packet) Serialize() ([]byte, error) { } type AnnouncePacket struct { - Header [2]byte - DestHash []byte - PublicKey []byte - AppData []byte - RandomBlob []byte - Signature []byte - HopCount byte - Timestamp time.Time + Header [2]byte + DestHash []byte + PublicKey []byte + AppData []byte + RandomBlob []byte + Signature []byte + HopCount byte + Timestamp time.Time } diff --git a/pkg/rate/rate.go b/pkg/rate/rate.go index 0a75bea..169a40e 100644 --- a/pkg/rate/rate.go +++ b/pkg/rate/rate.go @@ -1,44 +1,44 @@ package rate import ( - "sync" - "time" + "sync" + "time" ) type Limiter struct { - rate float64 - interval time.Duration - lastUpdate time.Time - allowance float64 - mutex sync.Mutex + rate float64 + interval time.Duration + lastUpdate time.Time + allowance float64 + mutex sync.Mutex } func NewLimiter(rate float64, interval time.Duration) *Limiter { - return &Limiter{ - rate: rate, - interval: interval, - lastUpdate: time.Now(), - allowance: rate, - } + return &Limiter{ + rate: rate, + interval: interval, + lastUpdate: time.Now(), + allowance: rate, + } } func (l *Limiter) Allow() bool { - l.mutex.Lock() - defer l.mutex.Unlock() + l.mutex.Lock() + defer l.mutex.Unlock() - now := time.Now() - elapsed := now.Sub(l.lastUpdate) - l.lastUpdate = now + now := time.Now() + elapsed := now.Sub(l.lastUpdate) + l.lastUpdate = now - l.allowance += elapsed.Seconds() * l.rate - if l.allowance > l.rate { - l.allowance = l.rate - } + l.allowance += elapsed.Seconds() * l.rate + if l.allowance > l.rate { + l.allowance = l.rate + } - if l.allowance < 1.0 { - return false - } + if l.allowance < 1.0 { + return false + } - l.allowance -= 1.0 - return true -} \ No newline at end of file + l.allowance -= 1.0 + return true +}