feat(interfaces): update AutoInterface with multicast address generation and duplicate data handling
This commit is contained in:
@@ -5,9 +5,9 @@ package interfaces
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -34,8 +34,16 @@ const (
|
||||
|
||||
MCAST_ADDR_TYPE_PERMANENT = "0"
|
||||
MCAST_ADDR_TYPE_TEMPORARY = "1"
|
||||
|
||||
MULTI_IF_DEQUE_LEN = 48
|
||||
MULTI_IF_DEQUE_TTL = 750 * time.Millisecond
|
||||
)
|
||||
|
||||
type DequeEntry struct {
|
||||
hash [32]byte
|
||||
timestamp time.Time
|
||||
}
|
||||
|
||||
type AutoInterface struct {
|
||||
BaseInterface
|
||||
groupID []byte
|
||||
@@ -45,7 +53,6 @@ type AutoInterface struct {
|
||||
discoveryScope string
|
||||
multicastAddrType string
|
||||
mcastDiscoveryAddr string
|
||||
ifacNetname string
|
||||
peers map[string]*Peer
|
||||
linkLocalAddrs []string
|
||||
adoptedInterfaces map[string]*AdoptedInterface
|
||||
@@ -60,6 +67,7 @@ type AutoInterface struct {
|
||||
peerJobInterval time.Duration
|
||||
peeringTimeout time.Duration
|
||||
mcastEchoTimeout time.Duration
|
||||
mifDeque []DequeEntry
|
||||
done chan struct{}
|
||||
stopOnce sync.Once
|
||||
}
|
||||
@@ -76,6 +84,24 @@ type Peer struct {
|
||||
addr *net.UDPAddr
|
||||
}
|
||||
|
||||
func descopeLinkLocal(addr string) string {
|
||||
// Drop scope specifier expressed as %ifname (macOS)
|
||||
if i := strings.Index(addr, "%"); i != -1 {
|
||||
addr = addr[:i]
|
||||
}
|
||||
|
||||
// Drop embedded scope specifier (NetBSD, OpenBSD)
|
||||
// Python: re.sub(r"fe80:[0-9a-f]*::","fe80::", link_local_addr)
|
||||
if strings.HasPrefix(addr, "fe80:") {
|
||||
parts := strings.Split(addr, ":")
|
||||
// Check for fe80:[scope]::...
|
||||
if len(parts) >= 3 && parts[2] == "" && parts[1] != "" {
|
||||
return "fe80::" + strings.Join(parts[3:], ":")
|
||||
}
|
||||
}
|
||||
return addr
|
||||
}
|
||||
|
||||
func NewAutoInterface(name string, config *common.InterfaceConfig) (*AutoInterface, error) {
|
||||
groupID := DEFAULT_GROUP_ID
|
||||
if config.GroupID != "" {
|
||||
@@ -88,6 +114,9 @@ func NewAutoInterface(name string, config *common.InterfaceConfig) (*AutoInterfa
|
||||
}
|
||||
|
||||
multicastAddrType := MCAST_ADDR_TYPE_TEMPORARY
|
||||
if config.MulticastAddrType != "" {
|
||||
multicastAddrType = normalizeMulticastType(config.MulticastAddrType)
|
||||
}
|
||||
|
||||
discoveryPort := DEFAULT_DISCOVERY_PORT
|
||||
if config.DiscoveryPort != 0 {
|
||||
@@ -101,8 +130,13 @@ func NewAutoInterface(name string, config *common.InterfaceConfig) (*AutoInterfa
|
||||
|
||||
groupHash := sha256.Sum256([]byte(groupID))
|
||||
|
||||
ifacNetname := hex.EncodeToString(groupHash[:])[:16]
|
||||
mcastAddr := fmt.Sprintf("ff%s%s::%s", discoveryScope, multicastAddrType, ifacNetname)
|
||||
// Python-compatible multicast address generation
|
||||
// gt = "0:" + "{:02x}".format(g[3]+(g[2]<<8)) + ":" + ...
|
||||
gt := "0"
|
||||
for i := 1; i <= 6; i++ {
|
||||
gt += fmt.Sprintf(":%02x%02x", groupHash[i*2], groupHash[i*2+1])
|
||||
}
|
||||
mcastAddr := fmt.Sprintf("ff%s%s:%s", multicastAddrType, discoveryScope, gt)
|
||||
|
||||
ai := &AutoInterface{
|
||||
BaseInterface: BaseInterface{
|
||||
@@ -124,7 +158,6 @@ func NewAutoInterface(name string, config *common.InterfaceConfig) (*AutoInterfa
|
||||
discoveryScope: discoveryScope,
|
||||
multicastAddrType: multicastAddrType,
|
||||
mcastDiscoveryAddr: mcastAddr,
|
||||
ifacNetname: ifacNetname,
|
||||
peers: make(map[string]*Peer),
|
||||
linkLocalAddrs: make([]string, 0),
|
||||
adoptedInterfaces: make(map[string]*AdoptedInterface),
|
||||
@@ -138,6 +171,7 @@ func NewAutoInterface(name string, config *common.InterfaceConfig) (*AutoInterfa
|
||||
peerJobInterval: PEER_JOB_INTERVAL,
|
||||
peeringTimeout: PEERING_TIMEOUT,
|
||||
mcastEchoTimeout: MCAST_ECHO_TIMEOUT,
|
||||
mifDeque: make([]DequeEntry, 0, MULTI_IF_DEQUE_LEN),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
|
||||
@@ -272,7 +306,7 @@ func (ai *AutoInterface) configureInterface(iface *net.Interface) error {
|
||||
for _, addr := range addrs {
|
||||
if ipnet, ok := addr.(*net.IPNet); ok {
|
||||
if ipnet.IP.To4() == nil && ipnet.IP.IsLinkLocalUnicast() {
|
||||
linkLocalAddr = ipnet.IP.String()
|
||||
linkLocalAddr = descopeLinkLocal(ipnet.IP.String())
|
||||
break
|
||||
}
|
||||
}
|
||||
@@ -381,12 +415,17 @@ func (ai *AutoInterface) handleDiscovery(conn *net.UDPConn, ifaceName string) {
|
||||
return
|
||||
}
|
||||
|
||||
if n >= len(ai.groupHash) {
|
||||
receivedHash := buf[:len(ai.groupHash)]
|
||||
if bytes.Equal(receivedHash, ai.groupHash) {
|
||||
// Python: discovery_token = RNS.Identity.full_hash(self.group_id+ipv6_src[0].encode("utf-8"))
|
||||
peerIP := descopeLinkLocal(remoteAddr.IP.String())
|
||||
tokenSource := append(ai.groupID, []byte(peerIP)...)
|
||||
expectedHash := sha256.Sum256(tokenSource)
|
||||
|
||||
if n >= len(expectedHash) {
|
||||
receivedHash := buf[:len(expectedHash)]
|
||||
if bytes.Equal(receivedHash, expectedHash[:]) {
|
||||
ai.handlePeerAnnounce(remoteAddr, ifaceName)
|
||||
} else {
|
||||
debug.Log(debug.DEBUG_TRACE, "Received discovery with mismatched group hash", "interface", ifaceName)
|
||||
debug.Log(debug.DEBUG_TRACE, "Received discovery with mismatched group hash", "interface", ifaceName, "peer", peerIP)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -405,7 +444,7 @@ func (ai *AutoInterface) handleData(conn *net.UDPConn, ifaceName string) {
|
||||
default:
|
||||
}
|
||||
|
||||
n, _, err := conn.ReadFromUDP(buf)
|
||||
n, remoteAddr, err := conn.ReadFromUDP(buf)
|
||||
if err != nil {
|
||||
if ai.IsOnline() {
|
||||
debug.Log(debug.DEBUG_ERROR, "Data read error", "interface", ifaceName, "error", err)
|
||||
@@ -413,8 +452,41 @@ func (ai *AutoInterface) handleData(conn *net.UDPConn, ifaceName string) {
|
||||
return
|
||||
}
|
||||
|
||||
data := buf[:n]
|
||||
dataHash := sha256.Sum256(data)
|
||||
now := time.Now()
|
||||
|
||||
ai.Mutex.Lock()
|
||||
// Check for duplicate in mifDeque
|
||||
isDuplicate := false
|
||||
for i := 0; i < len(ai.mifDeque); i++ {
|
||||
if ai.mifDeque[i].hash == dataHash && now.Sub(ai.mifDeque[i].timestamp) < MULTI_IF_DEQUE_TTL {
|
||||
isDuplicate = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if isDuplicate {
|
||||
ai.Mutex.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
// Add to deque
|
||||
ai.mifDeque = append(ai.mifDeque, DequeEntry{hash: dataHash, timestamp: now})
|
||||
if len(ai.mifDeque) > MULTI_IF_DEQUE_LEN {
|
||||
ai.mifDeque = ai.mifDeque[1:]
|
||||
}
|
||||
|
||||
// Refresh peer if known
|
||||
peerIP := descopeLinkLocal(remoteAddr.IP.String())
|
||||
peerKey := peerIP + "%" + ifaceName
|
||||
if peer, exists := ai.peers[peerKey]; exists {
|
||||
peer.lastHeard = now
|
||||
}
|
||||
ai.Mutex.Unlock()
|
||||
|
||||
if callback := ai.GetPacketCallback(); callback != nil {
|
||||
callback(buf[:n], ai)
|
||||
callback(data, ai)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -485,7 +557,11 @@ func (ai *AutoInterface) sendPeerAnnounce() {
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := ai.outboundConn.WriteToUDP(ai.groupHash, mcastAddr); err != nil {
|
||||
// Python: discovery_token = RNS.Identity.full_hash(self.group_id+link_local_address.encode("utf-8"))
|
||||
tokenSource := append(ai.groupID, []byte(adoptedIface.linkLocalAddr)...)
|
||||
token := sha256.Sum256(tokenSource)
|
||||
|
||||
if _, err := ai.outboundConn.WriteToUDP(token[:], mcastAddr); err != nil {
|
||||
debug.Log(debug.DEBUG_VERBOSE, "Failed to send peer announce", "interface", ifaceName, "error", err)
|
||||
} else {
|
||||
debug.Log(debug.DEBUG_TRACE, "Sent peer announce", "interface", adoptedIface.name)
|
||||
|
||||
Reference in New Issue
Block a user