This commit is contained in:
Sudo-Ivan
2025-01-01 01:41:16 -06:00
parent ae40d2879c
commit 73af84e24f
5 changed files with 455 additions and 262 deletions

View File

@@ -66,6 +66,7 @@ type Announce struct {
handlers []AnnounceHandler
ratchetID []byte
packet []byte
hash []byte
}
func New(dest *identity.Identity, appData []byte, pathResponse bool) (*Announce, error) {
@@ -388,5 +389,24 @@ func NewAnnounce(identity *identity.Identity, appData []byte, ratchetID []byte,
a.packet = packet
// Generate hash
a.Hash()
return a, nil
}
func (a *Announce) Hash() []byte {
if a.hash == nil {
// Generate hash from announce data
h := sha256.New()
h.Write(a.destinationHash)
h.Write(a.identity.GetPublicKey())
h.Write([]byte{a.hops})
h.Write(a.appData)
if a.ratchetID != nil {
h.Write(a.ratchetID)
}
a.hash = h.Sum(nil)
}
return a.hash
}

View File

@@ -3,6 +3,7 @@ package interfaces
import (
"encoding/binary"
"fmt"
"log"
"net"
"sync"
"time"
@@ -25,6 +26,17 @@ const (
TYPE_TCP = 0x02
PROPAGATION_RATE = 0.02 // 2% of interface bandwidth
DEBUG_LEVEL = 4 // Default debug level for interface logging
// Add more debug levels
DEBUG_CRITICAL = 1 // Critical errors
DEBUG_ERROR = 2 // Non-critical errors
DEBUG_INFO = 3 // Important information
DEBUG_VERBOSE = 4 // Detailed information
DEBUG_TRACE = 5 // Very detailed tracing
DEBUG_PACKETS = 6 // Packet-level details
DEBUG_ALL = 7 // Everything
)
type Interface interface {
@@ -99,6 +111,10 @@ func (i *BaseInterface) GetPacketCallback() common.PacketCallback {
}
func (i *BaseInterface) ProcessIncoming(data []byte) {
i.mutex.Lock()
i.RxBytes += uint64(len(data))
i.mutex.Unlock()
i.mutex.RLock()
callback := i.packetCallback
i.mutex.RUnlock()
@@ -106,15 +122,21 @@ func (i *BaseInterface) ProcessIncoming(data []byte) {
if callback != nil {
callback(data, i)
}
i.RxBytes += uint64(len(data))
}
func (i *BaseInterface) ProcessOutgoing(data []byte) error {
if !i.Online || i.Detached {
log.Printf("[DEBUG-1] Interface %s: Cannot process outgoing packet - interface offline or detached",
i.Name)
return fmt.Errorf("interface offline or detached")
}
i.mutex.Lock()
i.TxBytes += uint64(len(data))
i.mutex.Unlock()
log.Printf("[DEBUG-%d] Interface %s: Processed outgoing packet of %d bytes, total TX: %d",
DEBUG_LEVEL, i.Name, len(data), i.TxBytes)
return nil
}
@@ -163,8 +185,13 @@ func (i *BaseInterface) IsEnabled() bool {
func (i *BaseInterface) Enable() {
i.mutex.Lock()
defer i.mutex.Unlock()
prevState := i.Enabled
i.Enabled = true
i.Online = true
log.Printf("[DEBUG-%d] Interface %s: State changed - Enabled: %v->%v, Online: %v->%v",
DEBUG_INFO, i.Name, prevState, i.Enabled, !i.Online, i.Online)
}
func (i *BaseInterface) Disable() {
@@ -172,6 +199,7 @@ func (i *BaseInterface) Disable() {
defer i.mutex.Unlock()
i.Enabled = false
i.Online = false
log.Printf("[DEBUG-2] Interface %s: Disabled and offline", i.Name)
}
func (i *BaseInterface) GetName() string {
@@ -211,7 +239,18 @@ func (i *BaseInterface) Stop() error {
}
func (i *BaseInterface) Send(data []byte, address string) error {
return i.ProcessOutgoing(data)
log.Printf("[DEBUG-%d] Interface %s: Sending %d bytes to %s",
DEBUG_LEVEL, i.Name, len(data), address)
err := i.ProcessOutgoing(data)
if err != nil {
log.Printf("[DEBUG-1] Interface %s: Failed to send data: %v",
i.Name, err)
return err
}
i.updateBandwidthStats(uint64(len(data)))
return nil
}
func (i *BaseInterface) GetConn() net.Conn {
@@ -222,18 +261,26 @@ func (i *BaseInterface) GetBandwidthAvailable() bool {
i.mutex.RLock()
defer i.mutex.RUnlock()
// If no transmission in last second, bandwidth is available
if time.Since(i.lastTx) > time.Second {
now := time.Now()
timeSinceLastTx := now.Sub(i.lastTx)
// If no recent transmission, bandwidth is available
if timeSinceLastTx > time.Second {
log.Printf("[DEBUG-%d] Interface %s: Bandwidth available (idle for %.2fs)",
DEBUG_VERBOSE, i.Name, timeSinceLastTx.Seconds())
return true
}
// Calculate current bandwidth usage
bytesPerSec := float64(i.TxBytes) / time.Since(i.lastTx).Seconds()
currentUsage := bytesPerSec * 8 // Convert to bits/sec
// Check if usage is below threshold
// Calculate current usage over the last second
bytesPerSec := float64(i.TxBytes) / timeSinceLastTx.Seconds()
currentUsage := bytesPerSec * 8 // Convert to bits
maxUsage := float64(i.Bitrate) * PROPAGATION_RATE
return currentUsage < maxUsage
available := currentUsage < maxUsage
log.Printf("[DEBUG-%d] Interface %s: Bandwidth stats - Current: %.2f bps, Max: %.2f bps, Usage: %.1f%%, Available: %v",
DEBUG_VERBOSE, i.Name, currentUsage, maxUsage, (currentUsage/maxUsage)*100, available)
return available
}
func (i *BaseInterface) updateBandwidthStats(bytes uint64) {
@@ -242,4 +289,7 @@ func (i *BaseInterface) updateBandwidthStats(bytes uint64) {
i.TxBytes += bytes
i.lastTx = time.Now()
log.Printf("[DEBUG-%d] Interface %s: Updated bandwidth stats - TX bytes: %d, Last TX: %v",
DEBUG_LEVEL, i.Name, i.TxBytes, i.lastTx)
}

View File

@@ -2,6 +2,7 @@ package interfaces
import (
"fmt"
"log"
"net"
"sync"
"syscall"
@@ -47,6 +48,11 @@ type TCPClientInterface struct {
packetType byte
mutex sync.RWMutex
enabled bool
TxBytes uint64
RxBytes uint64
startTime time.Time
lastTx time.Time
lastRx time.Time
}
func NewTCPClientInterface(name string, targetHost string, targetPort int, kissFraming bool, i2pTunneled bool, enabled bool) (*TCPClientInterface, error) {
@@ -120,6 +126,11 @@ func (tc *TCPClientInterface) readLoop() {
return
}
// Update RX bytes
tc.mutex.Lock()
tc.RxBytes += uint64(n)
tc.mutex.Unlock()
for i := 0; i < n; i++ {
b := buffer[i]
@@ -178,8 +189,15 @@ func (tc *TCPClientInterface) handlePacket(data []byte) {
tc.mutex.Lock()
tc.packetType = data[0]
tc.RxBytes += uint64(len(data))
lastRx := time.Now()
tc.lastRx = lastRx
tc.mutex.Unlock()
log.Printf("[DEBUG-5] Interface %s RX: %d bytes, total: %d, rate: %.2f Kbps",
tc.GetName(), len(data), tc.RxBytes,
float64(tc.RxBytes*8)/(time.Since(tc.startTime).Seconds()*1000))
payload := data[1:]
switch tc.packetType {
@@ -219,13 +237,18 @@ func (tc *TCPClientInterface) ProcessOutgoing(data []byte) error {
frame = append(frame, HDLC_FLAG)
}
if _, err := tc.conn.Write(frame); err != nil {
tc.teardown()
return fmt.Errorf("write failed: %v", err)
}
tc.mutex.Lock()
tc.TxBytes += uint64(len(frame))
lastTx := time.Now()
tc.lastTx = lastTx
tc.mutex.Unlock()
tc.BaseInterface.ProcessOutgoing(data)
return nil
log.Printf("[DEBUG-5] Interface %s TX: %d bytes, total: %d, rate: %.2f Kbps",
tc.GetName(), len(frame), tc.TxBytes,
float64(tc.TxBytes*8)/(time.Since(tc.startTime).Seconds()*1000))
_, err := tc.conn.Write(frame)
return err
}
func (tc *TCPClientInterface) teardown() {
@@ -415,6 +438,42 @@ func (tc *TCPClientInterface) GetRTT() time.Duration {
return 0
}
func (tc *TCPClientInterface) GetTxBytes() uint64 {
tc.mutex.RLock()
defer tc.mutex.RUnlock()
return tc.TxBytes
}
func (tc *TCPClientInterface) GetRxBytes() uint64 {
tc.mutex.RLock()
defer tc.mutex.RUnlock()
return tc.RxBytes
}
func (tc *TCPClientInterface) UpdateStats(bytes uint64, isRx bool) {
tc.mutex.Lock()
defer tc.mutex.Unlock()
now := time.Now()
if isRx {
tc.RxBytes += bytes
tc.lastRx = now
log.Printf("[DEBUG-5] Interface %s RX stats: bytes=%d total=%d last=%v",
tc.Name, bytes, tc.RxBytes, tc.lastRx)
} else {
tc.TxBytes += bytes
tc.lastTx = now
log.Printf("[DEBUG-5] Interface %s TX stats: bytes=%d total=%d last=%v",
tc.Name, bytes, tc.TxBytes, tc.lastTx)
}
}
func (tc *TCPClientInterface) GetStats() (tx uint64, rx uint64, lastTx time.Time, lastRx time.Time) {
tc.mutex.RLock()
defer tc.mutex.RUnlock()
return tc.TxBytes, tc.RxBytes, tc.lastTx, tc.lastRx
}
type TCPServerInterface struct {
BaseInterface
connections map[string]net.Conn
@@ -425,6 +484,8 @@ type TCPServerInterface struct {
kissFraming bool
i2pTunneled bool
packetCallback common.PacketCallback
TxBytes uint64
RxBytes uint64
}
func NewTCPServerInterface(name string, bindAddr string, bindPort int, kissFraming bool, i2pTunneled bool, preferIPv6 bool) (*TCPServerInterface, error) {
@@ -521,3 +582,15 @@ func (ts *TCPServerInterface) Stop() error {
ts.Online = false
return nil
}
func (ts *TCPServerInterface) GetTxBytes() uint64 {
ts.mutex.RLock()
defer ts.mutex.RUnlock()
return ts.TxBytes
}
func (ts *TCPServerInterface) GetRxBytes() uint64 {
ts.mutex.RLock()
defer ts.mutex.RUnlock()
return ts.RxBytes
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/Sudo-Ivan/reticulum-go/pkg/announce"
"github.com/Sudo-Ivan/reticulum-go/pkg/common"
"github.com/Sudo-Ivan/reticulum-go/pkg/interfaces"
"github.com/Sudo-Ivan/reticulum-go/pkg/packet"
"github.com/Sudo-Ivan/reticulum-go/pkg/pathfinder"
"github.com/Sudo-Ivan/reticulum-go/pkg/rate"
@@ -535,61 +536,75 @@ func (t *Transport) HandlePacket(data []byte, iface common.NetworkInterface) {
}
packetType := data[0]
log.Printf("[DEBUG-4] Transport handling packet type 0x%02x from interface %s, size: %d bytes",
packetType, iface.GetName(), len(data))
// Update interface stats before processing
if tcpIface, ok := iface.(*interfaces.TCPClientInterface); ok {
tcpIface.UpdateStats(uint64(len(data)), true) // true for RX
}
switch packetType {
case 0x01: // Path Request
t.handlePathRequest(data[1:], iface)
case 0x02: // Link Packet
t.handleLinkPacket(data[1:], iface)
case 0x03: // Path Response
t.handlePathResponse(data[1:], iface)
case 0x04: // Announce
case 0x01: // Announce packet
t.handleAnnouncePacket(data[1:], iface)
case 0x02: // Link packet
t.handleLinkPacket(data[1:], iface)
case 0x03: // Path response
t.handlePathResponse(data[1:], iface)
case 0x04: // Transport packet
t.handleTransportPacket(data[1:], iface)
default:
log.Printf("[DEBUG-3] Unknown packet type 0x%02x from %s", packetType, iface.GetName())
}
}
func (t *Transport) handlePathRequest(data []byte, iface common.NetworkInterface) {
if len(data) < 33 { // 32 bytes hash + 1 byte TTL minimum
func (t *Transport) handleAnnouncePacket(data []byte, iface common.NetworkInterface) {
announceHash := sha256.Sum256(data)
log.Printf("[DEBUG-3] Processing announce %x from interface %s",
announceHash[:8], iface.GetName())
if t.seenAnnounces[string(announceHash[:])] {
log.Printf("[DEBUG-4] Ignoring duplicate announce %x", announceHash[:8])
return
}
destHash := data[:32]
ttl := data[32]
var tag []byte
recursive := false
// Record this announce
t.seenAnnounces[string(announceHash[:])] = true
if len(data) > 33 {
tag = data[33 : len(data)-1]
recursive = data[len(data)-1] == 0x01
// Extract announce fields
if len(data) < 53 { // Minimum size for announce packet
return
}
// Check if we have a path to the destination
if t.HasPath(destHash) {
// Create and send path response
hops := t.HopsTo(destHash)
nextHop := t.NextHop(destHash)
// Don't forward if max hops reached
if data[0] >= MAX_HOPS {
return
}
response := make([]byte, 0, 64)
response = append(response, 0x03) // Path Response type
response = append(response, destHash...)
response = append(response, byte(hops))
response = append(response, nextHop...)
if len(tag) > 0 {
response = append(response, tag...)
}
// Add random delay before retransmission (0-2 seconds)
delay := time.Duration(rand.Float64() * 2 * float64(time.Second))
time.Sleep(delay)
iface.Send(response, "")
} else if recursive && ttl > 0 {
// Forward path request to other interfaces
newData := make([]byte, len(data))
copy(newData, data)
newData[32] = ttl - 1 // Decrease TTL
// Check bandwidth allocation for announces
if !t.announceRate.Allow() {
return
}
for name, otherIface := range t.interfaces {
if name != iface.GetName() && otherIface.IsEnabled() {
otherIface.Send(newData, "")
}
// Increment hop count and retransmit
data[0]++
t.broadcastAnnouncePacket(data)
}
func (t *Transport) broadcastAnnouncePacket(data []byte) error {
t.mutex.RLock()
defer t.mutex.RUnlock()
for _, iface := range t.interfaces {
if err := iface.Send(data, ""); err != nil {
return fmt.Errorf("failed to broadcast announce: %w", err)
}
}
return nil
}
func (t *Transport) handleLinkPacket(data []byte, iface common.NetworkInterface) {
@@ -647,58 +662,8 @@ func (t *Transport) handlePathResponse(data []byte, iface common.NetworkInterfac
}
}
func (t *Transport) handleAnnouncePacket(data []byte, iface common.NetworkInterface) {
if len(data) < 32 {
return
}
p := &packet.Packet{
Data: data,
Header: [2]byte{
0x04, // Announce packet type
0x00, // Initial hop count
},
}
announceHash := sha256.Sum256(data)
if t.seenAnnounces[string(announceHash[:])] {
return
}
// Record this announce
t.seenAnnounces[string(announceHash[:])] = true
// Process the announce
if err := t.handleAnnounce(p); err != nil {
log.Printf("Error handling announce: %v", err)
return
}
// Broadcast to other interfaces based on interface mode
t.mutex.RLock()
for name, otherIface := range t.interfaces {
// Skip the interface we received from
if name == iface.GetName() {
continue
}
// Check interface modes for propagation rules
srcMode := iface.GetMode()
dstMode := otherIface.GetMode()
// Skip propagation based on interface modes
if srcMode == common.IF_MODE_ACCESS_POINT && dstMode != common.IF_MODE_FULL {
continue
}
if srcMode == common.IF_MODE_ROAMING && dstMode == common.IF_MODE_ACCESS_POINT {
continue
}
if err := otherIface.Send(p.Data, ""); err != nil {
log.Printf("Error broadcasting announce to %s: %v", name, err)
}
}
t.mutex.RUnlock()
func (t *Transport) handleTransportPacket(data []byte, iface common.NetworkInterface) {
// Handle transport packet
}
func (t *Transport) findLink(dest []byte) *Link {
@@ -900,49 +865,3 @@ func (l *Link) GetStatus() int {
defer l.mutex.RUnlock()
return l.status
}
func (t *Transport) handleAnnounce(p *packet.Packet) error {
// Skip if we've seen this announce before
announceHash := sha256.Sum256(p.Data)
if t.seenAnnounces[string(announceHash[:])] {
return nil
}
// Record this announce
t.seenAnnounces[string(announceHash[:])] = true
// Extract announce fields
if len(p.Data) < 53 { // Minimum size for announce packet
return errors.New("invalid announce packet size")
}
// Don't forward if max hops reached
if p.Header[1] >= MAX_HOPS {
return nil
}
// Add random delay before retransmission (0-2 seconds)
delay := time.Duration(rand.Float64() * 2 * float64(time.Second))
time.Sleep(delay)
// Check bandwidth allocation for announces
if !t.announceRate.Allow() {
return nil
}
// Increment hop count and retransmit
p.Header[1]++
return t.broadcastAnnouncePacket(p)
}
func (t *Transport) broadcastAnnouncePacket(p *packet.Packet) error {
t.mutex.RLock()
defer t.mutex.RUnlock()
for _, iface := range t.interfaces {
if err := iface.Send(p.Data, ""); err != nil {
return fmt.Errorf("failed to broadcast announce: %w", err)
}
}
return nil
}