refactor: enhance AutoInterface with done channel and improved locking for concurrency

This commit is contained in:
2025-12-30 21:13:56 -06:00
parent 6cb90d3c4b
commit b0d2d4778f

View File

@@ -53,12 +53,13 @@ type AutoInterface struct {
timedOutInterfaces map[string]time.Time
allowedInterfaces []string
ignoredInterfaces []string
mutex sync.RWMutex
outboundConn *net.UDPConn
announceInterval time.Duration
peerJobInterval time.Duration
peeringTimeout time.Duration
mcastEchoTimeout time.Duration
done chan struct{}
stopOnce sync.Once
}
type AdoptedInterface struct {
@@ -135,6 +136,7 @@ func NewAutoInterface(name string, config *common.InterfaceConfig) (*AutoInterfa
peerJobInterval: PEER_JOB_INTERVAL,
peeringTimeout: PEERING_TIMEOUT,
mcastEchoTimeout: MCAST_ECHO_TIMEOUT,
done: make(chan struct{}),
}
debug.Log(debug.DEBUG_INFO, "AutoInterface configured", "name", name, "group", groupID, "mcast_addr", mcastAddr)
@@ -170,6 +172,20 @@ func normalizeMulticastType(mtype string) string {
}
func (ai *AutoInterface) Start() error {
ai.Mutex.Lock()
// Only recreate done if it's nil or was closed
select {
case <-ai.done:
ai.done = make(chan struct{})
ai.stopOnce = sync.Once{}
default:
if ai.done == nil {
ai.done = make(chan struct{})
ai.stopOnce = sync.Once{}
}
}
ai.Mutex.Unlock()
interfaces, err := net.Interfaces()
if err != nil {
return fmt.Errorf("failed to list interfaces: %v", err)
@@ -186,7 +202,9 @@ func (ai *AutoInterface) Start() error {
continue
}
if err := ai.configureInterface(&iface); err != nil {
ifaceCopy := iface
// bearer:disable go_gosec_memory_memory_aliasing
if err := ai.configureInterface(&ifaceCopy); err != nil {
debug.Log(debug.DEBUG_VERBOSE, "Failed to configure interface", "name", iface.Name, "error", err)
continue
}
@@ -262,7 +280,7 @@ func (ai *AutoInterface) configureInterface(iface *net.Interface) error {
return fmt.Errorf("no link-local IPv6 address found")
}
ai.mutex.Lock()
ai.Mutex.Lock()
ai.adoptedInterfaces[iface.Name] = &AdoptedInterface{
name: iface.Name,
linkLocalAddr: linkLocalAddr,
@@ -270,7 +288,7 @@ func (ai *AutoInterface) configureInterface(iface *net.Interface) error {
}
ai.linkLocalAddrs = append(ai.linkLocalAddrs, linkLocalAddr)
ai.multicastEchoes[iface.Name] = time.Now()
ai.mutex.Unlock()
ai.Mutex.Unlock()
if err := ai.startDiscoveryListener(iface); err != nil {
return fmt.Errorf("failed to start discovery listener: %v", err)
@@ -296,13 +314,13 @@ func (ai *AutoInterface) startDiscoveryListener(iface *net.Interface) error {
return err
}
if err := conn.SetReadBuffer(1024); err != nil {
if err := conn.SetReadBuffer(common.NUM_1024); err != nil {
debug.Log(debug.DEBUG_ERROR, "Failed to set discovery read buffer", "error", err)
}
ai.mutex.Lock()
ai.Mutex.Lock()
ai.discoveryServers[iface.Name] = conn
ai.mutex.Unlock()
ai.Mutex.Unlock()
go ai.handleDiscovery(conn, iface.Name)
debug.Log(debug.DEBUG_VERBOSE, "Discovery listener started", "interface", iface.Name, "addr", ai.mcastDiscoveryAddr)
@@ -331,9 +349,9 @@ func (ai *AutoInterface) startDataListener(iface *net.Interface) error {
debug.Log(debug.DEBUG_ERROR, "Failed to set data read buffer", "error", err)
}
ai.mutex.Lock()
ai.Mutex.Lock()
ai.interfaceServers[iface.Name] = conn
ai.mutex.Unlock()
ai.Mutex.Unlock()
go ai.handleData(conn, iface.Name)
debug.Log(debug.DEBUG_VERBOSE, "Data listener started", "interface", iface.Name, "addr", addr)
@@ -341,8 +359,18 @@ func (ai *AutoInterface) startDataListener(iface *net.Interface) error {
}
func (ai *AutoInterface) handleDiscovery(conn *net.UDPConn, ifaceName string) {
buf := make([]byte, 1024)
buf := make([]byte, common.NUM_1024)
for {
ai.Mutex.RLock()
done := ai.done
ai.Mutex.RUnlock()
select {
case <-done:
return
default:
}
n, remoteAddr, err := conn.ReadFromUDP(buf)
if err != nil {
if ai.IsOnline() {
@@ -365,6 +393,16 @@ func (ai *AutoInterface) handleDiscovery(conn *net.UDPConn, ifaceName string) {
func (ai *AutoInterface) handleData(conn *net.UDPConn, ifaceName string) {
buf := make([]byte, ai.GetMTU())
for {
ai.Mutex.RLock()
done := ai.done
ai.Mutex.RUnlock()
select {
case <-done:
return
default:
}
n, _, err := conn.ReadFromUDP(buf)
if err != nil {
if ai.IsOnline() {
@@ -380,8 +418,8 @@ func (ai *AutoInterface) handleData(conn *net.UDPConn, ifaceName string) {
}
func (ai *AutoInterface) handlePeerAnnounce(addr *net.UDPAddr, ifaceName string) {
ai.mutex.Lock()
defer ai.mutex.Unlock()
ai.Mutex.Lock()
defer ai.Mutex.Unlock()
peerIP := addr.IP.String()
@@ -412,17 +450,22 @@ func (ai *AutoInterface) announceLoop() {
ticker := time.NewTicker(ai.announceInterval)
defer ticker.Stop()
for range ticker.C {
if !ai.IsOnline() {
for {
select {
case <-ticker.C:
if !ai.IsOnline() {
return
}
ai.sendPeerAnnounce()
case <-ai.done:
return
}
ai.sendPeerAnnounce()
}
}
func (ai *AutoInterface) sendPeerAnnounce() {
ai.mutex.RLock()
defer ai.mutex.RUnlock()
ai.Mutex.RLock()
defer ai.Mutex.RUnlock()
for ifaceName, adoptedIface := range ai.adoptedInterfaces {
mcastAddr := &net.UDPAddr{
@@ -452,33 +495,38 @@ func (ai *AutoInterface) peerJobs() {
ticker := time.NewTicker(ai.peerJobInterval)
defer ticker.Stop()
for range ticker.C {
if !ai.IsOnline() {
for {
select {
case <-ticker.C:
if !ai.IsOnline() {
return
}
ai.Mutex.Lock()
now := time.Now()
for peerKey, peer := range ai.peers {
if now.Sub(peer.lastHeard) > ai.peeringTimeout {
delete(ai.peers, peerKey)
debug.Log(debug.DEBUG_VERBOSE, "Removed timed out peer", "peer", peerKey)
}
}
for ifaceName, echoTime := range ai.multicastEchoes {
if now.Sub(echoTime) > ai.mcastEchoTimeout {
if _, exists := ai.timedOutInterfaces[ifaceName]; !exists {
debug.Log(debug.DEBUG_INFO, "Interface timed out", "interface", ifaceName)
ai.timedOutInterfaces[ifaceName] = now
}
} else {
delete(ai.timedOutInterfaces, ifaceName)
}
}
ai.Mutex.Unlock()
case <-ai.done:
return
}
ai.mutex.Lock()
now := time.Now()
for peerKey, peer := range ai.peers {
if now.Sub(peer.lastHeard) > ai.peeringTimeout {
delete(ai.peers, peerKey)
debug.Log(debug.DEBUG_VERBOSE, "Removed timed out peer", "peer", peerKey)
}
}
for ifaceName, echoTime := range ai.multicastEchoes {
if now.Sub(echoTime) > ai.mcastEchoTimeout {
if _, exists := ai.timedOutInterfaces[ifaceName]; !exists {
debug.Log(debug.DEBUG_INFO, "Interface timed out", "interface", ifaceName)
ai.timedOutInterfaces[ifaceName] = now
}
} else {
delete(ai.timedOutInterfaces, ifaceName)
}
}
ai.mutex.Unlock()
}
}
@@ -487,8 +535,8 @@ func (ai *AutoInterface) Send(data []byte, address string) error {
return fmt.Errorf("interface offline")
}
ai.mutex.RLock()
defer ai.mutex.RUnlock()
ai.Mutex.RLock()
defer ai.Mutex.RUnlock()
if len(ai.peers) == 0 {
debug.Log(debug.DEBUG_TRACE, "No peers available for sending")
@@ -526,9 +574,7 @@ func (ai *AutoInterface) Send(data []byte, address string) error {
}
func (ai *AutoInterface) Stop() error {
ai.mutex.Lock()
defer ai.mutex.Unlock()
ai.Mutex.Lock()
ai.Online = false
ai.IN = false
ai.OUT = false
@@ -544,6 +590,13 @@ func (ai *AutoInterface) Stop() error {
if ai.outboundConn != nil {
ai.outboundConn.Close() // #nosec G104
}
ai.Mutex.Unlock()
ai.stopOnce.Do(func() {
if ai.done != nil {
close(ai.done)
}
})
debug.Log(debug.DEBUG_INFO, "AutoInterface stopped")
return nil