Add channel handling to Link struct; implement methods for managing incoming channel packets, resource advertisements, and packet delivery. Enhance initialization with IncomingLinkHandler registration and improve error handling for resource requests.
Some checks failed
Go Build Multi-Platform / build (amd64, darwin) (push) Failing after 17s
Go Build Multi-Platform / build (amd64, freebsd) (push) Failing after 25s
Go Build Multi-Platform / build (amd64, windows) (push) Failing after 31s
Go Build Multi-Platform / build (amd64, linux) (push) Failing after 33s
Go Build Multi-Platform / build (arm, freebsd) (push) Failing after 31s
Go Build Multi-Platform / build (arm, linux) (push) Failing after 23s
Go Build Multi-Platform / build (arm, windows) (push) Failing after 40s
Go Build Multi-Platform / build (arm64, darwin) (push) Failing after 38s
Go Revive Lint / lint (push) Successful in 30s
Run Gosec / tests (push) Failing after 57s
Go Test Multi-Platform / Test (macos-latest, amd64) (push) Has been cancelled
Go Test Multi-Platform / Test (windows-latest, amd64) (push) Has been cancelled
Go Test Multi-Platform / Test (macos-latest, arm64) (push) Has been cancelled
Go Build Multi-Platform / build (arm64, freebsd) (push) Failing after 29s
Go Build Multi-Platform / build (arm64, linux) (push) Failing after 27s
Go Build Multi-Platform / build (arm64, windows) (push) Failing after 33s
Go Test Multi-Platform / Test (ubuntu-latest, amd64) (push) Failing after 35s
Go Build Multi-Platform / Create Release (push) Has been skipped
Go Test Multi-Platform / Test (ubuntu-latest, arm64) (push) Failing after 36s

This commit is contained in:
2025-12-01 20:30:59 -06:00
parent b489135c5b
commit c8d231556c

View File

