feat(interfaces): implement ProcessOutgoing method for TCPClientInterface and remove unused statistics methods
This commit is contained in:
@@ -167,6 +167,40 @@ func (tc *TCPClientInterface) Stop() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tc *TCPClientInterface) ProcessOutgoing(data []byte) error {
|
||||
tc.Mutex.RLock()
|
||||
online := tc.Online
|
||||
tc.Mutex.RUnlock()
|
||||
|
||||
if !online {
|
||||
return fmt.Errorf("interface offline")
|
||||
}
|
||||
|
||||
tc.writing = true
|
||||
defer func() { tc.writing = false }()
|
||||
|
||||
// For TCP connections, use HDLC framing
|
||||
var frame []byte
|
||||
frame = append([]byte{HDLC_FLAG}, escapeHDLC(data)...)
|
||||
frame = append(frame, HDLC_FLAG)
|
||||
|
||||
debug.Log(debug.DEBUG_ALL, "TCP interface writing to network", "name", tc.Name, "bytes", len(frame))
|
||||
|
||||
tc.Mutex.RLock()
|
||||
conn := tc.conn
|
||||
tc.Mutex.RUnlock()
|
||||
|
||||
if conn == nil {
|
||||
return fmt.Errorf("connection closed")
|
||||
}
|
||||
|
||||
_, err := conn.Write(frame)
|
||||
if err != nil {
|
||||
debug.Log(debug.DEBUG_CRITICAL, "TCP interface write failed", "name", tc.Name, "error", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (tc *TCPClientInterface) readLoop() {
|
||||
buffer := make([]byte, tc.MTU)
|
||||
inFrame := false
|
||||
@@ -205,8 +239,6 @@ func (tc *TCPClientInterface) readLoop() {
|
||||
return
|
||||
}
|
||||
|
||||
tc.UpdateStats(uint64(n), true) // #nosec G115
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
b := buffer[i]
|
||||
|
||||
@@ -241,70 +273,13 @@ func (tc *TCPClientInterface) handlePacket(data []byte) {
|
||||
}
|
||||
|
||||
tc.Mutex.Lock()
|
||||
tc.RxBytes += uint64(len(data))
|
||||
lastRx := time.Now()
|
||||
tc.lastRx = lastRx
|
||||
callback := tc.packetCallback
|
||||
tc.Mutex.Unlock()
|
||||
|
||||
debug.Log(debug.DEBUG_ALL, "Received packet", "type", fmt.Sprintf("0x%02x", data[0]), "size", len(data))
|
||||
|
||||
// For RNS packets, call the packet callback directly
|
||||
if callback != nil {
|
||||
debug.Log(debug.DEBUG_ALL, "Calling packet callback for RNS packet")
|
||||
callback(data, tc)
|
||||
} else {
|
||||
debug.Log(debug.DEBUG_ALL, "No packet callback set for TCP interface")
|
||||
}
|
||||
}
|
||||
|
||||
// Send implements the interface Send method for TCP interface
|
||||
func (tc *TCPClientInterface) Send(data []byte, address string) error {
|
||||
debug.Log(debug.DEBUG_ALL, "TCP interface sending bytes", "name", tc.Name, "bytes", len(data))
|
||||
|
||||
if !tc.IsEnabled() || !tc.IsOnline() {
|
||||
return fmt.Errorf("TCP interface %s is not online", tc.Name)
|
||||
}
|
||||
|
||||
// Send data directly - packet type is already in the first byte of data
|
||||
// TCP interface uses HDLC framing around the raw packet
|
||||
return tc.ProcessOutgoing(data)
|
||||
}
|
||||
|
||||
func (tc *TCPClientInterface) ProcessOutgoing(data []byte) error {
|
||||
tc.Mutex.RLock()
|
||||
online := tc.Online
|
||||
tc.Mutex.RUnlock()
|
||||
|
||||
if !online {
|
||||
return fmt.Errorf("interface offline")
|
||||
}
|
||||
|
||||
tc.writing = true
|
||||
defer func() { tc.writing = false }()
|
||||
|
||||
// For TCP connections, use HDLC framing
|
||||
var frame []byte
|
||||
frame = append([]byte{HDLC_FLAG}, escapeHDLC(data)...)
|
||||
frame = append(frame, HDLC_FLAG)
|
||||
|
||||
tc.UpdateStats(uint64(len(frame)), false) // #nosec G115
|
||||
|
||||
debug.Log(debug.DEBUG_ALL, "TCP interface writing to network", "name", tc.Name, "bytes", len(frame))
|
||||
|
||||
tc.Mutex.RLock()
|
||||
conn := tc.conn
|
||||
tc.Mutex.RUnlock()
|
||||
|
||||
if conn == nil {
|
||||
return fmt.Errorf("connection closed")
|
||||
}
|
||||
|
||||
_, err := conn.Write(frame)
|
||||
if err != nil {
|
||||
debug.Log(debug.DEBUG_CRITICAL, "TCP interface write failed", "name", tc.Name, "error", err)
|
||||
}
|
||||
return err
|
||||
tc.ProcessIncoming(data)
|
||||
}
|
||||
|
||||
func (tc *TCPClientInterface) teardown() {
|
||||
@@ -472,40 +447,6 @@ func (tc *TCPClientInterface) GetRTT() time.Duration {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (tc *TCPClientInterface) GetTxBytes() uint64 {
|
||||
tc.Mutex.RLock()
|
||||
defer tc.Mutex.RUnlock()
|
||||
return tc.TxBytes
|
||||
}
|
||||
|
||||
func (tc *TCPClientInterface) GetRxBytes() uint64 {
|
||||
tc.Mutex.RLock()
|
||||
defer tc.Mutex.RUnlock()
|
||||
return tc.RxBytes
|
||||
}
|
||||
|
||||
func (tc *TCPClientInterface) UpdateStats(bytes uint64, isRx bool) {
|
||||
tc.Mutex.Lock()
|
||||
defer tc.Mutex.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
if isRx {
|
||||
tc.RxBytes += bytes
|
||||
tc.lastRx = now
|
||||
debug.Log(debug.DEBUG_TRACE, "Interface RX stats", "name", tc.Name, "bytes", bytes, "total", tc.RxBytes, "last", tc.lastRx)
|
||||
} else {
|
||||
tc.TxBytes += bytes
|
||||
tc.lastTx = now
|
||||
debug.Log(debug.DEBUG_TRACE, "Interface TX stats", "name", tc.Name, "bytes", bytes, "total", tc.TxBytes, "last", tc.lastTx)
|
||||
}
|
||||
}
|
||||
|
||||
func (tc *TCPClientInterface) GetStats() (tx uint64, rx uint64, lastTx time.Time, lastRx time.Time) {
|
||||
tc.Mutex.RLock()
|
||||
defer tc.Mutex.RUnlock()
|
||||
return tc.TxBytes, tc.RxBytes, tc.lastTx, tc.lastRx
|
||||
}
|
||||
|
||||
type TCPServerInterface struct {
|
||||
BaseInterface
|
||||
connections map[string]net.Conn
|
||||
@@ -686,18 +627,6 @@ func (ts *TCPServerInterface) Stop() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ts *TCPServerInterface) GetTxBytes() uint64 {
|
||||
ts.Mutex.RLock()
|
||||
defer ts.Mutex.RUnlock()
|
||||
return ts.TxBytes
|
||||
}
|
||||
|
||||
func (ts *TCPServerInterface) GetRxBytes() uint64 {
|
||||
ts.Mutex.RLock()
|
||||
defer ts.Mutex.RUnlock()
|
||||
return ts.RxBytes
|
||||
}
|
||||
|
||||
func (ts *TCPServerInterface) handleConnection(conn net.Conn) {
|
||||
addr := conn.RemoteAddr().String()
|
||||
ts.Mutex.Lock()
|
||||
@@ -728,14 +657,7 @@ func (ts *TCPServerInterface) handleConnection(conn net.Conn) {
|
||||
return
|
||||
}
|
||||
|
||||
ts.Mutex.Lock()
|
||||
ts.RxBytes += uint64(n) // #nosec G115
|
||||
callback := ts.packetCallback
|
||||
ts.Mutex.Unlock()
|
||||
|
||||
if callback != nil {
|
||||
callback(buffer[:n], ts)
|
||||
}
|
||||
ts.ProcessIncoming(buffer[:n])
|
||||
}
|
||||
}
|
||||
|
||||
@@ -758,7 +680,6 @@ func (ts *TCPServerInterface) ProcessOutgoing(data []byte) error {
|
||||
}
|
||||
|
||||
ts.Mutex.Lock()
|
||||
ts.TxBytes += uint64(len(frame)) // #nosec G115
|
||||
conns := make([]net.Conn, 0, len(ts.connections))
|
||||
for _, conn := range ts.connections {
|
||||
conns = append(conns, conn)
|
||||
|
||||
Reference in New Issue
Block a user