This commit is contained in:
Sudo-Ivan
2025-01-01 17:00:11 -06:00
parent 6cdc02346f
commit 0862830431
9 changed files with 673 additions and 125 deletions

View File

@@ -56,6 +56,7 @@ type Reticulum struct {
pathRequests map[string]*common.PathRequest
announceHistory map[string]announceRecord
announceHistoryMu sync.RWMutex
identity *identity.Identity
}
type announceRecord struct {
@@ -78,6 +79,12 @@ func NewReticulum(cfg *common.ReticulumConfig) (*Reticulum, error) {
t := transport.NewTransport(cfg)
debugLog(3, "Transport initialized")
identity, err := identity.NewIdentity()
if err != nil {
return nil, fmt.Errorf("failed to create identity: %v", err)
}
debugLog(2, "Created new identity: %x", identity.Hash())
r := &Reticulum{
config: cfg,
transport: t,
@@ -87,6 +94,7 @@ func NewReticulum(cfg *common.ReticulumConfig) (*Reticulum, error) {
announceHandlers: make(map[string][]announce.AnnounceHandler),
pathRequests: make(map[string]*common.PathRequest),
announceHistory: make(map[string]announceRecord),
identity: identity,
}
// Initialize interfaces from config
@@ -240,20 +248,12 @@ func main() {
log.Fatalf("Failed to create Reticulum instance: %v", err)
}
// Create identity and destination
identity, err := identity.NewIdentity()
if err != nil {
log.Fatalf("Failed to create identity: %v", err)
}
debugLog(2, "Created new identity: %x", identity.Hash())
// Create announce
// Create announce using r.identity
announce, err := announce.NewAnnounce(
identity,
[]byte("nomadnetwork.node"),
nil, // No ratchet ID
false, // Not a path response
r.identity,
[]byte("HELLO WORLD"),
nil,
false,
)
if err != nil {
log.Fatalf("Failed to create announce: %v", err)
@@ -386,19 +386,12 @@ func initializeDirectories() error {
func (r *Reticulum) Start() error {
debugLog(2, "Starting Reticulum...")
// Create identity for announces
identity, err := identity.NewIdentity()
if err != nil {
return fmt.Errorf("failed to create identity: %v", err)
}
debugLog(2, "Created new identity: %x", identity.Hash())
// Create announce
// Create announce using r.identity
announce, err := announce.NewAnnounce(
identity,
r.identity,
[]byte("Reticulum-Go"),
nil, // No ratchet ID
false, // Not a path response
nil,
false,
)
if err != nil {
return fmt.Errorf("failed to create announce: %v", err)

View File

@@ -287,26 +287,25 @@ func Remember(packet []byte, destHash []byte, publicKey []byte, appData []byte)
func ValidateAnnounce(packet []byte, destHash []byte, publicKey []byte, signature []byte, appData []byte) bool {
if len(publicKey) != KEYSIZE/8 {
log.Printf("[DEBUG-7] Invalid public key length: %d", len(publicKey))
return false
}
log.Printf("[DEBUG-7] Validating announce for destination hash: %x", destHash)
announced := &Identity{}
announced.publicKey = publicKey[:KEYSIZE/16]
announced.verificationKey = publicKey[KEYSIZE/16:]
// Split public key into encryption and verification keys
announced := &Identity{
publicKey: publicKey[:KEYSIZE/16],
verificationKey: publicKey[KEYSIZE/16:],
}
// Verify signature
signedData := append(destHash, publicKey...)
signedData = append(signedData, appData...)
if !announced.Verify(signedData, signature) {
log.Printf("[DEBUG-7] Signature verification failed")
return false
}
// Store in known destinations
Remember(packet, destHash, publicKey, appData)
log.Printf("[DEBUG-7] Announce validated and remembered successfully")
return true
}
@@ -315,14 +314,13 @@ func FromPublicKey(publicKey []byte) *Identity {
return nil
}
i := &Identity{
return &Identity{
publicKey: publicKey[:KEYSIZE/16],
verificationKey: publicKey[KEYSIZE/16:],
ratchets: make(map[string][]byte),
ratchetExpiry: make(map[string]int64),
mutex: &sync.RWMutex{},
}
return i
}
func (i *Identity) Hex() string {
@@ -831,3 +829,16 @@ func (i *Identity) CleanupExpiredRatchets() {
log.Printf("[DEBUG-7] Cleaned up %d expired ratchets, %d remaining", cleaned, len(i.ratchets))
}
// ValidateAnnounce validates an announce packet's signature
func (i *Identity) ValidateAnnounce(data []byte, destHash []byte, appData []byte) bool {
if i == nil || len(data) < ed25519.SignatureSize {
return false
}
signatureStart := len(data) - ed25519.SignatureSize
signature := data[signatureStart:]
signedData := append(destHash, appData...)
return ed25519.Verify(i.verificationKey, signedData, signature)
}

View File

@@ -29,14 +29,14 @@ const (
DEBUG_LEVEL = 4 // Default debug level for interface logging
// Add more debug levels
DEBUG_CRITICAL = 1 // Critical errors
DEBUG_ERROR = 2 // Non-critical errors
DEBUG_INFO = 3 // Important information
DEBUG_VERBOSE = 4 // Detailed information
DEBUG_TRACE = 5 // Very detailed tracing
DEBUG_PACKETS = 6 // Packet-level details
DEBUG_ALL = 7 // Everything
// Debug levels
DEBUG_CRITICAL = 1
DEBUG_ERROR = 2
DEBUG_INFO = 3
DEBUG_VERBOSE = 4
DEBUG_TRACE = 5
DEBUG_PACKETS = 6
DEBUG_ALL = 7
)
type Interface interface {
@@ -60,6 +60,7 @@ type Interface interface {
Stop() error
GetMTU() int
GetConn() net.Conn
GetBandwidthAvailable() bool
common.NetworkInterface
}
@@ -126,8 +127,7 @@ func (i *BaseInterface) ProcessIncoming(data []byte) {
func (i *BaseInterface) ProcessOutgoing(data []byte) error {
if !i.Online || i.Detached {
log.Printf("[DEBUG-1] Interface %s: Cannot process outgoing packet - interface offline or detached",
i.Name)
log.Printf("[DEBUG-1] Interface %s: Cannot process outgoing packet - interface offline or detached", i.Name)
return fmt.Errorf("interface offline or detached")
}
@@ -135,8 +135,7 @@ func (i *BaseInterface) ProcessOutgoing(data []byte) error {
i.TxBytes += uint64(len(data))
i.mutex.Unlock()
log.Printf("[DEBUG-%d] Interface %s: Processed outgoing packet of %d bytes, total TX: %d",
DEBUG_LEVEL, i.Name, len(data), i.TxBytes)
log.Printf("[DEBUG-%d] Interface %s: Processed outgoing packet of %d bytes, total TX: %d", DEBUG_LEVEL, i.Name, len(data), i.TxBytes)
return nil
}
@@ -146,7 +145,7 @@ func (i *BaseInterface) SendPathRequest(packet []byte) error {
}
frame := make([]byte, 0, len(packet)+1)
frame = append(frame, 0x01) // Path request type
frame = append(frame, 0x01)
frame = append(frame, packet...)
return i.ProcessOutgoing(frame)
@@ -158,7 +157,7 @@ func (i *BaseInterface) SendLinkPacket(dest []byte, data []byte, timestamp time.
}
frame := make([]byte, 0, len(dest)+len(data)+9)
frame = append(frame, 0x02) // Link packet type
frame = append(frame, 0x02)
frame = append(frame, dest...)
ts := make([]byte, 8)
@@ -190,8 +189,7 @@ func (i *BaseInterface) Enable() {
i.Enabled = true
i.Online = true
log.Printf("[DEBUG-%d] Interface %s: State changed - Enabled: %v->%v, Online: %v->%v",
DEBUG_INFO, i.Name, prevState, i.Enabled, !i.Online, i.Online)
log.Printf("[DEBUG-%d] Interface %s: State changed - Enabled: %v->%v, Online: %v->%v", DEBUG_INFO, i.Name, prevState, i.Enabled, !i.Online, i.Online)
}
func (i *BaseInterface) Disable() {
@@ -239,13 +237,11 @@ func (i *BaseInterface) Stop() error {
}
func (i *BaseInterface) Send(data []byte, address string) error {
log.Printf("[DEBUG-%d] Interface %s: Sending %d bytes to %s",
DEBUG_LEVEL, i.Name, len(data), address)
log.Printf("[DEBUG-%d] Interface %s: Sending %d bytes to %s", DEBUG_LEVEL, i.Name, len(data), address)
err := i.ProcessOutgoing(data)
if err != nil {
log.Printf("[DEBUG-1] Interface %s: Failed to send data: %v",
i.Name, err)
log.Printf("[DEBUG-1] Interface %s: Failed to send data: %v", i.Name, err)
return err
}
@@ -264,21 +260,17 @@ func (i *BaseInterface) GetBandwidthAvailable() bool {
now := time.Now()
timeSinceLastTx := now.Sub(i.lastTx)
// If no recent transmission, bandwidth is available
if timeSinceLastTx > time.Second {
log.Printf("[DEBUG-%d] Interface %s: Bandwidth available (idle for %.2fs)",
DEBUG_VERBOSE, i.Name, timeSinceLastTx.Seconds())
log.Printf("[DEBUG-%d] Interface %s: Bandwidth available (idle for %.2fs)", DEBUG_VERBOSE, i.Name, timeSinceLastTx.Seconds())
return true
}
// Calculate current usage over the last second
bytesPerSec := float64(i.TxBytes) / timeSinceLastTx.Seconds()
currentUsage := bytesPerSec * 8 // Convert to bits
currentUsage := bytesPerSec * 8
maxUsage := float64(i.Bitrate) * PROPAGATION_RATE
available := currentUsage < maxUsage
log.Printf("[DEBUG-%d] Interface %s: Bandwidth stats - Current: %.2f bps, Max: %.2f bps, Usage: %.1f%%, Available: %v",
DEBUG_VERBOSE, i.Name, currentUsage, maxUsage, (currentUsage/maxUsage)*100, available)
log.Printf("[DEBUG-%d] Interface %s: Bandwidth stats - Current: %.2f bps, Max: %.2f bps, Usage: %.1f%%, Available: %v", DEBUG_VERBOSE, i.Name, currentUsage, maxUsage, (currentUsage/maxUsage)*100, available)
return available
}
@@ -290,6 +282,5 @@ func (i *BaseInterface) updateBandwidthStats(bytes uint64) {
i.TxBytes += bytes
i.lastTx = time.Now()
log.Printf("[DEBUG-%d] Interface %s: Updated bandwidth stats - TX bytes: %d, Last TX: %v",
DEBUG_LEVEL, i.Name, i.TxBytes, i.lastTx)
log.Printf("[DEBUG-%d] Interface %s: Updated bandwidth stats - TX bytes: %d, Last TX: %v", DEBUG_LEVEL, i.Name, i.TxBytes, i.lastTx)
}

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"log"
"net"
"runtime"
"sync"
"syscall"
"time"
@@ -50,7 +51,6 @@ type TCPClientInterface struct {
enabled bool
TxBytes uint64
RxBytes uint64
startTime time.Time
lastTx time.Time
lastRx time.Time
}
@@ -103,6 +103,19 @@ func (tc *TCPClientInterface) Start() error {
return err
}
tc.conn = conn
// Set platform-specific timeouts
switch runtime.GOOS {
case "linux":
if err := tc.setTimeoutsLinux(); err != nil {
log.Printf("[DEBUG-2] Failed to set Linux TCP timeouts: %v", err)
}
case "darwin":
if err := tc.setTimeoutsOSX(); err != nil {
log.Printf("[DEBUG-2] Failed to set OSX TCP timeouts: %v", err)
}
}
tc.Online = true
go tc.readLoop()
return nil
@@ -126,31 +139,31 @@ func (tc *TCPClientInterface) readLoop() {
return
}
// Update RX bytes
tc.mutex.Lock()
tc.RxBytes += uint64(n)
tc.mutex.Unlock()
// Update RX bytes for raw received data
tc.UpdateStats(uint64(n), true)
for i := 0; i < n; i++ {
b := buffer[i]
if tc.kissFraming {
// KISS framing logic
if inFrame && b == KISS_FEND {
inFrame = false
tc.handlePacket(dataBuffer)
dataBuffer = dataBuffer[:0]
} else if b == KISS_FEND {
inFrame = true
} else if inFrame {
if b == KISS_FEND {
if inFrame && len(dataBuffer) > 0 {
tc.handlePacket(dataBuffer)
dataBuffer = dataBuffer[:0]
}
inFrame = !inFrame
continue
}
if inFrame {
if b == KISS_FESC {
escape = true
} else {
if escape {
if b == KISS_TFEND {
b = KISS_FEND
}
if b == KISS_TFESC {
} else if b == KISS_TFESC {
b = KISS_FESC
}
escape = false
@@ -160,13 +173,16 @@ func (tc *TCPClientInterface) readLoop() {
}
} else {
// HDLC framing logic
if inFrame && b == HDLC_FLAG {
inFrame = false
tc.handlePacket(dataBuffer)
dataBuffer = dataBuffer[:0]
} else if b == HDLC_FLAG {
inFrame = true
} else if inFrame {
if b == HDLC_FLAG {
if inFrame && len(dataBuffer) > 0 {
tc.handlePacket(dataBuffer)
dataBuffer = dataBuffer[:0]
}
inFrame = !inFrame
continue
}
if inFrame {
if b == HDLC_ESC {
escape = true
} else {
@@ -241,15 +257,8 @@ func (tc *TCPClientInterface) ProcessOutgoing(data []byte) error {
frame = append(frame, HDLC_FLAG)
}
tc.mutex.Lock()
tc.TxBytes += uint64(len(frame))
lastTx := time.Now()
tc.lastTx = lastTx
tc.mutex.Unlock()
log.Printf("[DEBUG-5] Interface %s TX: %d bytes, total: %d, rate: %.2f Kbps",
tc.GetName(), len(frame), tc.TxBytes,
float64(tc.TxBytes*8)/(time.Since(tc.startTime).Seconds()*1000))
// Update TX stats before sending
tc.UpdateStats(uint64(len(frame)), false)
_, err := tc.conn.Write(frame)
return err
@@ -478,6 +487,37 @@ func (tc *TCPClientInterface) GetStats() (tx uint64, rx uint64, lastTx time.Time
return tc.TxBytes, tc.RxBytes, tc.lastTx, tc.lastRx
}
func (tc *TCPClientInterface) setTimeoutsLinux() error {
tcpConn, ok := tc.conn.(*net.TCPConn)
if !ok {
return fmt.Errorf("not a TCP connection")
}
if !tc.i2pTunneled {
if err := tcpConn.SetKeepAlive(true); err != nil {
return err
}
if err := tcpConn.SetKeepAlivePeriod(time.Duration(TCP_PROBE_INTERVAL) * time.Second); err != nil {
return err
}
}
return nil
}
func (tc *TCPClientInterface) setTimeoutsOSX() error {
tcpConn, ok := tc.conn.(*net.TCPConn)
if !ok {
return fmt.Errorf("not a TCP connection")
}
if err := tcpConn.SetKeepAlive(true); err != nil {
return err
}
return nil
}
type TCPServerInterface struct {
BaseInterface
connections map[string]net.Conn
@@ -575,7 +615,31 @@ func (ts *TCPServerInterface) Start() error {
ts.mutex.Lock()
defer ts.mutex.Unlock()
addr := fmt.Sprintf("%s:%d", ts.bindAddr, ts.bindPort)
listener, err := net.Listen("tcp", addr)
if err != nil {
return fmt.Errorf("failed to start TCP server: %w", err)
}
ts.Online = true
// Accept connections in a goroutine
go func() {
for {
conn, err := listener.Accept()
if err != nil {
if !ts.Online {
return // Normal shutdown
}
log.Printf("[DEBUG-2] Error accepting connection: %v", err)
continue
}
// Handle each connection in a separate goroutine
go ts.handleConnection(conn)
}
}()
return nil
}
@@ -598,3 +662,62 @@ func (ts *TCPServerInterface) GetRxBytes() uint64 {
defer ts.mutex.RUnlock()
return ts.RxBytes
}
func (ts *TCPServerInterface) handleConnection(conn net.Conn) {
addr := conn.RemoteAddr().String()
ts.mutex.Lock()
ts.connections[addr] = conn
ts.mutex.Unlock()
defer func() {
ts.mutex.Lock()
delete(ts.connections, addr)
ts.mutex.Unlock()
conn.Close()
}()
buffer := make([]byte, ts.MTU)
for {
n, err := conn.Read(buffer)
if err != nil {
return
}
ts.mutex.Lock()
ts.RxBytes += uint64(n)
ts.mutex.Unlock()
if ts.packetCallback != nil {
ts.packetCallback(buffer[:n], ts)
}
}
}
func (ts *TCPServerInterface) ProcessOutgoing(data []byte) error {
ts.mutex.RLock()
defer ts.mutex.RUnlock()
if !ts.Online {
return fmt.Errorf("interface offline")
}
var frame []byte
if ts.kissFraming {
frame = append([]byte{KISS_FEND}, escapeKISS(data)...)
frame = append(frame, KISS_FEND)
} else {
frame = append([]byte{HDLC_FLAG}, escapeHDLC(data)...)
frame = append(frame, HDLC_FLAG)
}
ts.TxBytes += uint64(len(frame))
for _, conn := range ts.connections {
if _, err := conn.Write(frame); err != nil {
log.Printf("[DEBUG-4] Error writing to connection %s: %v",
conn.RemoteAddr(), err)
}
}
return nil
}

View File

@@ -43,6 +43,9 @@ const (
PROVE_NONE = 0x00
PROVE_ALL = 0x01
PROVE_APP = 0x02
WATCHDOG_MIN_SLEEP = 0.025
WATCHDOG_INTERVAL = 0.1
)
type Link struct {
@@ -82,6 +85,13 @@ type Link struct {
proofStrategy byte
proofCallback func(*packet.Packet) bool
trackPhyStats bool
watchdogLock bool
watchdogActive bool
establishmentTimeout time.Duration
keepalive time.Duration
staleTime time.Duration
initiator bool
}
func NewLink(dest *destination.Destination, transport *transport.Transport, establishedCallback func(*Link), closedCallback func(*Link)) *Link {
@@ -97,6 +107,13 @@ func NewLink(dest *destination.Destination, transport *transport.Transport, esta
lastDataReceived: time.Time{},
lastDataSent: time.Time{},
pathFinder: pathfinder.NewPathFinder(),
watchdogLock: false,
watchdogActive: false,
establishmentTimeout: time.Duration(ESTABLISHMENT_TIMEOUT_PER_HOP * float64(time.Second)),
keepalive: time.Duration(KEEPALIVE * float64(time.Second)),
staleTime: time.Duration(STALE_TIME * float64(time.Second)),
initiator: false,
}
}
@@ -803,3 +820,51 @@ func min(a, b int) int {
}
return b
}
func (l *Link) startWatchdog() {
if l.watchdogActive {
return
}
l.watchdogActive = true
go l.watchdog()
}
func (l *Link) watchdog() {
for l.status != STATUS_CLOSED {
l.mutex.Lock()
if l.watchdogLock {
l.mutex.Unlock()
time.Sleep(time.Duration(WATCHDOG_MIN_SLEEP * float64(time.Second)))
continue
}
var sleepTime float64 = WATCHDOG_INTERVAL
switch l.status {
case STATUS_ACTIVE:
lastActivity := l.lastInbound
if l.lastOutbound.After(lastActivity) {
lastActivity = l.lastOutbound
}
if time.Since(lastActivity) > l.keepalive {
if l.initiator {
l.SendPacket([]byte{}) // Keepalive packet
}
if time.Since(lastActivity) > l.staleTime {
l.status = STATUS_CLOSED
l.teardownReason = STATUS_FAILED
if l.closedCallback != nil {
l.closedCallback(l)
}
}
}
}
l.mutex.Unlock()
time.Sleep(time.Duration(sleepTime * float64(time.Second)))
}
l.watchdogActive = false
}

View File

@@ -5,6 +5,8 @@ import (
"errors"
"fmt"
"time"
"github.com/Sudo-Ivan/reticulum-go/pkg/identity"
)
const (
@@ -206,3 +208,29 @@ func (p *Packet) Serialize() ([]byte, error) {
return p.Raw, nil
}
func NewAnnouncePacket(destHash []byte, identity *identity.Identity, appData []byte, transportID []byte) (*Packet, error) {
// Create combined public key
pubKey := identity.GetPublicKey()
// Create signed data
signedData := append(destHash, pubKey...)
signedData = append(signedData, appData...)
// Sign the data
signature := identity.Sign(signedData)
// Combine all data
data := append(pubKey, appData...)
data = append(data, signature...)
p := &Packet{
HeaderType: HeaderType2,
PacketType: PacketTypeAnnounce,
TransportID: transportID,
DestinationHash: destHash,
Data: data,
}
return p, nil
}

View File

@@ -5,6 +5,18 @@ import (
"time"
)
const (
DefaultAnnounceRateTarget = 3600.0 // Default 1 hour between announces
DefaultAnnounceRateGrace = 3 // Default number of grace announces
DefaultAnnounceRatePenalty = 7200.0 // Default 2 hour penalty
DefaultBurstFreqNew = 3.5 // Default announces/sec for new interfaces
DefaultBurstFreq = 12.0 // Default announces/sec for established interfaces
DefaultBurstHold = 60 // Default seconds to hold after burst
DefaultBurstPenalty = 300 // Default seconds penalty after burst
DefaultMaxHeldAnnounces = 256 // Default max announces in hold queue
DefaultHeldReleaseInterval = 30 // Default seconds between releasing held announces
)
type Limiter struct {
rate float64
interval time.Duration
@@ -42,3 +54,140 @@ func (l *Limiter) Allow() bool {
l.allowance -= 1.0
return true
}
// AnnounceRateControl handles per-destination announce rate limiting
type AnnounceRateControl struct {
rateTarget float64
rateGrace int
ratePenalty float64
announceHistory map[string][]time.Time // Maps dest hash to announce times
mutex sync.RWMutex
}
func NewAnnounceRateControl(target float64, grace int, penalty float64) *AnnounceRateControl {
return &AnnounceRateControl{
rateTarget: target,
rateGrace: grace,
ratePenalty: penalty,
announceHistory: make(map[string][]time.Time),
}
}
func (arc *AnnounceRateControl) AllowAnnounce(destHash string) bool {
arc.mutex.Lock()
defer arc.mutex.Unlock()
history := arc.announceHistory[destHash]
now := time.Now()
// Cleanup old history entries
cutoff := now.Add(-24 * time.Hour)
newHistory := []time.Time{}
for _, t := range history {
if t.After(cutoff) {
newHistory = append(newHistory, t)
}
}
history = newHistory
// Allow if within grace period
if len(history) < arc.rateGrace {
arc.announceHistory[destHash] = append(history, now)
return true
}
// Check rate
lastAnnounce := history[len(history)-1]
waitTime := arc.rateTarget
if len(history) > arc.rateGrace {
waitTime += arc.ratePenalty
}
if now.Sub(lastAnnounce).Seconds() < waitTime {
return false
}
arc.announceHistory[destHash] = append(history, now)
return true
}
// IngressControl handles new destination announce rate limiting
type IngressControl struct {
enabled bool
burstFreqNew float64
burstFreq float64
burstHold time.Duration
burstPenalty time.Duration
maxHeldAnnounces int
heldReleaseInterval time.Duration
heldAnnounces map[string][]byte // Maps announce hash to announce data
lastBurst time.Time
announceCount int
mutex sync.RWMutex
}
func NewIngressControl(enabled bool) *IngressControl {
return &IngressControl{
enabled: enabled,
burstFreqNew: DefaultBurstFreqNew,
burstFreq: DefaultBurstFreq,
burstHold: time.Duration(DefaultBurstHold) * time.Second,
burstPenalty: time.Duration(DefaultBurstPenalty) * time.Second,
maxHeldAnnounces: DefaultMaxHeldAnnounces,
heldReleaseInterval: time.Duration(DefaultHeldReleaseInterval) * time.Second,
heldAnnounces: make(map[string][]byte),
lastBurst: time.Now(),
}
}
func (ic *IngressControl) ProcessAnnounce(announceHash string, announceData []byte, isNewDest bool) bool {
if !ic.enabled {
return true
}
ic.mutex.Lock()
defer ic.mutex.Unlock()
now := time.Now()
elapsed := now.Sub(ic.lastBurst)
// Reset counter if enough time has passed
if elapsed > ic.burstHold+ic.burstPenalty {
ic.announceCount = 0
ic.lastBurst = now
}
// Check burst frequency
maxFreq := ic.burstFreq
if isNewDest {
maxFreq = ic.burstFreqNew
}
ic.announceCount++
burstFreq := float64(ic.announceCount) / elapsed.Seconds()
// Hold announce if burst frequency exceeded
if burstFreq > maxFreq {
if len(ic.heldAnnounces) < ic.maxHeldAnnounces {
ic.heldAnnounces[announceHash] = announceData
}
return false
}
return true
}
func (ic *IngressControl) ReleaseHeldAnnounce() (string, []byte, bool) {
ic.mutex.Lock()
defer ic.mutex.Unlock()
// Return first held announce if any exist
for hash, data := range ic.heldAnnounces {
delete(ic.heldAnnounces, hash)
return hash, data, true
}
return "", nil, false
}

170
pkg/transport/announce.go Normal file
View File

@@ -0,0 +1,170 @@
package transport
import (
"crypto/sha256"
"encoding/hex"
"sort"
"sync"
"time"
"github.com/Sudo-Ivan/reticulum-go/pkg/rate"
)
const (
MaxRetries = 3
RetryInterval = 5 * time.Second
MaxQueueSize = 1000
MinPriorityDelta = 0.1
DefaultPropagationRate = 0.02 // 2% of bandwidth for announces
)
type AnnounceEntry struct {
Data []byte
HopCount int
RetryCount int
LastRetry time.Time
SourceIface string
Priority float64
Hash string
}
type AnnounceManager struct {
announces map[string]*AnnounceEntry
announceQueue map[string][]*AnnounceEntry
rateLimiter *rate.Limiter
mutex sync.RWMutex
}
func NewAnnounceManager() *AnnounceManager {
return &AnnounceManager{
announces: make(map[string]*AnnounceEntry),
announceQueue: make(map[string][]*AnnounceEntry),
rateLimiter: rate.NewLimiter(DefaultPropagationRate, 1),
mutex: sync.RWMutex{},
}
}
func (am *AnnounceManager) ProcessAnnounce(data []byte, sourceIface string) error {
hash := sha256.Sum256(data)
hashStr := hex.EncodeToString(hash[:])
am.mutex.Lock()
defer am.mutex.Unlock()
if entry, exists := am.announces[hashStr]; exists {
if entry.HopCount <= int(data[0]) {
return nil
}
entry.HopCount = int(data[0])
entry.Data = data
entry.RetryCount = 0
entry.LastRetry = time.Now()
entry.Priority = calculatePriority(data[0], 0)
return nil
}
entry := &AnnounceEntry{
Data: data,
HopCount: int(data[0]),
RetryCount: 0,
LastRetry: time.Now(),
SourceIface: sourceIface,
Priority: calculatePriority(data[0], 0),
Hash: hashStr,
}
am.announces[hashStr] = entry
for iface := range am.announceQueue {
if iface != sourceIface {
am.queueAnnounce(entry, iface)
}
}
return nil
}
func (am *AnnounceManager) queueAnnounce(entry *AnnounceEntry, iface string) {
queue := am.announceQueue[iface]
if len(queue) >= MaxQueueSize {
// Remove lowest priority announce if queue is full
queue = queue[:len(queue)-1]
}
insertIdx := sort.Search(len(queue), func(i int) bool {
return queue[i].Priority < entry.Priority
})
queue = append(queue[:insertIdx], append([]*AnnounceEntry{entry}, queue[insertIdx:]...)...)
am.announceQueue[iface] = queue
}
func (am *AnnounceManager) GetNextAnnounce(iface string) *AnnounceEntry {
am.mutex.Lock()
defer am.mutex.Unlock()
queue := am.announceQueue[iface]
if len(queue) == 0 {
return nil
}
entry := queue[0]
now := time.Now()
if entry.RetryCount >= MaxRetries {
am.announceQueue[iface] = queue[1:]
delete(am.announces, entry.Hash)
return am.GetNextAnnounce(iface)
}
if now.Sub(entry.LastRetry) < RetryInterval {
return nil
}
if !am.rateLimiter.Allow() {
return nil
}
entry.RetryCount++
entry.LastRetry = now
entry.Priority = calculatePriority(byte(entry.HopCount), entry.RetryCount)
am.announceQueue[iface] = queue[1:]
am.queueAnnounce(entry, iface)
return entry
}
func calculatePriority(hopCount byte, retryCount int) float64 {
basePriority := 1.0 / float64(hopCount)
retryPenalty := float64(retryCount) * MinPriorityDelta
return basePriority - retryPenalty
}
func (am *AnnounceManager) CleanupExpired() {
am.mutex.Lock()
defer am.mutex.Unlock()
now := time.Now()
expiredHashes := make([]string, 0)
for hash, entry := range am.announces {
if entry.RetryCount >= MaxRetries || now.Sub(entry.LastRetry) > RetryInterval*MaxRetries {
expiredHashes = append(expiredHashes, hash)
}
}
for _, hash := range expiredHashes {
delete(am.announces, hash)
for iface, queue := range am.announceQueue {
newQueue := make([]*AnnounceEntry, 0, len(queue))
for _, entry := range queue {
if entry.Hash != hash {
newQueue = append(newQueue, entry)
}
}
am.announceQueue[iface] = newQueue
}
}
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/Sudo-Ivan/reticulum-go/pkg/announce"
"github.com/Sudo-Ivan/reticulum-go/pkg/common"
"github.com/Sudo-Ivan/reticulum-go/pkg/identity"
"github.com/Sudo-Ivan/reticulum-go/pkg/interfaces"
"github.com/Sudo-Ivan/reticulum-go/pkg/packet"
"github.com/Sudo-Ivan/reticulum-go/pkg/pathfinder"
@@ -616,65 +617,82 @@ func (t *Transport) HandlePacket(data []byte, iface common.NetworkInterface) {
}
}
func (t *Transport) handleAnnouncePacket(data []byte, iface common.NetworkInterface) {
// Validate minimum packet size (1 byte hop count + 32 bytes dest + 16 bytes identity + 4 bytes min app data)
if len(data) < 53 {
log.Printf("[DEBUG-3] Announce packet too small: %d bytes", len(data))
return
func (t *Transport) handleAnnouncePacket(data []byte, iface common.NetworkInterface) error {
// Validate minimum packet size (1 byte hop count + 32 bytes dest hash + 16 bytes min identity + 1 byte min app data)
if len(data) < 50 {
return fmt.Errorf("announce packet too small: %d bytes", len(data))
}
announceHash := sha256.Sum256(data)
log.Printf("[DEBUG-3] Processing announce %x from interface %s",
announceHash[:8], iface.GetName())
t.mutex.Lock()
if _, seen := t.seenAnnounces[string(announceHash[:])]; seen {
t.mutex.Unlock()
log.Printf("[DEBUG-4] Ignoring duplicate announce %x", announceHash[:8])
return
}
t.seenAnnounces[string(announceHash[:])] = true
t.mutex.Unlock()
// Don't forward if max hops reached
if data[0] >= MAX_HOPS {
log.Printf("[DEBUG-3] Announce exceeded max hops: %d", data[0])
return
}
// Parse announce fields
// Extract fields
hopCount := data[0]
destHash := data[1:33]
identity := data[33:49]
identityBytes := data[33:49]
appData := data[49:]
// Check for duplicate announces
announceHash := sha256.Sum256(data)
hashStr := string(announceHash[:])
t.mutex.Lock()
if _, seen := t.seenAnnounces[hashStr]; seen {
t.mutex.Unlock()
log.Printf("[DEBUG-7] Ignoring duplicate announce %x", announceHash[:8])
return nil
}
t.seenAnnounces[hashStr] = true
t.mutex.Unlock()
// Validate announce signature and store destination
id := identity.FromPublicKey(identityBytes)
if id == nil || !id.ValidateAnnounce(data, destHash, appData) {
return fmt.Errorf("invalid announce signature")
}
// Don't forward if max hops reached
if hopCount >= MAX_HOPS {
log.Printf("[DEBUG-7] Announce exceeded max hops: %d", hopCount)
return nil
}
// Add random delay before retransmission (0-2 seconds)
delay := time.Duration(rand.Float64() * 2 * float64(time.Second))
time.Sleep(delay)
// Check bandwidth allocation for announces
if !t.announceRate.Allow() {
log.Printf("[DEBUG-3] Announce rate limit exceeded, dropping")
return
log.Printf("[DEBUG-7] Announce rate limit exceeded, dropping")
return nil
}
// Increment hop count for forwarding
data[0] = hopCount + 1
forwardData := make([]byte, len(data))
copy(forwardData, data)
forwardData[0] = hopCount + 1
// Forward to other interfaces
var lastErr error
for name, outIface := range t.interfaces {
if outIface == iface || !outIface.IsEnabled() {
continue
}
// Check interface mode restrictions
// if outIface.GetMode() == interfaces.ModeAccessPoint {
// log.Printf("[DEBUG-7] Blocking announce broadcast on %s due to AP mode", name)
// continue
// }
log.Printf("[DEBUG-7] Forwarding announce on interface %s", name)
if err := outIface.Send(data, ""); err != nil {
if err := outIface.Send(forwardData, ""); err != nil {
log.Printf("[DEBUG-3] Failed to forward announce on %s: %v", name, err)
lastErr = err
}
}
// Notify announce handlers
t.notifyAnnounceHandlers(destHash, identity, appData)
t.notifyAnnounceHandlers(destHash, identityBytes, appData)
return lastErr
}
func (t *Transport) handleLinkPacket(data []byte, iface common.NetworkInterface) {