This commit is contained in:
Sudo-Ivan
2025-01-01 02:03:12 -06:00
parent 73af84e24f
commit 3ffd5b72a1
5 changed files with 259 additions and 69 deletions

View File

@@ -384,11 +384,69 @@ func (t *Transport) UpdatePath(destinationHash []byte, nextHop []byte, interface
}
}
func (t *Transport) HandleAnnounce(destinationHash []byte, identity []byte, appData []byte, announceHash []byte) {
t.mutex.RLock()
defer t.mutex.RUnlock()
func (t *Transport) HandleAnnounce(data []byte, sourceIface common.NetworkInterface) error {
if len(data) < 53 { // Minimum size for announce packet
return fmt.Errorf("announce packet too small: %d bytes", len(data))
}
t.notifyAnnounceHandlers(destinationHash, identity, appData)
log.Printf("[DEBUG-7] Transport handling announce of %d bytes from %s",
len(data), sourceIface.GetName())
// Parse announce fields according to RNS spec
destHash := data[1:33]
identity := data[33:49]
appData := data[49:]
// Generate announce hash to check for duplicates
announceHash := sha256.Sum256(data)
hashStr := string(announceHash[:])
t.mutex.Lock()
if _, seen := t.seenAnnounces[hashStr]; seen {
t.mutex.Unlock()
log.Printf("[DEBUG-7] Ignoring duplicate announce %x", announceHash[:8])
return nil
}
t.seenAnnounces[hashStr] = true
t.mutex.Unlock()
// Don't forward if max hops reached
if data[0] >= MAX_HOPS {
log.Printf("[DEBUG-7] Announce exceeded max hops: %d", data[0])
return nil
}
// Add random delay before retransmission (0-2 seconds)
delay := time.Duration(rand.Float64() * 2 * float64(time.Second))
time.Sleep(delay)
// Check bandwidth allocation for announces
if !t.announceRate.Allow() {
log.Printf("[DEBUG-7] Announce rate limit exceeded, queuing...")
return nil
}
// Increment hop count
data[0]++
// Broadcast to all other interfaces
var lastErr error
for name, iface := range t.interfaces {
if iface == sourceIface || !iface.IsEnabled() {
continue
}
log.Printf("[DEBUG-7] Forwarding announce on interface %s", name)
if err := iface.Send(data, ""); err != nil {
log.Printf("[DEBUG-7] Failed to forward announce on %s: %v", name, err)
lastErr = err
}
}
// Notify handlers
t.notifyAnnounceHandlers(destHash, identity, appData)
return lastErr
}
func (t *Transport) NewDestination(identity interface{}, direction int, destType int, appName string, aspects ...string) *Destination {
@@ -541,13 +599,13 @@ func (t *Transport) HandlePacket(data []byte, iface common.NetworkInterface) {
// Update interface stats before processing
if tcpIface, ok := iface.(*interfaces.TCPClientInterface); ok {
tcpIface.UpdateStats(uint64(len(data)), true) // true for RX
tcpIface.UpdateStats(uint64(len(data)), true)
}
switch packetType {
case 0x01: // Announce packet
case announce.PACKET_TYPE_ANNOUNCE:
t.handleAnnouncePacket(data[1:], iface)
case 0x02: // Link packet
case announce.PACKET_TYPE_LINK:
t.handleLinkPacket(data[1:], iface)
case 0x03: // Path response
t.handlePathResponse(data[1:], iface)
@@ -559,52 +617,64 @@ func (t *Transport) HandlePacket(data []byte, iface common.NetworkInterface) {
}
func (t *Transport) handleAnnouncePacket(data []byte, iface common.NetworkInterface) {
// Validate minimum packet size (1 byte hop count + 32 bytes dest + 16 bytes identity + 4 bytes min app data)
if len(data) < 53 {
log.Printf("[DEBUG-3] Announce packet too small: %d bytes", len(data))
return
}
announceHash := sha256.Sum256(data)
log.Printf("[DEBUG-3] Processing announce %x from interface %s",
announceHash[:8], iface.GetName())
if t.seenAnnounces[string(announceHash[:])] {
t.mutex.Lock()
if _, seen := t.seenAnnounces[string(announceHash[:])]; seen {
t.mutex.Unlock()
log.Printf("[DEBUG-4] Ignoring duplicate announce %x", announceHash[:8])
return
}
// Record this announce
t.seenAnnounces[string(announceHash[:])] = true
// Extract announce fields
if len(data) < 53 { // Minimum size for announce packet
return
}
t.mutex.Unlock()
// Don't forward if max hops reached
if data[0] >= MAX_HOPS {
log.Printf("[DEBUG-3] Announce exceeded max hops: %d", data[0])
return
}
// Parse announce fields
hopCount := data[0]
destHash := data[1:33]
identity := data[33:49]
appData := data[49:]
// Add random delay before retransmission (0-2 seconds)
delay := time.Duration(rand.Float64() * 2 * float64(time.Second))
time.Sleep(delay)
// Check bandwidth allocation for announces
if !t.announceRate.Allow() {
log.Printf("[DEBUG-3] Announce rate limit exceeded, dropping")
return
}
// Increment hop count and retransmit
data[0]++
t.broadcastAnnouncePacket(data)
}
// Increment hop count for forwarding
data[0] = hopCount + 1
func (t *Transport) broadcastAnnouncePacket(data []byte) error {
t.mutex.RLock()
defer t.mutex.RUnlock()
// Forward to other interfaces
for name, outIface := range t.interfaces {
if outIface == iface || !outIface.IsEnabled() {
continue
}
for _, iface := range t.interfaces {
if err := iface.Send(data, ""); err != nil {
return fmt.Errorf("failed to broadcast announce: %w", err)
log.Printf("[DEBUG-7] Forwarding announce on interface %s", name)
if err := outIface.Send(data, ""); err != nil {
log.Printf("[DEBUG-3] Failed to forward announce on %s: %v", name, err)
}
}
return nil
// Notify announce handlers
t.notifyAnnounceHandlers(destHash, identity, appData)
}
func (t *Transport) handleLinkPacket(data []byte, iface common.NetworkInterface) {