feat: enhance TCP and UDP interfaces with improved timeout settings and MTU configuration

This commit is contained in:
2025-11-20 17:57:00 -06:00
parent 01e639133b
commit fdfe895d2d
2 changed files with 126 additions and 33 deletions

View File

@@ -5,6 +5,7 @@ import (
"net"
"runtime"
"sync"
"syscall"
"time"
"github.com/Sudo-Ivan/reticulum-go/pkg/common"
@@ -21,14 +22,22 @@ const (
KISS_TFEND = 0xDC
KISS_TFESC = 0xDD
TCP_USER_TIMEOUT = 24
TCP_PROBE_AFTER = 5
TCP_PROBE_INTERVAL = 2
TCP_PROBES = 12
RECONNECT_WAIT = 5
INITIAL_TIMEOUT = 5
INITIAL_BACKOFF = time.Second
MAX_BACKOFF = time.Minute * 5
DEFAULT_MTU = 1064
BITRATE_GUESS_VAL = 10 * 1000 * 1000
RECONNECT_WAIT = 5
INITIAL_TIMEOUT = 5
INITIAL_BACKOFF = time.Second
MAX_BACKOFF = time.Minute * 5
TCP_USER_TIMEOUT_SEC = 24
TCP_PROBE_AFTER_SEC = 5
TCP_PROBE_INTERVAL_SEC = 2
TCP_PROBES_COUNT = 12
I2P_USER_TIMEOUT_SEC = 45
I2P_PROBE_AFTER_SEC = 10
I2P_PROBE_INTERVAL_SEC = 9
I2P_PROBES_COUNT = 5
)
type TCPClientInterface struct {
@@ -62,7 +71,7 @@ func NewTCPClientInterface(name string, targetHost string, targetPort int, kissF
i2pTunneled: i2pTunneled,
initiator: true,
enabled: enabled,
maxReconnectTries: TCP_PROBES,
maxReconnectTries: RECONNECT_WAIT * TCP_PROBES_COUNT,
packetBuffer: make([]byte, 0),
neverConnected: true,
}
@@ -198,13 +207,9 @@ func (tc *TCPClientInterface) Send(data []byte, address string) error {
return fmt.Errorf("TCP interface %s is not online", tc.Name)
}
// For TCP interface, we need to prepend a packet type byte for announce packets
// RNS TCP protocol expects: [packet_type][data]
frame := make([]byte, 0, len(data)+1)
frame = append(frame, 0x01) // Announce packet type
frame = append(frame, data...)
return tc.ProcessOutgoing(frame)
// Send data directly - packet type is already in the first byte of data
// TCP interface uses HDLC framing around the raw packet
return tc.ProcessOutgoing(data)
}
func (tc *TCPClientInterface) ProcessOutgoing(data []byte) error {
@@ -331,9 +336,7 @@ func (tc *TCPClientInterface) reconnect() {
return
}
// Log reconnection attempt
fmt.Printf("Failed to reconnect to %s (attempt %d/%d): %v\n",
net.JoinHostPort(tc.targetAddr, fmt.Sprintf("%d", tc.targetPort)), retries+1, tc.maxReconnectTries, err)
debug.Log(debug.DEBUG_VERBOSE, "Failed to reconnect", "target", net.JoinHostPort(tc.targetAddr, fmt.Sprintf("%d", tc.targetPort)), "attempt", retries+1, "maxTries", tc.maxReconnectTries, "error", err)
// Wait with exponential backoff
time.Sleep(backoff)
@@ -351,10 +354,8 @@ func (tc *TCPClientInterface) reconnect() {
tc.reconnecting = false
tc.mutex.Unlock()
// If we've exhausted all retries, perform final teardown
tc.teardown()
fmt.Printf("Failed to reconnect to %s after %d attempts\n",
net.JoinHostPort(tc.targetAddr, fmt.Sprintf("%d", tc.targetPort)), tc.maxReconnectTries)
debug.Log(debug.DEBUG_ERROR, "Failed to reconnect after all attempts", "target", net.JoinHostPort(tc.targetAddr, fmt.Sprintf("%d", tc.targetPort)), "maxTries", tc.maxReconnectTries)
}
func (tc *TCPClientInterface) Enable() {
@@ -440,15 +441,57 @@ func (tc *TCPClientInterface) setTimeoutsLinux() error {
return fmt.Errorf("not a TCP connection")
}
if !tc.i2pTunneled {
if err := tcpConn.SetKeepAlive(true); err != nil {
return err
}
if err := tcpConn.SetKeepAlivePeriod(time.Duration(TCP_PROBE_INTERVAL) * time.Second); err != nil {
return err
}
rawConn, err := tcpConn.SyscallConn()
if err != nil {
return fmt.Errorf("failed to get raw connection: %v", err)
}
var sockoptErr error
err = rawConn.Control(func(fd uintptr) {
var userTimeout, probeAfter, probeInterval, probeCount int
if tc.i2pTunneled {
userTimeout = I2P_USER_TIMEOUT_SEC * 1000
probeAfter = I2P_PROBE_AFTER_SEC
probeInterval = I2P_PROBE_INTERVAL_SEC
probeCount = I2P_PROBES_COUNT
} else {
userTimeout = TCP_USER_TIMEOUT_SEC * 1000
probeAfter = TCP_PROBE_AFTER_SEC
probeInterval = TCP_PROBE_INTERVAL_SEC
probeCount = TCP_PROBES_COUNT
}
if err := syscall.SetsockoptInt(int(fd), syscall.IPPROTO_TCP, 18, userTimeout); err != nil {
debug.Log(debug.DEBUG_VERBOSE, "Failed to set TCP_USER_TIMEOUT", "error", err)
}
if err := syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_KEEPALIVE, 1); err != nil {
sockoptErr = fmt.Errorf("failed to enable SO_KEEPALIVE: %v", err)
return
}
if err := syscall.SetsockoptInt(int(fd), syscall.IPPROTO_TCP, 4, probeAfter); err != nil {
debug.Log(debug.DEBUG_VERBOSE, "Failed to set TCP_KEEPIDLE", "error", err)
}
if err := syscall.SetsockoptInt(int(fd), syscall.IPPROTO_TCP, 5, probeInterval); err != nil {
debug.Log(debug.DEBUG_VERBOSE, "Failed to set TCP_KEEPINTVL", "error", err)
}
if err := syscall.SetsockoptInt(int(fd), syscall.IPPROTO_TCP, 6, probeCount); err != nil {
debug.Log(debug.DEBUG_VERBOSE, "Failed to set TCP_KEEPCNT", "error", err)
}
})
if err != nil {
return fmt.Errorf("control failed: %v", err)
}
if sockoptErr != nil {
return sockoptErr
}
debug.Log(debug.DEBUG_VERBOSE, "TCP keepalive configured (Linux)", "i2p", tc.i2pTunneled)
return nil
}
@@ -458,10 +501,40 @@ func (tc *TCPClientInterface) setTimeoutsOSX() error {
return fmt.Errorf("not a TCP connection")
}
if err := tcpConn.SetKeepAlive(true); err != nil {
return err
rawConn, err := tcpConn.SyscallConn()
if err != nil {
return fmt.Errorf("failed to get raw connection: %v", err)
}
var sockoptErr error
err = rawConn.Control(func(fd uintptr) {
const TCP_KEEPALIVE = 0x10
var probeAfter int
if tc.i2pTunneled {
probeAfter = I2P_PROBE_AFTER_SEC
} else {
probeAfter = TCP_PROBE_AFTER_SEC
}
if err := syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_KEEPALIVE, 1); err != nil {
sockoptErr = fmt.Errorf("failed to enable SO_KEEPALIVE: %v", err)
return
}
if err := syscall.SetsockoptInt(int(fd), syscall.IPPROTO_TCP, TCP_KEEPALIVE, probeAfter); err != nil {
debug.Log(debug.DEBUG_VERBOSE, "Failed to set TCP_KEEPALIVE", "error", err)
}
})
if err != nil {
return fmt.Errorf("control failed: %v", err)
}
if sockoptErr != nil {
return sockoptErr
}
debug.Log(debug.DEBUG_VERBOSE, "TCP keepalive configured (OSX)", "i2p", tc.i2pTunneled)
return nil
}

View File

@@ -36,8 +36,11 @@ func NewUDPInterface(name string, addr string, target string, enabled bool) (*UD
BaseInterface: NewBaseInterface(name, common.IF_TYPE_UDP, enabled),
addr: udpAddr,
targetAddr: targetAddr,
readBuffer: make([]byte, common.DEFAULT_MTU),
readBuffer: make([]byte, 1064), // Python RNS uses 1064 bytes for UDP MTU
}
// Set MTU to match Python RNS
ui.MTU = 1064
return ui, nil
}
@@ -181,6 +184,19 @@ func (ui *UDPInterface) Start() error {
return err
}
ui.conn = conn
// Enable broadcast mode if we have a target address
// This matches Python RNS UDP interface behavior
if ui.targetAddr != nil {
// Get the raw connection file descriptor to set SO_BROADCAST
if err := conn.SetReadBuffer(1064); err != nil {
debug.Log(debug.DEBUG_ERROR, "Failed to set read buffer size", "error", err)
}
if err := conn.SetWriteBuffer(1064); err != nil {
debug.Log(debug.DEBUG_ERROR, "Failed to set write buffer size", "error", err)
}
}
ui.Online = true
// Start the read loop in a goroutine
@@ -190,7 +206,7 @@ func (ui *UDPInterface) Start() error {
}
func (ui *UDPInterface) readLoop() {
buffer := make([]byte, common.DEFAULT_MTU)
buffer := make([]byte, 1064) // Match Python RNS UDP MTU
for ui.IsOnline() && !ui.IsDetached() {
n, remoteAddr, err := ui.conn.ReadFromUDP(buffer)
if err != nil {
@@ -200,7 +216,11 @@ func (ui *UDPInterface) readLoop() {
return
}
// Update RX stats
ui.mutex.Lock()
ui.RxBytes += uint64(n)
// Auto-discover target address from first packet if not set
if ui.targetAddr == nil {
debug.Log(debug.DEBUG_ALL, "UDP interface discovered peer", "name", ui.Name, "peer", remoteAddr.String())
ui.targetAddr = remoteAddr