refactor: enhance TCPClientInterface and TCPServerInterface with improved concurrency handling and added connection timeout

This commit is contained in:
2025-12-30 21:14:30 -06:00
parent 22c54f2252
commit 73b982c6e0

View File

@@ -32,11 +32,15 @@ const (
TCP_PROBE_AFTER_SEC = 5 TCP_PROBE_AFTER_SEC = 5
TCP_PROBE_INTERVAL_SEC = 2 TCP_PROBE_INTERVAL_SEC = 2
TCP_PROBES_COUNT = 12 TCP_PROBES_COUNT = 12
TCP_CONNECT_TIMEOUT = 10 * time.Second
TCP_MILLISECONDS = 1000
I2P_USER_TIMEOUT_SEC = 45 I2P_USER_TIMEOUT_SEC = 45
I2P_PROBE_AFTER_SEC = 10 I2P_PROBE_AFTER_SEC = 10
I2P_PROBE_INTERVAL_SEC = 9 I2P_PROBE_INTERVAL_SEC = 9
I2P_PROBES_COUNT = 5 I2P_PROBES_COUNT = 5
SO_KEEPALIVE_ENABLE = 1
) )
type TCPClientInterface struct { type TCPClientInterface struct {
@@ -53,12 +57,8 @@ type TCPClientInterface struct {
maxReconnectTries int maxReconnectTries int
packetBuffer []byte packetBuffer []byte
packetType byte packetType byte
mutex sync.RWMutex done chan struct{}
enabled bool stopOnce sync.Once
TxBytes uint64
RxBytes uint64
lastTx time.Time
lastRx time.Time
} }
func NewTCPClientInterface(name string, targetHost string, targetPort int, kissFraming bool, i2pTunneled bool, enabled bool) (*TCPClientInterface, error) { func NewTCPClientInterface(name string, targetHost string, targetPort int, kissFraming bool, i2pTunneled bool, enabled bool) (*TCPClientInterface, error) {
@@ -69,10 +69,10 @@ func NewTCPClientInterface(name string, targetHost string, targetPort int, kissF
kissFraming: kissFraming, kissFraming: kissFraming,
i2pTunneled: i2pTunneled, i2pTunneled: i2pTunneled,
initiator: true, initiator: true,
enabled: enabled,
maxReconnectTries: RECONNECT_WAIT * TCP_PROBES_COUNT, maxReconnectTries: RECONNECT_WAIT * TCP_PROBES_COUNT,
packetBuffer: make([]byte, 0), packetBuffer: make([]byte, 0),
neverConnected: true, neverConnected: true,
done: make(chan struct{}),
} }
if enabled { if enabled {
@@ -90,25 +90,41 @@ func NewTCPClientInterface(name string, targetHost string, targetPort int, kissF
} }
func (tc *TCPClientInterface) Start() error { func (tc *TCPClientInterface) Start() error {
tc.mutex.Lock() tc.Mutex.Lock()
defer tc.mutex.Unlock() if !tc.Enabled || tc.Detached {
tc.Mutex.Unlock()
if !tc.Enabled { return fmt.Errorf("interface not enabled or detached")
return fmt.Errorf("interface not enabled")
} }
if tc.conn != nil { if tc.conn != nil {
tc.Online = true tc.Online = true
go tc.readLoop() go tc.readLoop()
tc.Mutex.Unlock()
return nil return nil
} }
// Only recreate done if it's nil or was closed
select {
case <-tc.done:
tc.done = make(chan struct{})
tc.stopOnce = sync.Once{}
default:
if tc.done == nil {
tc.done = make(chan struct{})
tc.stopOnce = sync.Once{}
}
}
tc.Mutex.Unlock()
addr := net.JoinHostPort(tc.targetAddr, fmt.Sprintf("%d", tc.targetPort)) addr := net.JoinHostPort(tc.targetAddr, fmt.Sprintf("%d", tc.targetPort))
conn, err := net.Dial("tcp", addr) conn, err := net.DialTimeout("tcp", addr, TCP_CONNECT_TIMEOUT)
if err != nil { if err != nil {
return err return err
} }
tc.Mutex.Lock()
tc.conn = conn tc.conn = conn
tc.Mutex.Unlock()
// Set platform-specific timeouts // Set platform-specific timeouts
switch runtime.GOOS { switch runtime.GOOS {
@@ -122,11 +138,33 @@ func (tc *TCPClientInterface) Start() error {
} }
} }
tc.Mutex.Lock()
tc.Online = true tc.Online = true
tc.Mutex.Unlock()
go tc.readLoop() go tc.readLoop()
return nil return nil
} }
func (tc *TCPClientInterface) Stop() error {
tc.Mutex.Lock()
tc.Enabled = false
tc.Online = false
if tc.conn != nil {
_ = tc.conn.Close()
tc.conn = nil
}
tc.Mutex.Unlock()
tc.stopOnce.Do(func() {
if tc.done != nil {
close(tc.done)
}
})
return nil
}
func (tc *TCPClientInterface) readLoop() { func (tc *TCPClientInterface) readLoop() {
buffer := make([]byte, tc.MTU) buffer := make([]byte, tc.MTU)
inFrame := false inFrame := false
@@ -134,10 +172,30 @@ func (tc *TCPClientInterface) readLoop() {
dataBuffer := make([]byte, 0) dataBuffer := make([]byte, 0)
for { for {
n, err := tc.conn.Read(buffer) tc.Mutex.RLock()
conn := tc.conn
done := tc.done
tc.Mutex.RUnlock()
if conn == nil {
return
}
select {
case <-done:
return
default:
}
n, err := conn.Read(buffer)
if err != nil { if err != nil {
tc.Mutex.Lock()
tc.Online = false tc.Online = false
if tc.initiator && !tc.Detached { detached := tc.Detached
initiator := tc.initiator
tc.Mutex.Unlock()
if initiator && !detached {
go tc.reconnect() go tc.reconnect()
} else { } else {
tc.teardown() tc.teardown()
@@ -145,7 +203,6 @@ func (tc *TCPClientInterface) readLoop() {
return return
} }
// Update RX bytes for raw received data
tc.UpdateStats(uint64(n), true) // #nosec G115 tc.UpdateStats(uint64(n), true) // #nosec G115
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
@@ -181,16 +238,17 @@ func (tc *TCPClientInterface) handlePacket(data []byte) {
return return
} }
tc.mutex.Lock() tc.Mutex.Lock()
tc.RxBytes += uint64(len(data)) tc.RxBytes += uint64(len(data))
lastRx := time.Now() lastRx := time.Now()
tc.lastRx = lastRx tc.lastRx = lastRx
tc.mutex.Unlock() callback := tc.packetCallback
tc.Mutex.Unlock()
debug.Log(debug.DEBUG_ALL, "Received packet", "type", fmt.Sprintf("0x%02x", data[0]), "size", len(data)) debug.Log(debug.DEBUG_ALL, "Received packet", "type", fmt.Sprintf("0x%02x", data[0]), "size", len(data))
// For RNS packets, call the packet callback directly // For RNS packets, call the packet callback directly
if callback := tc.GetPacketCallback(); callback != nil { if callback != nil {
debug.Log(debug.DEBUG_ALL, "Calling packet callback for RNS packet") debug.Log(debug.DEBUG_ALL, "Calling packet callback for RNS packet")
callback(data, tc) callback(data, tc)
} else { } else {
@@ -212,7 +270,11 @@ func (tc *TCPClientInterface) Send(data []byte, address string) error {
} }
func (tc *TCPClientInterface) ProcessOutgoing(data []byte) error { func (tc *TCPClientInterface) ProcessOutgoing(data []byte) error {
if !tc.Online { tc.Mutex.RLock()
online := tc.Online
tc.Mutex.RUnlock()
if !online {
return fmt.Errorf("interface offline") return fmt.Errorf("interface offline")
} }
@@ -224,11 +286,19 @@ func (tc *TCPClientInterface) ProcessOutgoing(data []byte) error {
frame = append([]byte{HDLC_FLAG}, escapeHDLC(data)...) frame = append([]byte{HDLC_FLAG}, escapeHDLC(data)...)
frame = append(frame, HDLC_FLAG) frame = append(frame, HDLC_FLAG)
// Update TX stats before sending tc.UpdateStats(uint64(len(frame)), false) // #nosec G115
tc.UpdateStats(uint64(len(frame)), false)
debug.Log(debug.DEBUG_ALL, "TCP interface writing to network", "name", tc.Name, "bytes", len(frame)) debug.Log(debug.DEBUG_ALL, "TCP interface writing to network", "name", tc.Name, "bytes", len(frame))
_, err := tc.conn.Write(frame)
tc.Mutex.RLock()
conn := tc.conn
tc.Mutex.RUnlock()
if conn == nil {
return fmt.Errorf("connection closed")
}
_, err := conn.Write(frame)
if err != nil { if err != nil {
debug.Log(debug.DEBUG_CRITICAL, "TCP interface write failed", "name", tc.Name, "error", err) debug.Log(debug.DEBUG_CRITICAL, "TCP interface write failed", "name", tc.Name, "error", err)
} }
@@ -240,7 +310,7 @@ func (tc *TCPClientInterface) teardown() {
tc.IN = false tc.IN = false
tc.OUT = false tc.OUT = false
if tc.conn != nil { if tc.conn != nil {
tc.conn.Close() // #nosec G104 _ = tc.conn.Close()
} }
} }
@@ -276,9 +346,9 @@ func (tc *TCPClientInterface) SetPacketCallback(cb common.PacketCallback) {
} }
func (tc *TCPClientInterface) IsEnabled() bool { func (tc *TCPClientInterface) IsEnabled() bool {
tc.mutex.RLock() tc.Mutex.RLock()
defer tc.mutex.RUnlock() defer tc.Mutex.RUnlock()
return tc.enabled && tc.Online && !tc.Detached return tc.Enabled && tc.Online && !tc.Detached
} }
func (tc *TCPClientInterface) GetName() string { func (tc *TCPClientInterface) GetName() string {
@@ -286,31 +356,31 @@ func (tc *TCPClientInterface) GetName() string {
} }
func (tc *TCPClientInterface) GetPacketCallback() common.PacketCallback { func (tc *TCPClientInterface) GetPacketCallback() common.PacketCallback {
tc.mutex.RLock() tc.Mutex.RLock()
defer tc.mutex.RUnlock() defer tc.Mutex.RUnlock()
return tc.packetCallback return tc.packetCallback
} }
func (tc *TCPClientInterface) IsDetached() bool { func (tc *TCPClientInterface) IsDetached() bool {
tc.mutex.RLock() tc.Mutex.RLock()
defer tc.mutex.RUnlock() defer tc.Mutex.RUnlock()
return tc.Detached return tc.Detached
} }
func (tc *TCPClientInterface) IsOnline() bool { func (tc *TCPClientInterface) IsOnline() bool {
tc.mutex.RLock() tc.Mutex.RLock()
defer tc.mutex.RUnlock() defer tc.Mutex.RUnlock()
return tc.Online return tc.Online
} }
func (tc *TCPClientInterface) reconnect() { func (tc *TCPClientInterface) reconnect() {
tc.mutex.Lock() tc.Mutex.Lock()
if tc.reconnecting { if tc.reconnecting {
tc.mutex.Unlock() tc.Mutex.Unlock()
return return
} }
tc.reconnecting = true tc.reconnecting = true
tc.mutex.Unlock() tc.Mutex.Unlock()
backoff := time.Second backoff := time.Second
maxBackoff := time.Minute * 5 maxBackoff := time.Minute * 5
@@ -323,13 +393,13 @@ func (tc *TCPClientInterface) reconnect() {
conn, err := net.Dial("tcp", addr) conn, err := net.Dial("tcp", addr)
if err == nil { if err == nil {
tc.mutex.Lock() tc.Mutex.Lock()
tc.conn = conn tc.conn = conn
tc.Online = true tc.Online = true
tc.neverConnected = false tc.neverConnected = false
tc.reconnecting = false tc.reconnecting = false
tc.mutex.Unlock() tc.Mutex.Unlock()
go tc.readLoop() go tc.readLoop()
return return
@@ -349,35 +419,35 @@ func (tc *TCPClientInterface) reconnect() {
retries++ retries++
} }
tc.mutex.Lock() tc.Mutex.Lock()
tc.reconnecting = false tc.reconnecting = false
tc.mutex.Unlock() tc.Mutex.Unlock()
tc.teardown() tc.teardown()
debug.Log(debug.DEBUG_ERROR, "Failed to reconnect after all attempts", "target", net.JoinHostPort(tc.targetAddr, fmt.Sprintf("%d", tc.targetPort)), "maxTries", tc.maxReconnectTries) debug.Log(debug.DEBUG_ERROR, "Failed to reconnect after all attempts", "target", net.JoinHostPort(tc.targetAddr, fmt.Sprintf("%d", tc.targetPort)), "maxTries", tc.maxReconnectTries)
} }
func (tc *TCPClientInterface) Enable() { func (tc *TCPClientInterface) Enable() {
tc.mutex.Lock() tc.Mutex.Lock()
defer tc.mutex.Unlock() defer tc.Mutex.Unlock()
tc.Online = true tc.Online = true
} }
func (tc *TCPClientInterface) Disable() { func (tc *TCPClientInterface) Disable() {
tc.mutex.Lock() tc.Mutex.Lock()
defer tc.mutex.Unlock() defer tc.Mutex.Unlock()
tc.Online = false tc.Online = false
} }
func (tc *TCPClientInterface) IsConnected() bool { func (tc *TCPClientInterface) IsConnected() bool {
tc.mutex.RLock() tc.Mutex.RLock()
defer tc.mutex.RUnlock() defer tc.Mutex.RUnlock()
return tc.conn != nil && tc.Online && !tc.reconnecting return tc.conn != nil && tc.Online && !tc.reconnecting
} }
func (tc *TCPClientInterface) GetRTT() time.Duration { func (tc *TCPClientInterface) GetRTT() time.Duration {
tc.mutex.RLock() tc.Mutex.RLock()
defer tc.mutex.RUnlock() defer tc.Mutex.RUnlock()
if !tc.IsConnected() { if !tc.IsConnected() {
return 0 return 0
@@ -401,20 +471,20 @@ func (tc *TCPClientInterface) GetRTT() time.Duration {
} }
func (tc *TCPClientInterface) GetTxBytes() uint64 { func (tc *TCPClientInterface) GetTxBytes() uint64 {
tc.mutex.RLock() tc.Mutex.RLock()
defer tc.mutex.RUnlock() defer tc.Mutex.RUnlock()
return tc.TxBytes return tc.TxBytes
} }
func (tc *TCPClientInterface) GetRxBytes() uint64 { func (tc *TCPClientInterface) GetRxBytes() uint64 {
tc.mutex.RLock() tc.Mutex.RLock()
defer tc.mutex.RUnlock() defer tc.Mutex.RUnlock()
return tc.RxBytes return tc.RxBytes
} }
func (tc *TCPClientInterface) UpdateStats(bytes uint64, isRx bool) { func (tc *TCPClientInterface) UpdateStats(bytes uint64, isRx bool) {
tc.mutex.Lock() tc.Mutex.Lock()
defer tc.mutex.Unlock() defer tc.Mutex.Unlock()
now := time.Now() now := time.Now()
if isRx { if isRx {
@@ -429,23 +499,22 @@ func (tc *TCPClientInterface) UpdateStats(bytes uint64, isRx bool) {
} }
func (tc *TCPClientInterface) GetStats() (tx uint64, rx uint64, lastTx time.Time, lastRx time.Time) { func (tc *TCPClientInterface) GetStats() (tx uint64, rx uint64, lastTx time.Time, lastRx time.Time) {
tc.mutex.RLock() tc.Mutex.RLock()
defer tc.mutex.RUnlock() defer tc.Mutex.RUnlock()
return tc.TxBytes, tc.RxBytes, tc.lastTx, tc.lastRx return tc.TxBytes, tc.RxBytes, tc.lastTx, tc.lastRx
} }
type TCPServerInterface struct { type TCPServerInterface struct {
BaseInterface BaseInterface
connections map[string]net.Conn connections map[string]net.Conn
mutex sync.RWMutex listener net.Listener
bindAddr string bindAddr string
bindPort int bindPort int
preferIPv6 bool preferIPv6 bool
kissFraming bool kissFraming bool
i2pTunneled bool i2pTunneled bool
packetCallback common.PacketCallback done chan struct{}
TxBytes uint64 stopOnce sync.Once
RxBytes uint64
} }
func NewTCPServerInterface(name string, bindAddr string, bindPort int, kissFraming bool, i2pTunneled bool, preferIPv6 bool) (*TCPServerInterface, error) { func NewTCPServerInterface(name string, bindAddr string, bindPort int, kissFraming bool, i2pTunneled bool, preferIPv6 bool) (*TCPServerInterface, error) {
@@ -456,6 +525,7 @@ func NewTCPServerInterface(name string, bindAddr string, bindPort int, kissFrami
Type: common.IF_TYPE_TCP, Type: common.IF_TYPE_TCP,
Online: false, Online: false,
MTU: common.DEFAULT_MTU, MTU: common.DEFAULT_MTU,
Enabled: true,
Detached: false, Detached: false,
}, },
connections: make(map[string]net.Conn), connections: make(map[string]net.Conn),
@@ -464,6 +534,7 @@ func NewTCPServerInterface(name string, bindAddr string, bindPort int, kissFrami
preferIPv6: preferIPv6, preferIPv6: preferIPv6,
kissFraming: kissFraming, kissFraming: kissFraming,
i2pTunneled: i2pTunneled, i2pTunneled: i2pTunneled,
done: make(chan struct{}),
} }
return ts, nil return ts, nil
@@ -482,21 +553,21 @@ func (ts *TCPServerInterface) String() string {
} }
func (ts *TCPServerInterface) SetPacketCallback(callback common.PacketCallback) { func (ts *TCPServerInterface) SetPacketCallback(callback common.PacketCallback) {
ts.mutex.Lock() ts.Mutex.Lock()
defer ts.mutex.Unlock() defer ts.Mutex.Unlock()
ts.packetCallback = callback ts.packetCallback = callback
} }
func (ts *TCPServerInterface) GetPacketCallback() common.PacketCallback { func (ts *TCPServerInterface) GetPacketCallback() common.PacketCallback {
ts.mutex.RLock() ts.Mutex.RLock()
defer ts.mutex.RUnlock() defer ts.Mutex.RUnlock()
return ts.packetCallback return ts.packetCallback
} }
func (ts *TCPServerInterface) IsEnabled() bool { func (ts *TCPServerInterface) IsEnabled() bool {
ts.mutex.RLock() ts.Mutex.RLock()
defer ts.mutex.RUnlock() defer ts.Mutex.RUnlock()
return ts.BaseInterface.Enabled && ts.BaseInterface.Online && !ts.BaseInterface.Detached return ts.Enabled && ts.Online && !ts.Detached
} }
func (ts *TCPServerInterface) GetName() string { func (ts *TCPServerInterface) GetName() string {
@@ -504,32 +575,47 @@ func (ts *TCPServerInterface) GetName() string {
} }
func (ts *TCPServerInterface) IsDetached() bool { func (ts *TCPServerInterface) IsDetached() bool {
ts.mutex.RLock() ts.Mutex.RLock()
defer ts.mutex.RUnlock() defer ts.Mutex.RUnlock()
return ts.BaseInterface.Detached return ts.Detached
} }
func (ts *TCPServerInterface) IsOnline() bool { func (ts *TCPServerInterface) IsOnline() bool {
ts.mutex.RLock() ts.Mutex.RLock()
defer ts.mutex.RUnlock() defer ts.Mutex.RUnlock()
return ts.Online return ts.Online
} }
func (ts *TCPServerInterface) Enable() { func (ts *TCPServerInterface) Enable() {
ts.mutex.Lock() ts.Mutex.Lock()
defer ts.mutex.Unlock() defer ts.Mutex.Unlock()
ts.Online = true ts.Online = true
} }
func (ts *TCPServerInterface) Disable() { func (ts *TCPServerInterface) Disable() {
ts.mutex.Lock() ts.Mutex.Lock()
defer ts.mutex.Unlock() defer ts.Mutex.Unlock()
ts.Online = false ts.Online = false
} }
func (ts *TCPServerInterface) Start() error { func (ts *TCPServerInterface) Start() error {
ts.mutex.Lock() ts.Mutex.Lock()
defer ts.mutex.Unlock() if ts.listener != nil {
ts.Mutex.Unlock()
return fmt.Errorf("TCP server already started")
}
// Only recreate done if it's nil or was closed
select {
case <-ts.done:
ts.done = make(chan struct{})
ts.stopOnce = sync.Once{}
default:
if ts.done == nil {
ts.done = make(chan struct{})
ts.stopOnce = sync.Once{}
}
}
ts.Mutex.Unlock()
addr := net.JoinHostPort(ts.bindAddr, fmt.Sprintf("%d", ts.bindPort)) addr := net.JoinHostPort(ts.bindAddr, fmt.Sprintf("%d", ts.bindPort))
listener, err := net.Listen("tcp", addr) listener, err := net.Listen("tcp", addr)
@@ -537,14 +623,30 @@ func (ts *TCPServerInterface) Start() error {
return fmt.Errorf("failed to start TCP server: %w", err) return fmt.Errorf("failed to start TCP server: %w", err)
} }
ts.Mutex.Lock()
ts.listener = listener
ts.Online = true ts.Online = true
ts.Mutex.Unlock()
// Accept connections in a goroutine // Accept connections in a goroutine
go func() { go func() {
for { for {
ts.Mutex.RLock()
done := ts.done
ts.Mutex.RUnlock()
select {
case <-done:
return
default:
}
conn, err := listener.Accept() conn, err := listener.Accept()
if err != nil { if err != nil {
if !ts.Online { ts.Mutex.RLock()
online := ts.Online
ts.Mutex.RUnlock()
if !online {
return // Normal shutdown return // Normal shutdown
} }
debug.Log(debug.DEBUG_ERROR, "Error accepting connection", "error", err) debug.Log(debug.DEBUG_ERROR, "Error accepting connection", "error", err)
@@ -560,60 +662,87 @@ func (ts *TCPServerInterface) Start() error {
} }
func (ts *TCPServerInterface) Stop() error { func (ts *TCPServerInterface) Stop() error {
ts.mutex.Lock() ts.Mutex.Lock()
defer ts.mutex.Unlock()
ts.Online = false ts.Online = false
if ts.listener != nil {
_ = ts.listener.Close()
ts.listener = nil
}
// Close all client connections
for addr, conn := range ts.connections {
_ = conn.Close()
delete(ts.connections, addr)
}
ts.Mutex.Unlock()
ts.stopOnce.Do(func() {
if ts.done != nil {
close(ts.done)
}
})
return nil return nil
} }
func (ts *TCPServerInterface) GetTxBytes() uint64 { func (ts *TCPServerInterface) GetTxBytes() uint64 {
ts.mutex.RLock() ts.Mutex.RLock()
defer ts.mutex.RUnlock() defer ts.Mutex.RUnlock()
return ts.TxBytes return ts.TxBytes
} }
func (ts *TCPServerInterface) GetRxBytes() uint64 { func (ts *TCPServerInterface) GetRxBytes() uint64 {
ts.mutex.RLock() ts.Mutex.RLock()
defer ts.mutex.RUnlock() defer ts.Mutex.RUnlock()
return ts.RxBytes return ts.RxBytes
} }
func (ts *TCPServerInterface) handleConnection(conn net.Conn) { func (ts *TCPServerInterface) handleConnection(conn net.Conn) {
addr := conn.RemoteAddr().String() addr := conn.RemoteAddr().String()
ts.mutex.Lock() ts.Mutex.Lock()
ts.connections[addr] = conn ts.connections[addr] = conn
ts.mutex.Unlock() ts.Mutex.Unlock()
defer func() { defer func() {
ts.mutex.Lock() ts.Mutex.Lock()
delete(ts.connections, addr) delete(ts.connections, addr)
ts.mutex.Unlock() ts.Mutex.Unlock()
conn.Close() // #nosec G104 _ = conn.Close()
}() }()
buffer := make([]byte, ts.MTU) buffer := make([]byte, ts.MTU)
for { for {
ts.Mutex.RLock()
done := ts.done
ts.Mutex.RUnlock()
select {
case <-done:
return
default:
}
n, err := conn.Read(buffer) n, err := conn.Read(buffer)
if err != nil { if err != nil {
return return
} }
ts.mutex.Lock() ts.Mutex.Lock()
ts.RxBytes += uint64(n) // #nosec G115 ts.RxBytes += uint64(n) // #nosec G115
ts.mutex.Unlock() callback := ts.packetCallback
ts.Mutex.Unlock()
if ts.packetCallback != nil { if callback != nil {
ts.packetCallback(buffer[:n], ts) callback(buffer[:n], ts)
} }
} }
} }
func (ts *TCPServerInterface) ProcessOutgoing(data []byte) error { func (ts *TCPServerInterface) ProcessOutgoing(data []byte) error {
ts.mutex.RLock() ts.Mutex.RLock()
defer ts.mutex.RUnlock() online := ts.Online
ts.Mutex.RUnlock()
if !ts.Online { if !online {
return fmt.Errorf("interface offline") return fmt.Errorf("interface offline")
} }
@@ -626,9 +755,15 @@ func (ts *TCPServerInterface) ProcessOutgoing(data []byte) error {
frame = append(frame, HDLC_FLAG) frame = append(frame, HDLC_FLAG)
} }
ts.TxBytes += uint64(len(frame)) ts.Mutex.Lock()
ts.TxBytes += uint64(len(frame)) // #nosec G115
conns := make([]net.Conn, 0, len(ts.connections))
for _, conn := range ts.connections { for _, conn := range ts.connections {
conns = append(conns, conn)
}
ts.Mutex.Unlock()
for _, conn := range conns {
if _, err := conn.Write(frame); err != nil { if _, err := conn.Write(frame); err != nil {
debug.Log(debug.DEBUG_VERBOSE, "Error writing to connection", "address", conn.RemoteAddr(), "error", err) debug.Log(debug.DEBUG_VERBOSE, "Error writing to connection", "address", conn.RemoteAddr(), "error", err)
} }