From 63454b3bbbb902d8f8528ed50a5747ea2d775ca8 Mon Sep 17 00:00:00 2001 From: Sudo-Ivan Date: Tue, 30 Dec 2025 21:14:42 -0600 Subject: [PATCH] refactor: enhance UDPInterface with improved concurrency handling and consistent mutex naming --- pkg/interfaces/udp.go | 130 +++++++++++++++++++++++++++++------------- 1 file changed, 91 insertions(+), 39 deletions(-) diff --git a/pkg/interfaces/udp.go b/pkg/interfaces/udp.go index dd15788..3ea0bb2 100644 --- a/pkg/interfaces/udp.go +++ b/pkg/interfaces/udp.go @@ -14,8 +14,9 @@ type UDPInterface struct { conn *net.UDPConn addr *net.UDPAddr targetAddr *net.UDPAddr - mutex sync.RWMutex readBuffer []byte + done chan struct{} + stopOnce sync.Once } func NewUDPInterface(name string, addr string, target string, enabled bool) (*UDPInterface, error) { @@ -36,10 +37,11 @@ func NewUDPInterface(name string, addr string, target string, enabled bool) (*UD BaseInterface: NewBaseInterface(name, common.IF_TYPE_UDP, enabled), addr: udpAddr, targetAddr: targetAddr, - readBuffer: make([]byte, 1064), + readBuffer: make([]byte, common.NUM_1064), + done: make(chan struct{}), } - ui.MTU = 1064 + ui.MTU = common.NUM_1064 return ui, nil } @@ -57,24 +59,30 @@ func (ui *UDPInterface) GetMode() common.InterfaceMode { } func (ui *UDPInterface) IsOnline() bool { - ui.mutex.RLock() - defer ui.mutex.RUnlock() + ui.Mutex.RLock() + defer ui.Mutex.RUnlock() return ui.Online } func (ui *UDPInterface) IsDetached() bool { - ui.mutex.RLock() - defer ui.mutex.RUnlock() + ui.Mutex.RLock() + defer ui.Mutex.RUnlock() return ui.Detached } func (ui *UDPInterface) Detach() { - ui.mutex.Lock() - defer ui.mutex.Unlock() + ui.Mutex.Lock() + defer ui.Mutex.Unlock() ui.Detached = true + ui.Online = false if ui.conn != nil { ui.conn.Close() // #nosec G104 } + ui.stopOnce.Do(func() { + if ui.done != nil { + close(ui.done) + } + }) } func (ui *UDPInterface) Send(data []byte, addr string) error { @@ -88,10 +96,9 @@ func (ui *UDPInterface) Send(data []byte, addr string) error { return fmt.Errorf("no target address configured") } - // Update TX stats before sending - ui.mutex.Lock() + ui.Mutex.Lock() ui.TxBytes += uint64(len(data)) - ui.mutex.Unlock() + ui.Mutex.Unlock() _, err := ui.conn.WriteTo(data, ui.targetAddr) if err != nil { @@ -103,14 +110,14 @@ func (ui *UDPInterface) Send(data []byte, addr string) error { } func (ui *UDPInterface) SetPacketCallback(callback common.PacketCallback) { - ui.mutex.Lock() - defer ui.mutex.Unlock() + ui.Mutex.Lock() + defer ui.Mutex.Unlock() ui.packetCallback = callback } func (ui *UDPInterface) GetPacketCallback() common.PacketCallback { - ui.mutex.RLock() - defer ui.mutex.RUnlock() + ui.Mutex.RLock() + defer ui.Mutex.RUnlock() return ui.packetCallback } @@ -134,9 +141,9 @@ func (ui *UDPInterface) ProcessOutgoing(data []byte) error { return fmt.Errorf("UDP write failed: %v", err) } - ui.mutex.Lock() + ui.Mutex.Lock() ui.TxBytes += uint64(len(data)) - ui.mutex.Unlock() + ui.Mutex.Unlock() return nil } @@ -146,14 +153,14 @@ func (ui *UDPInterface) GetConn() net.Conn { } func (ui *UDPInterface) GetTxBytes() uint64 { - ui.mutex.RLock() - defer ui.mutex.RUnlock() + ui.Mutex.RLock() + defer ui.Mutex.RUnlock() return ui.TxBytes } func (ui *UDPInterface) GetRxBytes() uint64 { - ui.mutex.RLock() - defer ui.mutex.RUnlock() + ui.Mutex.RLock() + defer ui.Mutex.RUnlock() return ui.RxBytes } @@ -166,18 +173,36 @@ func (ui *UDPInterface) GetBitrate() int { } func (ui *UDPInterface) Enable() { - ui.mutex.Lock() - defer ui.mutex.Unlock() + ui.Mutex.Lock() + defer ui.Mutex.Unlock() ui.Online = true } func (ui *UDPInterface) Disable() { - ui.mutex.Lock() - defer ui.mutex.Unlock() + ui.Mutex.Lock() + defer ui.Mutex.Unlock() ui.Online = false } func (ui *UDPInterface) Start() error { + ui.Mutex.Lock() + if ui.conn != nil { + ui.Mutex.Unlock() + return fmt.Errorf("UDP interface already started") + } + // Only recreate done if it's nil or was closed + select { + case <-ui.done: + ui.done = make(chan struct{}) + ui.stopOnce = sync.Once{} + default: + if ui.done == nil { + ui.done = make(chan struct{}) + ui.stopOnce = sync.Once{} + } + } + ui.Mutex.Unlock() + conn, err := net.ListenUDP("udp", ui.addr) if err != nil { return err @@ -187,15 +212,17 @@ func (ui *UDPInterface) Start() error { // Enable broadcast mode if we have a target address if ui.targetAddr != nil { // Get the raw connection file descriptor to set SO_BROADCAST - if err := conn.SetReadBuffer(1064); err != nil { + if err := conn.SetReadBuffer(common.NUM_1064); err != nil { debug.Log(debug.DEBUG_ERROR, "Failed to set read buffer size", "error", err) } - if err := conn.SetWriteBuffer(1064); err != nil { + if err := conn.SetWriteBuffer(common.NUM_1064); err != nil { debug.Log(debug.DEBUG_ERROR, "Failed to set write buffer size", "error", err) } } + ui.Mutex.Lock() ui.Online = true + ui.Mutex.Unlock() // Start the read loop in a goroutine go ui.readLoop() @@ -203,19 +230,43 @@ func (ui *UDPInterface) Start() error { return nil } +func (ui *UDPInterface) Stop() error { + ui.Detach() + return nil +} + func (ui *UDPInterface) readLoop() { - buffer := make([]byte, 1064) - for ui.IsOnline() && !ui.IsDetached() { - n, remoteAddr, err := ui.conn.ReadFromUDP(buffer) + buffer := make([]byte, common.NUM_1064) + for { + ui.Mutex.RLock() + online := ui.Online + detached := ui.Detached + conn := ui.conn + done := ui.done + ui.Mutex.RUnlock() + + if !online || detached || conn == nil { + return + } + + select { + case <-done: + return + default: + } + + n, remoteAddr, err := conn.ReadFromUDP(buffer) if err != nil { - if ui.IsOnline() { + ui.Mutex.RLock() + stillOnline := ui.Online + ui.Mutex.RUnlock() + if stillOnline { debug.Log(debug.DEBUG_ERROR, "Error reading from UDP interface", "name", ui.Name, "error", err) } return } - // Update RX stats - ui.mutex.Lock() + ui.Mutex.Lock() // #nosec G115 - Network read sizes are always positive and within safe range ui.RxBytes += uint64(n) @@ -224,16 +275,17 @@ func (ui *UDPInterface) readLoop() { debug.Log(debug.DEBUG_ALL, "UDP interface discovered peer", "name", ui.Name, "peer", remoteAddr.String()) ui.targetAddr = remoteAddr } - ui.mutex.Unlock() + callback := ui.packetCallback + ui.Mutex.Unlock() - if ui.packetCallback != nil { - ui.packetCallback(buffer[:n], ui) + if callback != nil { + callback(buffer[:n], ui) } } } func (ui *UDPInterface) IsEnabled() bool { - ui.mutex.RLock() - defer ui.mutex.RUnlock() + ui.Mutex.RLock() + defer ui.Mutex.RUnlock() return ui.Enabled && ui.Online && !ui.Detached }