@@ -13,6 +13,7 @@ import (
"sync"
"time"
"github.com/Sudo-Ivan/reticulum-go/pkg/channel"
"github.com/Sudo-Ivan/reticulum-go/pkg/common"
"github.com/Sudo-Ivan/reticulum-go/pkg/cryptography"
"github.com/Sudo-Ivan/reticulum-go/pkg/debug"
@@ -29,12 +30,12 @@ import (
const (
CURVE = "Curve25519"
ECPUBSIZE = 64
KEYSIZE = 32
LINK_MTU_SIZE = 3
MTU_BYTEMASK = 0xFFFFFF
MODE_BYTEMASK = 0xE0
ECPUBSIZE = 64
KEYSIZE = 32
LINK_MTU_SIZE = 3
MTU_BYTEMASK = 0xFFFFFF
MODE_BYTEMASK = 0xE0
ESTABLISHMENT_TIMEOUT_PER_HOP = 6
KEEPALIVE_TIMEOUT_FACTOR = 4
STALE_GRACE = 2
@@ -65,6 +66,16 @@ const (
WATCHDOG_INTERVAL = 0.1
)
func init() {
destination.RegisterIncomingLinkHandler(func(pkt *packet.Packet, dest *destination.Destination, trans interface{}, networkIface common.NetworkInterface) (interface{}, error) {
transportObj, ok := trans.(*transport.Transport)
if !ok {
return nil, errors.New("invalid transport type")
}
return HandleIncomingLinkRequest(pkt, dest, transportObj, networkIface)
})
}
type Link struct {
mutex sync.RWMutex
destination *destination.Destination
@@ -110,7 +121,7 @@ type Link struct {
keepalive time.Duration
staleTime time.Duration
initiator bool
prv []byte
sigPriv ed25519.PrivateKey
pub []byte
@@ -124,9 +135,12 @@ type Link struct {
mdu int
requestTime time.Time
requestPacket *packet.Packet
pendingRequests []*RequestReceipt
requestMutex sync.RWMutex
channel *channel.Channel
channelMutex sync.RWMutex
}
func NewLink(dest *destination.Destination, transport *transport.Transport, networkIface common.NetworkInterface, establishedCallback func(*Link), closedCallback func(*Link)) *Link {
@@ -156,23 +170,23 @@ func NewLink(dest *destination.Destination, transport *transport.Transport, netw
func HandleIncomingLinkRequest(pkt *packet.Packet, dest *destination.Destination, transport *transport.Transport, networkIface common.NetworkInterface) (*Link, error) {
debug.Log(debug.DEBUG_INFO, "Creating link for incoming request", "dest_hash", fmt.Sprintf("%x", dest.GetHash()))
l := NewLink(dest, transport, networkIface, nil, nil)
l.status = STATUS_PENDING
l.initiator = false // This is a responder link
ownerIdentity := dest.GetIdentity()
if ownerIdentity == nil {
return nil, errors.New("destination has no identity")
}
if err := l.HandleLinkRequest(pkt, ownerIdentity); err != nil {
debug.Log(debug.DEBUG_ERROR, "Failed to handle link request", "error", err)
return nil, err
}
go l.startWatchdog()
debug.Log(debug.DEBUG_INFO, "Link established for incoming request", "link_id", fmt.Sprintf("%x", l.linkID))
return l, nil
}
@@ -343,17 +357,17 @@ func (l *Link) Request(path string, data []byte, timeout time.Duration) (*Reques
}
type RequestReceipt struct {
link *Link
mutex sync.RWMutex
requestID []byte
status byte
sentAt time.Time
receivedAt time.Time
response []byte
timeout time.Duration
responseCb func(*RequestReceipt)
failedCb func(*RequestReceipt)
progressCb func(*RequestReceipt)
link *Link
mutex sync.RWMutex
requestID []byte
status byte
sentAt time.Time
receivedAt time.Time
response []byte
timeout time.Duration
responseCb func(*RequestReceipt)
failedCb func(*RequestReceipt)
progressCb func(*RequestReceipt)
}
func (r *RequestReceipt) GetRequestID() []byte {
@@ -691,7 +705,7 @@ func (l *Link) handleDataPacket(pkt *packet.Packet) error {
case packet.ContextResponse:
return l.handleResponse(plaintext)
case packet.ContextLinkIdentify:
return l.handleIdentification(plaintext)
return l.HandleIdentification(plaintext)
case packet.ContextKeepalive:
if !l.initiator && len(plaintext) == 1 && plaintext[0] == 0xFF {
keepaliveResp := []byte{0xFE}
@@ -737,6 +751,35 @@ func (l *Link) handleDataPacket(pkt *packet.Packet) error {
return l.handleResourceReject(pkt)
case packet.ContextResource:
return l.handleResourcePart(pkt)
case packet.ContextChannel:
return l.handleChannelPacket(pkt)
}
return nil
}
func (l *Link) GetChannel() *channel.Channel {
l.channelMutex.Lock()
defer l.channelMutex.Unlock()
if l.channel == nil {
l.channel = channel.NewChannel(l)
}
return l.channel
}
func (l *Link) handleChannelPacket(pkt *packet.Packet) error {
plaintext, err := l.decrypt(pkt.Data)
if err != nil {
return err
}
l.channelMutex.RLock()
ch := l.channel
l.channelMutex.RUnlock()
if ch != nil {
return ch.HandleInbound(plaintext)
}
return nil
@@ -748,6 +791,48 @@ func (l *Link) handleResourceAdvertisement(pkt *packet.Packet) error {
return err
}
adv, err := resource.UnpackResourceAdvertisement(plaintext)
if err != nil {
debug.Log(debug.DEBUG_INFO, "Failed to unpack resource advertisement", "error", err)
return err
}
if resource.IsRequestAdvertisement(plaintext) {
requestID := resource.ReadRequestID(plaintext)
if l.destination != nil {
handler := l.destination.GetRequestHandler(requestID)
if handler != nil {
response := handler(requestID, nil, requestID, l.remoteIdentity, time.Now())
if response != nil {
return l.sendResourceResponse(requestID, response)
}
}
}
return nil
}
if resource.IsResponseAdvertisement(plaintext) {
requestID := resource.ReadRequestID(plaintext)
l.requestMutex.Lock()
for i, req := range l.pendingRequests {
if string(req.requestID) == string(requestID) {
req.mutex.Lock()
req.status = STATUS_ACTIVE
req.receivedAt = time.Now()
req.mutex.Unlock()
if req.responseCb != nil {
go req.responseCb(req)
}
l.pendingRequests = append(l.pendingRequests[:i], l.pendingRequests[i+1:]...)
break
}
}
l.requestMutex.Unlock()
return nil
}
if l.resourceStrategy == ACCEPT_NONE {
return nil
}
@@ -756,20 +841,100 @@ func (l *Link) handleResourceAdvertisement(pkt *packet.Packet) error {
if l.resourceStrategy == ACCEPT_ALL {
allowed = true
} else if l.resourceStrategy == ACCEPT_APP && l.resourceCallback != nil {
allowed = l.resourceCallback(plaintext)
allowed = l.resourceCallback(adv)
}
if allowed {
if l.resourceStartedCallback != nil {
l.resourceStartedCallback(plaintext)
l.resourceStartedCallback(adv)
}
} else {
_ = l.rejectResource(adv.Hash) // #nosec G104 - best effort resource rejection
debug.Log(debug.DEBUG_INFO, "Resource advertisement rejected")
}
return nil
}
func (l *Link) rejectResource(resourceHash []byte) error {
rejectPkt := &packet.Packet{
HeaderType: packet.HeaderType1,
PacketType: packet.PacketTypeData,
TransportType: 0,
Context: packet.ContextResourceRCL,
ContextFlag: packet.FlagUnset,
Hops: 0,
DestinationType: 0x03,
DestinationHash: l.linkID,
Data: resourceHash,
CreateReceipt: false,
}
encrypted, err := l.encrypt(resourceHash)
if err != nil {
return err
}
rejectPkt.Data = encrypted
if err := rejectPkt.Pack(); err != nil {
return err
}
l.lastOutbound = time.Now()
return l.transport.SendPacket(rejectPkt)
}
func (l *Link) sendResourceResponse(requestID []byte, response interface{}) error {
resData, ok := response.([]byte)
if !ok {
return errors.New("response must be []byte")
}
res, err := resource.New(resData, false)
if err != nil {
return fmt.Errorf("failed to create resource: %w", err)
}
res.SetRequestID(requestID)
res.SetIsResponse(true)
return l.sendResourceAdvertisement(res)
}
func (l *Link) sendResourceAdvertisement(res *resource.Resource) error {
adv := resource.NewResourceAdvertisement(res)
if adv == nil {
return errors.New("failed to create resource advertisement")
}
advData, err := adv.Pack(0)
if err != nil {
return fmt.Errorf("failed to pack advertisement: %w", err)
}
encrypted, err := l.encrypt(advData)
if err != nil {
return err
}
advPkt := &packet.Packet{
HeaderType: packet.HeaderType1,
PacketType: packet.PacketTypeData,
TransportType: 0,
Context: packet.ContextResourceAdv,
ContextFlag: packet.FlagUnset,
Hops: 0,
DestinationType: 0x03,
DestinationHash: l.linkID,
Data: encrypted,
CreateReceipt: false,
}
if err := advPkt.Pack(); err != nil {
return err
}
l.lastOutbound = time.Now()
return l.transport.SendPacket(advPkt)
}
func (l *Link) handleResourceRequest(pkt *packet.Packet) error {
plaintext, err := l.decrypt(pkt.Data)
if err != nil {
@@ -1099,6 +1264,10 @@ func (l *Link) GetRTT() float64 {
return l.rtt
}
func (l *Link) RTT() float64 {
return l.GetRTT()
}
func (l *Link) SetRTT(rtt float64) {
l.mutex.Lock()
defer l.mutex.Unlock()
@@ -1111,6 +1280,67 @@ func (l *Link) GetStatus() byte {
return l.status
}
func (l *Link) Send(data []byte) interface{} {
pkt := &packet.Packet{
HeaderType: packet.HeaderType1,
PacketType: packet.PacketTypeData,
TransportType: 0,
Context: packet.ContextChannel,
ContextFlag: packet.FlagUnset,
Hops: 0,
DestinationType: 0x03,
DestinationHash: l.linkID,
Data: data,
CreateReceipt: false,
}
encrypted, err := l.encrypt(data)
if err != nil {
return nil
}
pkt.Data = encrypted
if err := pkt.Pack(); err != nil {
return nil
}
l.lastOutbound = time.Now()
if err := l.transport.SendPacket(pkt); err != nil {
return nil
}
return pkt
}
func (l *Link) SetPacketTimeout(pkt interface{}, callback func(interface{}), timeout time.Duration) {
if packetObj, ok := pkt.(*packet.Packet); ok {
go func() {
time.Sleep(timeout)
if callback != nil {
callback(packetObj)
}
}()
}
}
func (l *Link) SetPacketDelivered(pkt interface{}, callback func(interface{})) {
if callback != nil {
go callback(pkt)
}
}
func (l *Link) Resend(pkt interface{}) error {
packetObj, ok := pkt.(*packet.Packet)
if !ok {
return errors.New("invalid packet type")
}
if err := l.transport.SendPacket(packetObj); err != nil {
return err
}
return nil
}
func (l *Link) GetLinkID() []byte {
l.mutex.RLock()
defer l.mutex.RUnlock()
@@ -1362,7 +1592,7 @@ func (l *Link) watchdog() {
if l.initiator {
lastKeepalive := l.lastOutbound
if now.After(lastKeepalive.Add(l.keepalive)) {
l.sendKeepalive()
_ = l.sendKeepalive() // #nosec G104 - best effort keepalive
}
}
@@ -1378,7 +1608,7 @@ func (l *Link) watchdog() {
}
} else if l.status == STATUS_STALE {
sleepTime = 0.001
l.sendTeardownPacket()
_ = l.sendTeardownPacket() // #nosec G104 - best effort teardown
l.status = STATUS_CLOSED
l.teardownReason = STATUS_FAILED
if l.closedCallback != nil {
@@ -1453,11 +1683,11 @@ func (l *Link) sendTeardownPacket() error {
func (l *Link) Validate(signature, message []byte) bool {
l.mutex.RLock()
defer l.mutex.RUnlock()
if l.remoteIdentity == nil {
return false
}
return l.remoteIdentity.Verify(message, signature)
}
@@ -1536,7 +1766,7 @@ func (l *Link) SendLinkRequest() error {
func linkIDFromPacket(pkt *packet.Packet) []byte {
hashablePart := make([]byte, 0, 1+16+1+ECPUBSIZE)
hashablePart = append(hashablePart, pkt.Raw[0])
if pkt.HeaderType == packet.HeaderType2 {
startIndex := 18
endIndex := startIndex + 16 + 1 + ECPUBSIZE
@@ -1559,7 +1789,7 @@ func (l *Link) HandleLinkRequest(pkt *packet.Packet, ownerIdentity *identity.Ide
}
peerPub := pkt.Data[0:KEYSIZE]
peerSigPub := pkt.Data[KEYSIZE : ECPUBSIZE]
peerSigPub := pkt.Data[KEYSIZE:ECPUBSIZE]
l.peerPub = peerPub
l.peerSigPub = peerSigPub
@@ -1593,11 +1823,11 @@ func (l *Link) HandleLinkRequest(pkt *packet.Packet, ownerIdentity *identity.Ide
l.status = STATUS_HANDSHAKE
l.lastInbound = time.Now()
l.requestTime = time.Now()
if l.transport != nil {
l.transport.RegisterLink(l.linkID, l)
}
debug.Log(debug.DEBUG_INFO, "Link proof sent (responder), waiting for RTT", "link_id", fmt.Sprintf("%x", l.linkID))
return nil
@@ -1609,7 +1839,7 @@ func (l *Link) updateMDU() {
tokenOverhead := 16
aesBlockSize := 16
l.mdu = int(float64(l.mtu-headerMaxSize-ifacMinSize-tokenOverhead)/float64(aesBlockSize)) * aesBlockSize - 1
l.mdu = int(float64(l.mtu-headerMaxSize-ifacMinSize-tokenOverhead)/float64(aesBlockSize))*aesBlockSize - 1
if l.mdu < 0 {
l.mdu = 100
}
@@ -1655,7 +1885,7 @@ func (l *Link) performHandshake() error {
func (l *Link) sendLinkProof(ownerIdentity *identity.Identity) error {
debug.Log(debug.DEBUG_ERROR, "Generating link proof", "link_id", fmt.Sprintf("%x", l.linkID), "initiator", l.initiator, "has_interface", l.networkInterface != nil)
proofPkt, err := l.GenerateLinkProof(ownerIdentity)
if err != nil {
return err
@@ -1668,9 +1898,9 @@ func (l *Link) sendLinkProof(ownerIdentity *identity.Identity) error {
if err := proofPkt.Pack(); err != nil {
return fmt.Errorf("failed to pack proof packet: %w", err)
}
debug.Log(debug.DEBUG_ERROR, "Sending proof through interface", "raw_len", len(proofPkt.Raw), "interface", l.networkInterface.GetName())
if err := l.networkInterface.Send(proofPkt.Raw, ""); err != nil {
return fmt.Errorf("failed to send link proof through interface: %w", err)
}
@@ -1691,9 +1921,9 @@ func (l *Link) sendLinkProof(ownerIdentity *identity.Identity) error {
func (l *Link) GenerateLinkProof(ownerIdentity *identity.Identity) (*packet.Packet, error) {
signalling := signallingBytes(l.mtu, l.mode)
ownerSigPub := ownerIdentity.GetPublicKey()[KEYSIZE:ECPUBSIZE]
signedData := make([]byte, 0, len(l.linkID)+KEYSIZE+len(ownerSigPub)+len(signalling))
signedData = append(signedData, l.linkID...)
signedData = append(signedData, l.pub...)
@@ -1745,9 +1975,9 @@ func (l *Link) ValidateLinkProof(pkt *packet.Packet) error {
signalling = pkt.Data[identity.SIGLENGTH/8+KEYSIZE : identity.SIGLENGTH/8+KEYSIZE+LINK_MTU_SIZE]
mtu := (int(signalling[0]&0x1F) << 16) | (int(signalling[1]) << 8) | int(signalling[2])
mode := (signalling[0] & MODE_BYTEMASK) >> 5
l.mtu = mtu
l.mode = mode
debug.Log(debug.DEBUG_VERBOSE, "Link proof includes MTU", "mtu", mtu, "mode", mode)
l.mtu = mtu
l.mode = mode
debug.Log(debug.DEBUG_VERBOSE, "Link proof includes MTU", "mtu", mtu, "mode", mode)
}
l.peerPub = peerPub
@@ -1813,7 +2043,7 @@ func (l *Link) ValidateLinkProof(pkt *packet.Packet) error {
if l.transport != nil {
l.transport.RegisterLink(l.linkID, l)
}
debug.Log(debug.DEBUG_INFO, "Link established (initiator)", "link_id", fmt.Sprintf("%x", l.linkID), "rtt", fmt.Sprintf("%.3fs", l.rtt))
if l.establishedCallback != nil {