This commit is contained in:
Sudo-Ivan
2024-12-30 03:50:52 -06:00
parent 0f5f5cbb13
commit decbd8f29a
16 changed files with 1046 additions and 970 deletions

View File

@@ -3,11 +3,13 @@ package transport
import (
"encoding/binary"
"errors"
"fmt"
"net"
"sync"
"time"
"github.com/Sudo-Ivan/reticulum-go/pkg/common"
"github.com/Sudo-Ivan/reticulum-go/pkg/packet"
)
var (
@@ -19,18 +21,18 @@ const (
PathfinderM = 128 // Maximum number of hops
PathRequestTTL = 300 // Time to live for path requests in seconds
AnnounceTimeout = 15 // Timeout for announce responses in seconds
// Link constants
EstablishmentTimeoutPerHop = 6 // Timeout for link establishment in seconds per hop
KeepaliveTimeoutFactor = 4 // RTT timeout factor for link timeout calculation
StaleGrace = 2 // Grace period in seconds for link timeout
Keepalive = 360 // Interval for sending keep-alive packets in seconds
StaleTime = 720 // Time after which link is considered stale
EstablishmentTimeoutPerHop = 6 // Timeout for link establishment in seconds per hop
KeepaliveTimeoutFactor = 4 // RTT timeout factor for link timeout calculation
StaleGrace = 2 // Grace period in seconds for link timeout
Keepalive = 360 // Interval for sending keep-alive packets in seconds
StaleTime = 720 // Time after which link is considered stale
// Direction constants
OUT = 0x02
IN = 0x01
OUT = 0x02
IN = 0x01
// Destination type constants
SINGLE = 0x00
GROUP = 0x01
@@ -120,9 +122,9 @@ type Link struct {
lastData time.Time
rtt time.Duration
establishedCb func()
closedCb func()
packetCb func([]byte)
resourceCb func(interface{}) bool
closedCb func()
packetCb func([]byte)
resourceCb func(interface{}) bool
resourceStrategy int
}
@@ -142,7 +144,7 @@ func NewLink(dest []byte, establishedCallback func(), closedCallback func()) *Li
lastOutbound: time.Now(),
lastData: time.Now(),
establishedCb: establishedCallback,
closedCb: closedCallback,
closedCb: closedCallback,
}
}
@@ -189,17 +191,17 @@ func (l *Link) Teardown() {
func (l *Link) Send(data []byte) error {
l.lastOutbound = time.Now()
l.lastData = time.Now()
packet := &LinkPacket{
Destination: l.destination,
Data: data,
Timestamp: time.Now(),
Data: data,
Timestamp: time.Now(),
}
if l.rtt == 0 {
l.rtt = l.InactiveFor()
}
return packet.send()
}
@@ -227,93 +229,93 @@ func (t *Transport) DeregisterAnnounceHandler(handler AnnounceHandler) {
func (t *Transport) HasPath(destinationHash []byte) bool {
t.pathLock.RLock()
defer t.pathLock.RUnlock()
path, exists := t.paths[string(destinationHash)]
if !exists {
return false
}
// Check if path is still valid (not expired)
if time.Since(path.LastUpdated) > time.Duration(PathRequestTTL)*time.Second {
delete(t.paths, string(destinationHash))
return false
}
return true
}
func (t *Transport) HopsTo(destinationHash []byte) uint8 {
t.pathLock.RLock()
defer t.pathLock.RUnlock()
path, exists := t.paths[string(destinationHash)]
if !exists {
return PathfinderM
}
return path.Hops
}
func (t *Transport) NextHop(destinationHash []byte) []byte {
t.pathLock.RLock()
defer t.pathLock.RUnlock()
path, exists := t.paths[string(destinationHash)]
if !exists {
return nil
}
return path.NextHop
}
func (t *Transport) NextHopInterface(destinationHash []byte) string {
t.pathLock.RLock()
defer t.pathLock.RUnlock()
path, exists := t.paths[string(destinationHash)]
if !exists {
return ""
}
return path.Interface.GetName()
}
func (t *Transport) RequestPath(destinationHash []byte, onInterface string, tag []byte, recursive bool) error {
packet := &PathRequest{
DestinationHash: destinationHash,
Tag: tag,
TTL: PathRequestTTL,
Recursive: recursive,
Tag: tag,
TTL: PathRequestTTL,
Recursive: recursive,
}
if onInterface != "" {
return t.sendPathRequest(packet, onInterface)
}
return t.broadcastPathRequest(packet)
}
func (t *Transport) UpdatePath(destinationHash []byte, nextHop []byte, interfaceName string, hops uint8) {
t.pathLock.Lock()
defer t.pathLock.Unlock()
iface, err := t.GetInterface(interfaceName)
if err != nil {
return
}
t.paths[string(destinationHash)] = &common.Path{
Interface: iface,
NextHop: nextHop,
Hops: hops,
LastUpdated: time.Now(),
Interface: iface,
NextHop: nextHop,
Hops: hops,
LastUpdated: time.Now(),
}
}
func (t *Transport) HandleAnnounce(destinationHash []byte, identity []byte, appData []byte) {
t.handlerLock.RLock()
defer t.handlerLock.RUnlock()
for _, handler := range t.announceHandlers {
handler.ReceivedAnnounce(destinationHash, identity, appData)
}
@@ -335,9 +337,9 @@ func (t *Transport) NewLink(dest []byte, establishedCallback func(), closedCallb
type PathRequest struct {
DestinationHash []byte
Tag []byte
TTL int
Recursive bool
Tag []byte
TTL int
Recursive bool
}
type LinkPacket struct {
@@ -354,7 +356,7 @@ func (p *LinkPacket) send() error {
header := make([]byte, 0, 64)
header = append(header, 0x02) // Link packet type
header = append(header, p.Destination...)
// Add timestamp
ts := make([]byte, 8)
binary.BigEndian.PutUint64(ts, uint64(p.Timestamp.Unix()))
@@ -417,7 +419,7 @@ func (t *Transport) broadcastPathRequest(req *PathRequest) error {
if !iface.IsEnabled() {
continue
}
if err := t.sendPathRequest(req, iface.GetName()); err != nil {
lastErr = err
}
@@ -434,11 +436,11 @@ type PathRequestPacket struct {
}
type NetworkInterface struct {
Name string
Addr *net.UDPAddr
Conn *net.UDPConn
MTU int
Enabled bool
Name string
Addr *net.UDPAddr
Conn *net.UDPConn
MTU int
Enabled bool
}
func SendAnnounce(packet []byte) error {
@@ -446,7 +448,7 @@ func SendAnnounce(packet []byte) error {
if t == nil {
return errors.New("transport not initialized")
}
// Send announce packet to all interfaces
var lastErr error
for _, iface := range t.interfaces {
@@ -454,7 +456,7 @@ func SendAnnounce(packet []byte) error {
lastErr = err
}
}
return lastErr
}
@@ -487,7 +489,7 @@ func (t *Transport) handlePathRequest(data []byte, iface interface{}) {
recursive := false
if len(data) > 33 {
tag = data[33:len(data)-1]
tag = data[33 : len(data)-1]
recursive = data[len(data)-1] == 0x01
}
@@ -496,7 +498,7 @@ func (t *Transport) handlePathRequest(data []byte, iface interface{}) {
// Create and send path response
hops := t.HopsTo(destHash)
nextHop := t.NextHop(destHash)
response := make([]byte, 0, 64)
response = append(response, 0x03) // Path Response type
response = append(response, destHash...)
@@ -514,7 +516,7 @@ func (t *Transport) handlePathRequest(data []byte, iface interface{}) {
newData := make([]byte, len(data))
copy(newData, data)
newData[32] = ttl - 1 // Decrease TTL
for name, otherIface := range t.interfaces {
if name != iface.(common.NetworkInterface).GetName() && otherIface.IsEnabled() {
otherIface.Send(newData, "")
@@ -536,7 +538,7 @@ func (t *Transport) handleLinkPacket(data []byte, iface interface{}) {
if t.HasPath(dest) {
nextHop := t.NextHop(dest)
nextIface := t.NextHopInterface(dest)
if iface, ok := t.interfaces[nextIface]; ok {
iface.Send(data, string(nextHop))
}
@@ -598,8 +600,33 @@ func (t *Transport) handleAnnouncePacket(data []byte, iface interface{}) {
func (t *Transport) findLink(dest []byte) *Link {
t.mutex.RLock()
defer t.mutex.RUnlock()
// This is a simplified version - you might want to maintain a map of active links
// in the Transport struct for better performance
return nil
}
}
func (t *Transport) SendPacket(p *packet.Packet) error {
t.mutex.RLock()
defer t.mutex.RUnlock()
// Serialize packet
data, err := p.Serialize()
if err != nil {
return fmt.Errorf("failed to serialize packet: %w", err)
}
// Find appropriate interface
destHash := p.Addresses[:packet.AddressSize]
path, exists := t.paths[string(destHash)]
if !exists {
return errors.New("no path to destination")
}
// Send through interface
if err := path.Interface.Send(data, ""); err != nil {
return fmt.Errorf("failed to send packet: %w", err)
}
return nil
}