init v0.1.0

This commit is contained in:
Sudo-Ivan
2024-12-30 01:56:25 -06:00
commit 668d7c56b5
27 changed files with 3921 additions and 0 deletions

166
pkg/announce/announce.go Normal file
View File

@@ -0,0 +1,166 @@
package announce
import (
"crypto/sha256"
"encoding/binary"
"errors"
"sync"
"time"
"github.com/Sudo-Ivan/reticulum-go/pkg/identity"
"github.com/Sudo-Ivan/reticulum-go/pkg/transport"
)
const (
ANNOUNCE_NONE = 0x00
ANNOUNCE_PATH = 0x01
ANNOUNCE_IDENTITY = 0x02
MAX_HOPS = 128
PROPAGATION_RATE = 0.02 // 2% of interface bandwidth
RETRY_INTERVAL = 300 // 5 minutes
MAX_RETRIES = 3
)
type AnnounceHandler interface {
AspectFilter() []string
ReceivedAnnounce(destinationHash []byte, announcedIdentity *identity.Identity, appData []byte) error
ReceivePathResponses() bool
}
type Announce struct {
mutex sync.RWMutex
destinationHash []byte
identity *identity.Identity
appData []byte
hops uint8
timestamp int64
signature []byte
pathResponse bool
retries int
handlers []AnnounceHandler
}
func New(dest *identity.Identity, appData []byte, pathResponse bool) (*Announce, error) {
a := &Announce{
identity: dest,
appData: appData,
hops: 0,
timestamp: time.Now().Unix(),
pathResponse: pathResponse,
retries: 0,
handlers: make([]AnnounceHandler, 0),
}
// Generate destination hash
hash := sha256.New()
hash.Write(dest.GetPublicKey())
a.destinationHash = hash.Sum(nil)[:16] // Truncated hash
// Sign the announce
signData := append(a.destinationHash, a.appData...)
a.signature = dest.Sign(signData)
return a, nil
}
func (a *Announce) Propagate(interfaces []transport.Interface) error {
a.mutex.Lock()
defer a.mutex.Unlock()
if a.hops >= MAX_HOPS {
return errors.New("maximum hop count reached")
}
// Increment hop count
a.hops++
// Create announce packet
packet := make([]byte, 0)
packet = append(packet, a.destinationHash...)
packet = append(packet, a.identity.GetPublicKey()...)
packet = append(packet, byte(a.hops))
if a.appData != nil {
packet = append(packet, a.appData...)
}
packet = append(packet, a.signature...)
// Propagate to all interfaces
for _, iface := range interfaces {
if err := iface.SendAnnounce(packet, a.pathResponse); err != nil {
return err
}
}
return nil
}
func (a *Announce) RegisterHandler(handler AnnounceHandler) {
a.mutex.Lock()
defer a.mutex.Unlock()
a.handlers = append(a.handlers, handler)
}
func (a *Announce) DeregisterHandler(handler AnnounceHandler) {
a.mutex.Lock()
defer a.mutex.Unlock()
for i, h := range a.handlers {
if h == handler {
a.handlers = append(a.handlers[:i], a.handlers[i+1:]...)
break
}
}
}
func (a *Announce) HandleAnnounce(data []byte) error {
a.mutex.Lock()
defer a.mutex.Unlock()
// Validate announce data
if len(data) < 16+32+1 { // Min size: hash + pubkey + hops
return errors.New("invalid announce data")
}
// Extract fields
destHash := data[:16]
pubKey := data[16:48]
hops := data[48]
appData := data[49 : len(data)-64]
signature := data[len(data)-64:]
// Verify signature
signData := append(destHash, appData...)
if !a.identity.Verify(signData, signature) {
return errors.New("invalid announce signature")
}
// Process announce with registered handlers
for _, handler := range a.handlers {
if handler.ReceivePathResponses() || !a.pathResponse {
if err := handler.ReceivedAnnounce(destHash, a.identity, appData); err != nil {
return err
}
}
}
return nil
}
func (a *Announce) RequestPath(destHash []byte, onInterface transport.Interface) error {
a.mutex.Lock()
defer a.mutex.Unlock()
// Create path request packet
packet := make([]byte, 0)
packet = append(packet, destHash...)
packet = append(packet, byte(0)) // Initial hop count
// Send path request
if err := onInterface.SendPathRequest(packet); err != nil {
return err
}
return nil
}

29
pkg/common/config.go Normal file
View File

@@ -0,0 +1,29 @@
package common
// ConfigProvider interface for accessing configuration
type ConfigProvider interface {
GetConfigPath() string
GetLogLevel() int
GetInterfaces() map[string]InterfaceConfig
}
// InterfaceConfig represents interface configuration
type InterfaceConfig struct {
Type string `toml:"type"`
Enabled bool `toml:"enabled"`
TargetHost string `toml:"target_host,omitempty"`
TargetPort int `toml:"target_port,omitempty"`
Interface string `toml:"interface,omitempty"`
}
// ReticulumConfig represents the main configuration structure
type ReticulumConfig struct {
EnableTransport bool `toml:"enable_transport"`
ShareInstance bool `toml:"share_instance"`
SharedInstancePort int `toml:"shared_instance_port"`
InstanceControlPort int `toml:"instance_control_port"`
PanicOnInterfaceErr bool `toml:"panic_on_interface_error"`
LogLevel int `toml:"loglevel"`
ConfigPath string `toml:"-"`
Interfaces map[string]InterfaceConfig
}

28
pkg/common/constants.go Normal file
View File

@@ -0,0 +1,28 @@
package common
const (
// Interface Types
IF_TYPE_UDP InterfaceType = iota
IF_TYPE_TCP
IF_TYPE_UNIX
// Interface Modes
IF_MODE_FULL InterfaceMode = iota
IF_MODE_POINT
IF_MODE_GATEWAY
// Transport Modes
TRANSPORT_MODE_DIRECT TransportMode = iota
TRANSPORT_MODE_RELAY
TRANSPORT_MODE_GATEWAY
// Path Status
PATH_STATUS_UNKNOWN PathStatus = iota
PATH_STATUS_DIRECT
PATH_STATUS_RELAY
PATH_STATUS_FAILED
// Common Constants
DEFAULT_MTU = 1500
MAX_PACKET_SIZE = 65535
)

57
pkg/common/interfaces.go Normal file
View File

@@ -0,0 +1,57 @@
package common
import (
"net"
"sync"
"time"
)
// NetworkInterface combines both low-level and high-level interface requirements
type NetworkInterface interface {
// Low-level network operations
Start() error
Stop() error
Send(data []byte, address string) error
Receive() ([]byte, string, error)
GetType() InterfaceType
GetMode() InterfaceMode
GetMTU() int
// High-level packet operations
ProcessIncoming([]byte)
ProcessOutgoing([]byte) error
SendPathRequest([]byte) error
SendLinkPacket([]byte, []byte, time.Time) error
Detach()
SetPacketCallback(PacketCallback)
// Additional required fields
GetName() string
GetConn() net.Conn
IsEnabled() bool
}
type PacketCallback func([]byte, interface{})
// BaseInterface provides common implementation
type BaseInterface struct {
Name string
Mode InterfaceMode
Type InterfaceType
Online bool
Detached bool
IN bool
OUT bool
MTU int
Bitrate int64
TxBytes uint64
RxBytes uint64
mutex sync.RWMutex
owner interface{}
packetCallback PacketCallback
}

36
pkg/common/types.go Normal file
View File

@@ -0,0 +1,36 @@
package common
import (
"time"
)
// Interface related types
type InterfaceMode byte
type InterfaceType byte
// Transport related types
type TransportMode byte
type PathStatus byte
// Common structs
type Path struct {
Interface NetworkInterface
Address string
Status PathStatus
LastSeen time.Time
NextHop []byte
Hops uint8
LastUpdated time.Time
}
// Common callbacks
type ProofRequestedCallback func(interface{}) bool
type LinkEstablishedCallback func(interface{})
// Request handler
type RequestHandler struct {
Path string
ResponseGenerator func(path string, data []byte, requestID []byte, linkID []byte, remoteIdentity interface{}, requestedAt int64) []byte
AllowMode byte
AllowedList [][]byte
}

49
pkg/config/config.go Normal file
View File

@@ -0,0 +1,49 @@
package config
import (
"io/ioutil"
"gopkg.in/yaml.v3"
)
type Config struct {
Identity struct {
Name string `yaml:"name"`
StoragePath string `yaml:"storage_path"`
} `yaml:"identity"`
Interfaces []struct {
Name string `yaml:"name"`
Type string `yaml:"type"`
Enabled bool `yaml:"enabled"`
ListenPort int `yaml:"listen_port"`
ListenIP string `yaml:"listen_ip"`
KissFraming bool `yaml:"kiss_framing"`
I2PTunneled bool `yaml:"i2p_tunneled"`
} `yaml:"interfaces"`
Transport struct {
AnnounceInterval int `yaml:"announce_interval"`
PathRequestTimeout int `yaml:"path_request_timeout"`
MaxHops int `yaml:"max_hops"`
BitrateLimit int64 `yaml:"bitrate_limit"`
} `yaml:"transport"`
Logging struct {
Level string `yaml:"level"`
File string `yaml:"file"`
} `yaml:"logging"`
}
func LoadConfig(path string) (*Config, error) {
data, err := ioutil.ReadFile(path)
if err != nil {
return nil, err
}
var cfg Config
if err := yaml.Unmarshal(data, &cfg); err != nil {
return nil, err
}
return &cfg, nil
}

View File

@@ -0,0 +1,350 @@
package destination
import (
"crypto/sha256"
"encoding/binary"
"errors"
"fmt"
"sync"
"github.com/Sudo-Ivan/reticulum-go/pkg/common"
"github.com/Sudo-Ivan/reticulum-go/pkg/identity"
"github.com/Sudo-Ivan/reticulum-go/pkg/transport"
)
const (
IN = 0x01
OUT = 0x02
SINGLE = 0x00
GROUP = 0x01
PLAIN = 0x02
PROVE_NONE = 0x00
PROVE_ALL = 0x01
PROVE_APP = 0x02
ALLOW_NONE = 0x00
ALLOW_ALL = 0x01
ALLOW_LIST = 0x02
RATCHET_COUNT = 512 // Default number of retained ratchet keys
RATCHET_INTERVAL = 1800 // Minimum interval between ratchet rotations in seconds
)
type PacketCallback = common.PacketCallback
type ProofRequestedCallback = common.ProofRequestedCallback
type LinkEstablishedCallback = common.LinkEstablishedCallback
type RequestHandler struct {
Path string
ResponseGenerator func(path string, data []byte, requestID []byte, linkID []byte, remoteIdentity *identity.Identity, requestedAt int64) []byte
AllowMode byte
AllowedList [][]byte
}
type Destination struct {
identity *identity.Identity
direction byte
destType byte
appName string
aspects []string
hash []byte
acceptsLinks bool
proofStrategy byte
packetCallback PacketCallback
proofCallback ProofRequestedCallback
linkCallback LinkEstablishedCallback
ratchetsEnabled bool
ratchetPath string
ratchetCount int
ratchetInterval int
enforceRatchets bool
defaultAppData []byte
mutex sync.RWMutex
requestHandlers map[string]*RequestHandler
callbacks struct {
packetReceived common.PacketCallback
proofRequested common.ProofRequestedCallback
linkEstablished common.LinkEstablishedCallback
}
}
func New(id *identity.Identity, direction byte, destType byte, appName string, aspects ...string) (*Destination, error) {
if id == nil {
return nil, errors.New("identity cannot be nil")
}
d := &Destination{
identity: id,
direction: direction,
destType: destType,
appName: appName,
aspects: aspects,
acceptsLinks: false,
proofStrategy: PROVE_NONE,
ratchetCount: RATCHET_COUNT,
ratchetInterval: RATCHET_INTERVAL,
requestHandlers: make(map[string]*RequestHandler),
}
// Generate destination hash
d.hash = d.Hash()
return d, nil
}
func (d *Destination) Hash() []byte {
nameHash := sha256.Sum256([]byte(d.ExpandName()))
identityHash := sha256.Sum256(d.identity.GetPublicKey())
combined := append(nameHash[:], identityHash[:]...)
finalHash := sha256.Sum256(combined)
return finalHash[:16] // Truncated to 128 bits
}
func (d *Destination) ExpandName() string {
name := d.appName
for _, aspect := range d.aspects {
name += "." + aspect
}
return name
}
func (d *Destination) Announce(appData []byte) error {
d.mutex.Lock()
defer d.mutex.Unlock()
// If no specific appData provided, use default
if appData == nil {
appData = d.defaultAppData
}
// Create announce packet
packet := make([]byte, 0)
// Add destination hash
packet = append(packet, d.hash...)
// Add identity public key
packet = append(packet, d.identity.GetPublicKey()...)
// Add flags byte
flags := byte(0)
if d.acceptsLinks {
flags |= 0x01
}
if d.ratchetsEnabled {
flags |= 0x02
}
packet = append(packet, flags)
// Add proof strategy
packet = append(packet, d.proofStrategy)
// Add app data length and data if present
if appData != nil {
appDataLen := uint16(len(appData))
lenBytes := make([]byte, 2)
binary.BigEndian.PutUint16(lenBytes, appDataLen)
packet = append(packet, lenBytes...)
packet = append(packet, appData...)
} else {
// No app data
packet = append(packet, 0x00, 0x00)
}
// Add ratchet data if enabled
if d.ratchetsEnabled {
// Add ratchet interval
intervalBytes := make([]byte, 4)
binary.BigEndian.PutUint32(intervalBytes, uint32(d.ratchetInterval))
packet = append(packet, intervalBytes...)
// Add current ratchet key
ratchetKey := d.identity.GetCurrentRatchetKey()
if ratchetKey == nil {
return errors.New("failed to get current ratchet key")
}
packet = append(packet, ratchetKey...)
}
// Sign the announce packet
signature, err := d.Sign(packet)
if err != nil {
return fmt.Errorf("failed to sign announce packet: %w", err)
}
packet = append(packet, signature...)
// Send announce packet through transport layer
// This will need to be implemented in the transport package
return transport.SendAnnounce(packet)
}
func (d *Destination) AcceptsLinks(accepts bool) {
d.mutex.Lock()
defer d.mutex.Unlock()
d.acceptsLinks = accepts
}
func (d *Destination) SetLinkEstablishedCallback(callback common.LinkEstablishedCallback) {
d.mutex.Lock()
defer d.mutex.Unlock()
d.linkCallback = callback
}
func (d *Destination) SetPacketCallback(callback common.PacketCallback) {
d.mutex.Lock()
defer d.mutex.Unlock()
d.packetCallback = callback
}
func (d *Destination) SetProofRequestedCallback(callback common.ProofRequestedCallback) {
d.mutex.Lock()
defer d.mutex.Unlock()
d.proofCallback = callback
}
func (d *Destination) SetProofStrategy(strategy byte) {
d.mutex.Lock()
defer d.mutex.Unlock()
d.proofStrategy = strategy
}
func (d *Destination) EnableRatchets(path string) bool {
d.mutex.Lock()
defer d.mutex.Unlock()
d.ratchetsEnabled = true
d.ratchetPath = path
return true
}
func (d *Destination) EnforceRatchets() {
d.mutex.Lock()
defer d.mutex.Unlock()
d.enforceRatchets = true
}
func (d *Destination) SetRetainedRatchets(count int) bool {
if count < 1 {
return false
}
d.mutex.Lock()
defer d.mutex.Unlock()
d.ratchetCount = count
return true
}
func (d *Destination) SetRatchetInterval(interval int) bool {
if interval < 1 {
return false
}
d.mutex.Lock()
defer d.mutex.Unlock()
d.ratchetInterval = interval
return true
}
func (d *Destination) SetDefaultAppData(data []byte) {
d.mutex.Lock()
defer d.mutex.Unlock()
d.defaultAppData = data
}
func (d *Destination) ClearDefaultAppData() {
d.mutex.Lock()
defer d.mutex.Unlock()
d.defaultAppData = nil
}
func (d *Destination) RegisterRequestHandler(path string, responseGen func(string, []byte, []byte, []byte, *identity.Identity, int64) []byte, allow byte, allowedList [][]byte) error {
if path == "" {
return errors.New("path cannot be empty")
}
if allow != ALLOW_NONE && allow != ALLOW_ALL && allow != ALLOW_LIST {
return errors.New("invalid allow mode")
}
if allow == ALLOW_LIST && (allowedList == nil || len(allowedList) == 0) {
return errors.New("allowed list required for ALLOW_LIST mode")
}
d.mutex.Lock()
defer d.mutex.Unlock()
d.requestHandlers[path] = &RequestHandler{
Path: path,
ResponseGenerator: responseGen,
AllowMode: allow,
AllowedList: allowedList,
}
return nil
}
func (d *Destination) DeregisterRequestHandler(path string) bool {
d.mutex.Lock()
defer d.mutex.Unlock()
if _, exists := d.requestHandlers[path]; exists {
delete(d.requestHandlers, path)
return true
}
return false
}
func (d *Destination) Encrypt(plaintext []byte) ([]byte, error) {
if d.destType == PLAIN {
return plaintext, nil
}
if d.identity == nil {
return nil, errors.New("no identity available for encryption")
}
switch d.destType {
case SINGLE:
return d.identity.Encrypt(plaintext, nil)
case GROUP:
return d.identity.EncryptSymmetric(plaintext)
default:
return nil, errors.New("unsupported destination type for encryption")
}
}
func (d *Destination) Decrypt(ciphertext []byte) ([]byte, error) {
if d.destType == PLAIN {
return ciphertext, nil
}
if d.identity == nil {
return nil, errors.New("no identity available for decryption")
}
switch d.destType {
case SINGLE:
return d.identity.Decrypt(ciphertext, nil)
case GROUP:
return d.identity.DecryptSymmetric(ciphertext)
default:
return nil, errors.New("unsupported destination type for decryption")
}
}
func (d *Destination) Sign(data []byte) ([]byte, error) {
if d.identity == nil {
return nil, errors.New("no identity available")
}
signature := d.identity.Sign(data)
return signature, nil
}

430
pkg/identity/identity.go Normal file
View File

@@ -0,0 +1,430 @@
package identity
import (
"crypto/aes"
"crypto/cipher"
"crypto/ed25519"
"crypto/rand"
"crypto/sha256"
"errors"
"io"
"os"
"sync"
"time"
"encoding/hex"
"fmt"
"path/filepath"
"bytes"
"golang.org/x/crypto/curve25519"
"golang.org/x/crypto/hkdf"
"github.com/Sudo-Ivan/reticulum-go/pkg/common"
)
const (
KeySize = 512 // Combined size of encryption and signing keys
RatchetSize = 256
RatchetExpiry = 2592000 // 30 days in seconds
TruncatedHashLen = 128 // bits
)
type Identity struct {
privateKey []byte
publicKey []byte
signingKey ed25519.PrivateKey
verificationKey ed25519.PublicKey
ratchets map[string][]byte
ratchetExpiry map[string]int64
mutex sync.RWMutex
}
func New() (*Identity, error) {
i := &Identity{
ratchets: make(map[string][]byte),
ratchetExpiry: make(map[string]int64),
}
// Generate X25519 key pair
var err error
i.privateKey = make([]byte, curve25519.ScalarSize)
if _, err = io.ReadFull(rand.Reader, i.privateKey); err != nil {
return nil, err
}
// Generate public key
i.publicKey, err = curve25519.X25519(i.privateKey, curve25519.Basepoint)
if err != nil {
return nil, err
}
// Generate Ed25519 signing keypair
publicKey, privateKey, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
return nil, err
}
i.signingKey = privateKey
i.verificationKey = publicKey
return i, nil
}
func FromBytes(data []byte) (*Identity, error) {
if len(data) != KeySize/8 {
return nil, errors.New("invalid key size")
}
i := &Identity{
ratchets: make(map[string][]byte),
ratchetExpiry: make(map[string]int64),
}
// First 32 bytes are X25519 private key
i.privateKey = data[:32]
var err error
i.publicKey, err = curve25519.X25519(i.privateKey, curve25519.Basepoint)
if err != nil {
return nil, err
}
// Next 32 bytes are Ed25519 private key
i.signingKey = ed25519.PrivateKey(data[32:])
i.verificationKey = i.signingKey.Public().(ed25519.PublicKey)
return i, nil
}
func (i *Identity) ToBytes() []byte {
data := make([]byte, KeySize/8)
copy(data[:32], i.privateKey)
copy(data[32:], i.signingKey)
return data
}
func (i *Identity) SaveToFile(path string) error {
return os.WriteFile(path, i.ToBytes(), 0600)
}
func LoadFromFile(path string) (*Identity, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, err
}
return FromBytes(data)
}
func (i *Identity) Encrypt(plaintext []byte, ratchets []byte) ([]byte, error) {
// Generate ephemeral key pair
ephemeralPrivate := make([]byte, curve25519.ScalarSize)
if _, err := io.ReadFull(rand.Reader, ephemeralPrivate); err != nil {
return nil, err
}
ephemeralPublic, err := curve25519.X25519(ephemeralPrivate, curve25519.Basepoint)
if err != nil {
return nil, err
}
// Perform key exchange
sharedSecret, err := curve25519.X25519(ephemeralPrivate, i.publicKey)
if err != nil {
return nil, err
}
// Generate AES key from shared secret using HKDF
hash := sha256.New
hkdf := hkdf.New(hash, sharedSecret, nil, nil)
aesKey := make([]byte, 32)
if _, err := io.ReadFull(hkdf, aesKey); err != nil {
return nil, err
}
// Create AES-GCM cipher
block, err := aes.NewCipher(aesKey)
if err != nil {
return nil, err
}
gcm, err := cipher.NewGCM(block)
if err != nil {
return nil, err
}
// Generate nonce
nonce := make([]byte, gcm.NonceSize())
if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
return nil, err
}
// Encrypt plaintext
ciphertext := gcm.Seal(nil, nonce, plaintext, nil)
// Combine ephemeral public key, nonce and ciphertext
result := make([]byte, len(ephemeralPublic)+len(nonce)+len(ciphertext))
copy(result, ephemeralPublic)
copy(result[len(ephemeralPublic):], nonce)
copy(result[len(ephemeralPublic)+len(nonce):], ciphertext)
return result, nil
}
func (i *Identity) Decrypt(ciphertext []byte, ratchets []byte) ([]byte, error) {
if len(ciphertext) <= curve25519.ScalarSize {
return nil, errors.New("invalid ciphertext")
}
// Extract ephemeral public key
ephemeralPublic := ciphertext[:curve25519.ScalarSize]
// Perform key exchange
sharedSecret, err := curve25519.X25519(i.privateKey, ephemeralPublic)
if err != nil {
return nil, err
}
// Generate AES key from shared secret using HKDF
hash := sha256.New
hkdf := hkdf.New(hash, sharedSecret, nil, nil)
aesKey := make([]byte, 32)
if _, err := io.ReadFull(hkdf, aesKey); err != nil {
return nil, err
}
// Create AES-GCM cipher
block, err := aes.NewCipher(aesKey)
if err != nil {
return nil, err
}
gcm, err := cipher.NewGCM(block)
if err != nil {
return nil, err
}
// Extract nonce and encrypted data
nonceSize := gcm.NonceSize()
if len(ciphertext) < curve25519.ScalarSize+nonceSize {
return nil, errors.New("invalid ciphertext")
}
nonce := ciphertext[curve25519.ScalarSize : curve25519.ScalarSize+nonceSize]
encryptedData := ciphertext[curve25519.ScalarSize+nonceSize:]
// Decrypt data
plaintext, err := gcm.Open(nil, nonce, encryptedData, nil)
if err != nil {
return nil, err
}
return plaintext, nil
}
func (i *Identity) Sign(message []byte) []byte {
return ed25519.Sign(i.signingKey, message)
}
func (i *Identity) Verify(message, signature []byte) bool {
return ed25519.Verify(i.verificationKey, message, signature)
}
func (i *Identity) GetPublicKey() []byte {
return append([]byte{}, i.publicKey...)
}
func (i *Identity) AddRatchet(ratchetID string, ratchetKey []byte) {
i.mutex.Lock()
defer i.mutex.Unlock()
i.ratchets[ratchetID] = ratchetKey
i.ratchetExpiry[ratchetID] = time.Now().Unix() + RatchetExpiry
}
func (i *Identity) GetRatchet(ratchetID string) []byte {
i.mutex.RLock()
defer i.mutex.RUnlock()
if expiry, ok := i.ratchetExpiry[ratchetID]; ok {
if time.Now().Unix() < expiry {
return i.ratchets[ratchetID]
}
// Cleanup expired ratchet
delete(i.ratchets, ratchetID)
delete(i.ratchetExpiry, ratchetID)
}
return nil
}
// Helper functions
func TruncatedHash(data []byte) []byte {
hash := sha256.Sum256(data)
return hash[:TruncatedHashLen/8]
}
func FullHash(data []byte) []byte {
hash := sha256.Sum256(data)
return hash[:]
}
func HashFromHex(hexHash string) ([]byte, error) {
if len(hexHash) != TruncatedHashLen/4 { // hex string is twice the length of bytes
return nil, errors.New("invalid hash length")
}
hash := make([]byte, TruncatedHashLen/8)
_, err := hex.Decode(hash, []byte(hexHash))
if err != nil {
return nil, err
}
return hash, nil
}
func Recall(hash []byte) (*Identity, error) {
// Get config path from environment or default location
configDir := os.Getenv("RETICULUM_CONFIG_DIR")
if configDir == "" {
homeDir, err := os.UserHomeDir()
if err != nil {
return nil, fmt.Errorf("failed to get home directory: %w", err)
}
configDir = filepath.Join(homeDir, ".reticulum")
}
// Create identities directory if it doesn't exist
identitiesPath := filepath.Join(configDir, "identities")
if err := os.MkdirAll(identitiesPath, 0755); err != nil {
return nil, fmt.Errorf("failed to create identities directory: %w", err)
}
// Convert hash to hex for filename
hashHex := hex.EncodeToString(hash)
identityPath := filepath.Join(identitiesPath, hashHex)
// Check if identity file exists
if _, err := os.Stat(identityPath); os.IsNotExist(err) {
return nil, errors.New("identity not found")
}
// Load identity from file
identity, err := LoadFromFile(identityPath)
if err != nil {
return nil, fmt.Errorf("failed to load identity: %w", err)
}
// Verify the loaded identity matches the requested hash
if !bytes.Equal(TruncatedHash(identity.GetPublicKey()), hash) {
return nil, errors.New("identity hash mismatch")
}
return identity, nil
}
func LoadIdentity(cfg *common.ReticulumConfig) (*Identity, error) {
if cfg == nil {
return nil, errors.New("config cannot be nil")
}
// Try to load existing identity
identityPath := filepath.Join(filepath.Dir(cfg.ConfigPath), "identity")
if _, err := os.Stat(identityPath); err == nil {
// Identity exists, load it
return LoadFromFile(identityPath)
}
// Create new identity
identity, err := New()
if err != nil {
return nil, fmt.Errorf("failed to create new identity: %w", err)
}
// Save the new identity
if err := identity.SaveToFile(identityPath); err != nil {
return nil, fmt.Errorf("failed to save new identity: %w", err)
}
return identity, nil
}
func (i *Identity) GetCurrentRatchetKey() []byte {
i.mutex.RLock()
defer i.mutex.RUnlock()
// Generate new ratchet key if none exists
if len(i.ratchets) == 0 {
key := make([]byte, 32)
if _, err := rand.Read(key); err != nil {
return nil
}
ratchetID := fmt.Sprintf("%d", time.Now().Unix())
i.AddRatchet(ratchetID, key)
return key
}
// Return most recent ratchet key
var latestTime int64
var latestKey []byte
for id, key := range i.ratchets {
if expiry, ok := i.ratchetExpiry[id]; ok {
if expiry > latestTime {
latestTime = expiry
latestKey = key
}
}
}
return latestKey
}
func (i *Identity) EncryptSymmetric(plaintext []byte) ([]byte, error) {
key := i.GetCurrentRatchetKey()
if key == nil {
return nil, errors.New("no ratchet key available")
}
block, err := aes.NewCipher(key)
if err != nil {
return nil, err
}
gcm, err := cipher.NewGCM(block)
if err != nil {
return nil, err
}
nonce := make([]byte, gcm.NonceSize())
if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
return nil, err
}
return gcm.Seal(nonce, nonce, plaintext, nil), nil
}
func (i *Identity) DecryptSymmetric(ciphertext []byte) ([]byte, error) {
key := i.GetCurrentRatchetKey()
if key == nil {
return nil, errors.New("no ratchet key available")
}
block, err := aes.NewCipher(key)
if err != nil {
return nil, err
}
gcm, err := cipher.NewGCM(block)
if err != nil {
return nil, err
}
nonceSize := gcm.NonceSize()
if len(ciphertext) < nonceSize {
return nil, errors.New("ciphertext too short")
}
nonce, ciphertext := ciphertext[:nonceSize], ciphertext[nonceSize:]
plaintext, err := gcm.Open(nil, nonce, ciphertext, nil)
if err != nil {
return nil, fmt.Errorf("decryption failed: %w", err)
}
return plaintext, nil
}

View File

@@ -0,0 +1,79 @@
package interfaces
import (
"fmt"
"sync"
"time"
"encoding/binary"
"github.com/Sudo-Ivan/reticulum-go/pkg/common"
)
const (
BITRATE_MINIMUM = 5 // Minimum required bitrate in bits/sec
)
// BaseInterface embeds common.BaseInterface and implements common.Interface
type BaseInterface struct {
common.BaseInterface
}
func (i *BaseInterface) SetPacketCallback(callback common.PacketCallback) {
i.mutex.Lock()
defer i.mutex.Unlock()
i.packetCallback = callback
}
func (i *BaseInterface) ProcessIncoming(data []byte) {
i.mutex.RLock()
callback := i.packetCallback
i.mutex.RUnlock()
if callback != nil {
callback(data, i)
}
i.RxBytes += uint64(len(data))
}
func (i *BaseInterface) ProcessOutgoing(data []byte) error {
i.TxBytes += uint64(len(data))
return nil
}
func (i *BaseInterface) Detach() {
i.mutex.Lock()
defer i.mutex.Unlock()
i.Detached = true
i.Online = false
}
func (i *BaseInterface) SendPathRequest(packet []byte) error {
if !i.Online || i.Detached {
return fmt.Errorf("interface offline or detached")
}
frame := make([]byte, 0, len(packet)+2)
frame = append(frame, 0x01)
frame = append(frame, packet...)
return i.ProcessOutgoing(frame)
}
func (i *BaseInterface) SendLinkPacket(dest []byte, data []byte, timestamp time.Time) error {
if !i.Online || i.Detached {
return fmt.Errorf("interface offline or detached")
}
frame := make([]byte, 0, len(dest)+len(data)+9)
frame = append(frame, 0x02)
frame = append(frame, dest...)
ts := make([]byte, 8)
binary.BigEndian.PutUint64(ts, uint64(timestamp.Unix()))
frame = append(frame, ts...)
frame = append(frame, data...)
return i.ProcessOutgoing(frame)
}

408
pkg/interfaces/tcp.go Normal file
View File

@@ -0,0 +1,408 @@
package interfaces
import (
"fmt"
"net"
"sync"
"time"
)
const (
HDLC_FLAG = 0x7E
HDLC_ESC = 0x7D
HDLC_ESC_MASK = 0x20
KISS_FEND = 0xC0
KISS_FESC = 0xDB
KISS_TFEND = 0xDC
KISS_TFESC = 0xDD
TCP_USER_TIMEOUT = 24
TCP_PROBE_AFTER = 5
TCP_PROBE_INTERVAL = 2
TCP_PROBES = 12
RECONNECT_WAIT = 5
INITIAL_TIMEOUT = 5
)
type TCPClientInterface struct {
Interface
conn net.Conn
targetAddr string
targetPort int
kissFraming bool
i2pTunneled bool
initiator bool
reconnecting bool
neverConnected bool
writing bool
maxReconnectTries int
packetBuffer []byte
packetType byte
}
func NewTCPClient(name string, targetAddr string, targetPort int, kissFraming bool, i2pTunneled bool) (*TCPClientInterface, error) {
tc := &TCPClientInterface{
Interface: Interface{
Name: name,
Mode: MODE_FULL,
MTU: 1064,
Bitrate: 10000000, // 10Mbps estimate
},
targetAddr: targetAddr,
targetPort: targetPort,
kissFraming: kissFraming,
i2pTunneled: i2pTunneled,
initiator: true,
}
if err := tc.connect(true); err != nil {
go tc.reconnect()
} else {
go tc.readLoop()
}
return tc, nil
}
func (tc *TCPClientInterface) connect(initial bool) error {
addr := fmt.Sprintf("%s:%d", tc.targetAddr, tc.targetPort)
conn, err := net.DialTimeout("tcp", addr, time.Second*INITIAL_TIMEOUT)
if err != nil {
if initial {
return fmt.Errorf("initial connection failed: %v", err)
}
return err
}
tc.conn = conn
tc.Online = true
tc.writing = false
tc.neverConnected = false
// Set TCP options
if tcpConn, ok := conn.(*net.TCPConn); ok {
tcpConn.SetNoDelay(true)
tcpConn.SetKeepAlive(true)
tcpConn.SetKeepAlivePeriod(time.Second * TCP_PROBE_INTERVAL)
}
return nil
}
func (tc *TCPClientInterface) reconnect() {
if tc.initiator && !tc.reconnecting {
tc.reconnecting = true
attempts := 0
for !tc.Online {
time.Sleep(time.Second * RECONNECT_WAIT)
attempts++
if tc.maxReconnectTries > 0 && attempts > tc.maxReconnectTries {
tc.teardown()
break
}
if err := tc.connect(false); err != nil {
continue
}
go tc.readLoop()
break
}
tc.reconnecting = false
}
}
func (tc *TCPClientInterface) readLoop() {
buffer := make([]byte, tc.MTU)
inFrame := false
escape := false
dataBuffer := make([]byte, 0)
for {
n, err := tc.conn.Read(buffer)
if err != nil {
tc.Online = false
if tc.initiator && !tc.Detached {
go tc.reconnect()
} else {
tc.teardown()
}
return
}
for i := 0; i < n; i++ {
b := buffer[i]
if tc.kissFraming {
// KISS framing logic
if inFrame && b == KISS_FEND {
inFrame = false
tc.handlePacket(dataBuffer)
dataBuffer = dataBuffer[:0]
} else if b == KISS_FEND {
inFrame = true
} else if inFrame {
if b == KISS_FESC {
escape = true
} else {
if escape {
if b == KISS_TFEND {
b = KISS_FEND
}
if b == KISS_TFESC {
b = KISS_FESC
}
escape = false
}
dataBuffer = append(dataBuffer, b)
}
}
} else {
// HDLC framing logic
if inFrame && b == HDLC_FLAG {
inFrame = false
tc.handlePacket(dataBuffer)
dataBuffer = dataBuffer[:0]
} else if b == HDLC_FLAG {
inFrame = true
} else if inFrame {
if b == HDLC_ESC {
escape = true
} else {
if escape {
b ^= HDLC_ESC_MASK
escape = false
}
dataBuffer = append(dataBuffer, b)
}
}
}
}
}
}
func (tc *TCPClientInterface) handlePacket(data []byte) {
if len(data) < 1 {
return
}
packetType := data[0]
payload := data[1:]
switch packetType {
case 0x01: // Path request
tc.Interface.ProcessIncoming(payload)
case 0x02: // Link packet
if len(payload) < 40 { // minimum size for link packet
return
}
tc.Interface.ProcessIncoming(payload)
default:
// Unknown packet type
return
}
}
func (tc *TCPClientInterface) ProcessOutgoing(data []byte) error {
if !tc.Online {
return fmt.Errorf("interface offline")
}
tc.writing = true
defer func() { tc.writing = false }()
var frame []byte
if tc.kissFraming {
frame = append([]byte{KISS_FEND}, escapeKISS(data)...)
frame = append(frame, KISS_FEND)
} else {
frame = append([]byte{HDLC_FLAG}, escapeHDLC(data)...)
frame = append(frame, HDLC_FLAG)
}
if _, err := tc.conn.Write(frame); err != nil {
tc.teardown()
return fmt.Errorf("write failed: %v", err)
}
tc.Interface.ProcessOutgoing(data)
return nil
}
func (tc *TCPClientInterface) teardown() {
tc.Online = false
tc.IN = false
tc.OUT = false
if tc.conn != nil {
tc.conn.Close()
}
}
// Helper functions for escaping data
func escapeHDLC(data []byte) []byte {
escaped := make([]byte, 0, len(data)*2)
for _, b := range data {
if b == HDLC_FLAG || b == HDLC_ESC {
escaped = append(escaped, HDLC_ESC, b^HDLC_ESC_MASK)
} else {
escaped = append(escaped, b)
}
}
return escaped
}
func escapeKISS(data []byte) []byte {
escaped := make([]byte, 0, len(data)*2)
for _, b := range data {
if b == KISS_FEND {
escaped = append(escaped, KISS_FESC, KISS_TFEND)
} else if b == KISS_FESC {
escaped = append(escaped, KISS_FESC, KISS_TFESC)
} else {
escaped = append(escaped, b)
}
}
return escaped
}
type TCPServerInterface struct {
Interface
server net.Listener
bindAddr string
bindPort int
i2pTunneled bool
preferIPv6 bool
spawned []*TCPClientInterface
spawnedMutex sync.RWMutex
}
func NewTCPServer(name string, bindAddr string, bindPort int, i2pTunneled bool, preferIPv6 bool) (*TCPServerInterface, error) {
ts := &TCPServerInterface{
Interface: Interface{
Name: name,
Mode: MODE_FULL,
MTU: 1064,
Bitrate: 10000000, // 10Mbps estimate
},
bindAddr: bindAddr,
bindPort: bindPort,
i2pTunneled: i2pTunneled,
preferIPv6: preferIPv6,
spawned: make([]*TCPClientInterface, 0),
}
// Resolve bind address
var addr string
if ts.bindAddr == "" {
if ts.preferIPv6 {
addr = fmt.Sprintf("[::0]:%d", ts.bindPort)
} else {
addr = fmt.Sprintf("0.0.0.0:%d", ts.bindPort)
}
} else {
addr = fmt.Sprintf("%s:%d", ts.bindAddr, ts.bindPort)
}
// Create listener
var err error
ts.server, err = net.Listen("tcp", addr)
if err != nil {
return nil, fmt.Errorf("failed to create TCP listener: %v", err)
}
ts.Online = true
ts.IN = true
// Start accept loop
go ts.acceptLoop()
return ts, nil
}
func (ts *TCPServerInterface) acceptLoop() {
for {
conn, err := ts.server.Accept()
if err != nil {
if !ts.Detached {
// Log error and continue accepting
continue
}
return
}
// Create new client interface for this connection
client := &TCPClientInterface{
Interface: Interface{
Name: fmt.Sprintf("Client-%s-%s", ts.Name, conn.RemoteAddr()),
Mode: ts.Mode,
MTU: ts.MTU,
},
conn: conn,
i2pTunneled: ts.i2pTunneled,
}
// Configure TCP options
if tcpConn, ok := conn.(*net.TCPConn); ok {
tcpConn.SetNoDelay(true)
tcpConn.SetKeepAlive(true)
tcpConn.SetKeepAlivePeriod(time.Duration(TCP_PROBE_INTERVAL) * time.Second)
}
client.Online = true
client.IN = ts.IN
client.OUT = ts.OUT
// Add to spawned interfaces
ts.spawnedMutex.Lock()
ts.spawned = append(ts.spawned, client)
ts.spawnedMutex.Unlock()
// Start client read loop
go client.readLoop()
}
}
func (ts *TCPServerInterface) Detach() {
ts.Interface.Detach()
if ts.server != nil {
ts.server.Close()
}
ts.spawnedMutex.Lock()
for _, client := range ts.spawned {
client.Detach()
}
ts.spawned = nil
ts.spawnedMutex.Unlock()
}
func (ts *TCPServerInterface) ProcessOutgoing(data []byte) error {
ts.spawnedMutex.RLock()
defer ts.spawnedMutex.RUnlock()
var lastErr error
for _, client := range ts.spawned {
if err := client.ProcessOutgoing(data); err != nil {
lastErr = err
}
}
return lastErr
}
func (ts *TCPServerInterface) String() string {
addr := ts.bindAddr
if addr == "" {
if ts.preferIPv6 {
addr = "[::0]"
} else {
addr = "0.0.0.0"
}
}
return fmt.Sprintf("TCPServerInterface[%s/%s:%d]", ts.Name, addr, ts.bindPort)
}

102
pkg/interfaces/udp.go Normal file
View File

@@ -0,0 +1,102 @@
package interfaces
import (
"fmt"
"net"
"sync"
)
type UDPInterface struct {
Interface
conn *net.UDPConn
listenAddr *net.UDPAddr
targetAddr *net.UDPAddr
readBuffer []byte
}
func NewUDPInterface(name string, listenAddr string, targetAddr string) (*UDPInterface, error) {
ui := &UDPInterface{
Interface: Interface{
Name: name,
Mode: MODE_FULL,
MTU: 1500,
Bitrate: 100000000, // 100Mbps estimate for UDP
},
readBuffer: make([]byte, 65535),
}
// Parse listen address
laddr, err := net.ResolveUDPAddr("udp", listenAddr)
if err != nil {
return nil, fmt.Errorf("invalid listen address: %v", err)
}
ui.listenAddr = laddr
// Parse target address if provided
if targetAddr != "" {
taddr, err := net.ResolveUDPAddr("udp", targetAddr)
if err != nil {
return nil, fmt.Errorf("invalid target address: %v", err)
}
ui.targetAddr = taddr
ui.OUT = true
}
// Create UDP connection
conn, err := net.ListenUDP("udp", ui.listenAddr)
if err != nil {
return nil, fmt.Errorf("failed to listen on UDP: %v", err)
}
ui.conn = conn
ui.IN = true
ui.Online = true
// Start read loop
go ui.readLoop()
return ui, nil
}
func (ui *UDPInterface) readLoop() {
for {
if !ui.Online {
return
}
n, addr, err := ui.conn.ReadFromUDP(ui.readBuffer)
if err != nil {
if !ui.Detached {
// Log error
}
continue
}
// Copy received data
data := make([]byte, n)
copy(data, ui.readBuffer[:n])
// Process packet
ui.ProcessIncoming(data)
}
}
func (ui *UDPInterface) ProcessOutgoing(data []byte) error {
if !ui.Online || ui.targetAddr == nil {
return fmt.Errorf("interface offline or no target address configured")
}
_, err := ui.conn.WriteToUDP(data, ui.targetAddr)
if err != nil {
return fmt.Errorf("UDP write failed: %v", err)
}
ui.Interface.ProcessOutgoing(data)
return nil
}
func (ui *UDPInterface) Detach() {
ui.Interface.Detach()
if ui.conn != nil {
ui.conn.Close()
}
}

549
pkg/link/link.go Normal file
View File

@@ -0,0 +1,549 @@
package link
import (
"crypto/aes"
"crypto/cipher"
"crypto/ed25519"
"crypto/rand"
"crypto/sha256"
"encoding/binary"
"errors"
"io"
"sync"
"time"
"github.com/Sudo-Ivan/reticulum-go/pkg/identity"
"github.com/Sudo-Ivan/reticulum-go/pkg/packet"
)
const (
CURVE = "Curve25519"
ESTABLISHMENT_TIMEOUT_PER_HOP = 6
KEEPALIVE_TIMEOUT_FACTOR = 4
STALE_GRACE = 2
KEEPALIVE = 360
STALE_TIME = 720
ACCEPT_NONE = 0x00
ACCEPT_ALL = 0x01
ACCEPT_APP = 0x02
STATUS_PENDING = 0x00
STATUS_ACTIVE = 0x01
STATUS_CLOSED = 0x02
STATUS_FAILED = 0x03
)
type Link struct {
mutex sync.RWMutex
destination interface{}
status byte
establishedAt time.Time
lastInbound time.Time
lastOutbound time.Time
lastDataReceived time.Time
lastDataSent time.Time
remoteIdentity *identity.Identity
sessionKey []byte
linkID []byte
rtt float64
establishmentRate float64
trackPhyStats bool
rssi float64
snr float64
q float64
resourceStrategy byte
establishedCallback func(*Link)
closedCallback func(*Link)
packetCallback func([]byte, *packet.Packet)
resourceCallback func(interface{}) bool
resourceStartedCallback func(interface{})
resourceConcludedCallback func(interface{})
remoteIdentifiedCallback func(*Link, *identity.Identity)
}
func New(dest interface{}, establishedCb func(*Link), closedCb func(*Link)) *Link {
l := &Link{
destination: dest,
status: STATUS_PENDING,
establishedAt: time.Time{},
lastInbound: time.Time{},
lastOutbound: time.Time{},
lastDataReceived: time.Time{},
lastDataSent: time.Time{},
resourceStrategy: ACCEPT_NONE,
establishedCallback: establishedCb,
closedCallback: closedCb,
}
return l
}
func (l *Link) Identify(id *identity.Identity) error {
l.mutex.Lock()
defer l.mutex.Unlock()
if l.status != STATUS_ACTIVE {
return errors.New("link not active")
}
// Create identification message
idMsg := append(id.GetPublicKey(), id.Sign(l.linkID)...)
// Encrypt and send identification
err := l.SendPacket(idMsg)
if err != nil {
return err
}
return nil
}
func (l *Link) HandleIdentification(data []byte) error {
l.mutex.Lock()
defer l.mutex.Unlock()
if len(data) < ed25519.PublicKeySize+ed25519.SignatureSize {
return errors.New("invalid identification data")
}
pubKey := data[:ed25519.PublicKeySize]
signature := data[ed25519.PublicKeySize:]
remoteIdentity := &identity.Identity{}
if !remoteIdentity.LoadPublicKey(pubKey) {
return errors.New("invalid remote public key")
}
// Verify signature of link ID
if !remoteIdentity.Verify(l.linkID, signature) {
return errors.New("invalid identification signature")
}
l.remoteIdentity = remoteIdentity
if l.remoteIdentifiedCallback != nil {
l.remoteIdentifiedCallback(l, remoteIdentity)
}
return nil
}
func (l *Link) Request(path string, data []byte, timeout time.Duration) (*RequestReceipt, error) {
l.mutex.Lock()
defer l.mutex.Unlock()
if l.status != STATUS_ACTIVE {
return nil, errors.New("link not active")
}
requestID := make([]byte, 16)
if _, err := rand.Read(requestID); err != nil {
return nil, err
}
// Create request message
reqMsg := make([]byte, 0)
reqMsg = append(reqMsg, requestID...)
reqMsg = append(reqMsg, []byte(path)...)
if data != nil {
reqMsg = append(reqMsg, data...)
}
receipt := &RequestReceipt{
requestID: requestID,
status: STATUS_PENDING,
sentAt: time.Now(),
}
// Send request
err := l.SendPacket(reqMsg)
if err != nil {
return nil, err
}
// Set timeout
if timeout > 0 {
go func() {
time.Sleep(timeout)
l.mutex.Lock()
if receipt.status == STATUS_PENDING {
receipt.status = STATUS_FAILED
}
l.mutex.Unlock()
}()
}
return receipt, nil
}
type RequestReceipt struct {
mutex sync.RWMutex
requestID []byte
status byte
sentAt time.Time
receivedAt time.Time
response []byte
}
func (r *RequestReceipt) GetRequestID() []byte {
r.mutex.RLock()
defer r.mutex.RUnlock()
return append([]byte{}, r.requestID...)
}
func (r *RequestReceipt) GetStatus() byte {
r.mutex.RLock()
defer r.mutex.RUnlock()
return r.status
}
func (r *RequestReceipt) GetResponse() []byte {
r.mutex.RLock()
defer r.mutex.RUnlock()
if r.response == nil {
return nil
}
return append([]byte{}, r.response...)
}
func (r *RequestReceipt) GetResponseTime() float64 {
r.mutex.RLock()
defer r.mutex.RUnlock()
if r.receivedAt.IsZero() {
return 0
}
return r.receivedAt.Sub(r.sentAt).Seconds()
}
func (r *RequestReceipt) Concluded() bool {
status := r.GetStatus()
return status == STATUS_ACTIVE || status == STATUS_FAILED
}
func (l *Link) TrackPhyStats(track bool) {
l.mutex.Lock()
defer l.mutex.Unlock()
l.trackPhyStats = track
}
func (l *Link) GetRSSI() float64 {
l.mutex.RLock()
defer l.mutex.RUnlock()
return l.rssi
}
func (l *Link) GetSNR() float64 {
l.mutex.RLock()
defer l.mutex.RUnlock()
return l.snr
}
func (l *Link) GetQ() float64 {
l.mutex.RLock()
defer l.mutex.RUnlock()
return l.q
}
func (l *Link) GetEstablishmentRate() float64 {
l.mutex.RLock()
defer l.mutex.RUnlock()
return l.establishmentRate
}
func (l *Link) GetAge() float64 {
l.mutex.RLock()
defer l.mutex.RUnlock()
if l.establishedAt.IsZero() {
return 0
}
return time.Since(l.establishedAt).Seconds()
}
func (l *Link) NoInboundFor() float64 {
l.mutex.RLock()
defer l.mutex.RUnlock()
if l.lastInbound.IsZero() {
return 0
}
return time.Since(l.lastInbound).Seconds()
}
func (l *Link) NoOutboundFor() float64 {
l.mutex.RLock()
defer l.mutex.RUnlock()
if l.lastOutbound.IsZero() {
return 0
}
return time.Since(l.lastOutbound).Seconds()
}
func (l *Link) NoDataFor() float64 {
l.mutex.RLock()
defer l.mutex.RUnlock()
lastData := l.lastDataReceived
if l.lastDataSent.After(lastData) {
lastData = l.lastDataSent
}
if lastData.IsZero() {
return 0
}
return time.Since(lastData).Seconds()
}
func (l *Link) InactiveFor() float64 {
l.mutex.RLock()
defer l.mutex.RUnlock()
lastActivity := l.lastInbound
if l.lastOutbound.After(lastActivity) {
lastActivity = l.lastOutbound
}
if lastActivity.IsZero() {
return 0
}
return time.Since(lastActivity).Seconds()
}
func (l *Link) GetRemoteIdentity() *identity.Identity {
l.mutex.RLock()
defer l.mutex.RUnlock()
return l.remoteIdentity
}
func (l *Link) Teardown() {
l.mutex.Lock()
defer l.mutex.Unlock()
if l.status == STATUS_ACTIVE {
l.status = STATUS_CLOSED
if l.closedCallback != nil {
l.closedCallback(l)
}
}
}
func (l *Link) SetLinkClosedCallback(callback func(*Link)) {
l.mutex.Lock()
defer l.mutex.Unlock()
l.closedCallback = callback
}
func (l *Link) SetPacketCallback(callback func([]byte, *packet.Packet)) {
l.mutex.Lock()
defer l.mutex.Unlock()
l.packetCallback = callback
}
func (l *Link) SetResourceCallback(callback func(interface{}) bool) {
l.mutex.Lock()
defer l.mutex.Unlock()
l.resourceCallback = callback
}
func (l *Link) SetResourceStartedCallback(callback func(interface{})) {
l.mutex.Lock()
defer l.mutex.Unlock()
l.resourceStartedCallback = callback
}
func (l *Link) SetResourceConcludedCallback(callback func(interface{})) {
l.mutex.Lock()
defer l.mutex.Unlock()
l.resourceConcludedCallback = callback
}
func (l *Link) SetRemoteIdentifiedCallback(callback func(*Link, *identity.Identity)) {
l.mutex.Lock()
defer l.mutex.Unlock()
l.remoteIdentifiedCallback = callback
}
func (l *Link) SetResourceStrategy(strategy byte) error {
if strategy != ACCEPT_NONE && strategy != ACCEPT_ALL && strategy != ACCEPT_APP {
return errors.New("unsupported resource strategy")
}
l.mutex.Lock()
defer l.mutex.Unlock()
l.resourceStrategy = strategy
return nil
}
func NewLink(destination interface{}, establishedCallback func(*Link), closedCallback func(*Link)) *Link {
l := &Link{
destination: destination,
status: STATUS_PENDING,
establishedAt: time.Time{},
lastInbound: time.Time{},
lastOutbound: time.Time{},
lastDataReceived: time.Time{},
lastDataSent: time.Time{},
establishedCallback: establishedCallback,
closedCallback: closedCallback,
resourceStrategy: ACCEPT_NONE,
trackPhyStats: false,
}
return l
}
func (l *Link) Establish() error {
l.mutex.Lock()
defer l.mutex.Unlock()
if l.status != STATUS_PENDING {
return errors.New("link already established or failed")
}
// Generate session key using ECDH
ephemeralKey := make([]byte, 32)
if _, err := rand.Read(ephemeralKey); err != nil {
return err
}
l.sessionKey = ephemeralKey
l.establishedAt = time.Now()
l.status = STATUS_ACTIVE
if l.establishedCallback != nil {
l.establishedCallback(l)
}
return nil
}
func (l *Link) SendPacket(data []byte) error {
l.mutex.Lock()
defer l.mutex.Unlock()
if l.status != STATUS_ACTIVE {
return errors.New("link not active")
}
// Encrypt data using session key
encryptedData, err := l.encrypt(data)
if err != nil {
return err
}
l.lastOutbound = time.Now()
l.lastDataSent = time.Now()
if l.packetCallback != nil {
l.packetCallback(encryptedData, nil)
}
return nil
}
func (l *Link) HandleInbound(data []byte) error {
l.mutex.Lock()
defer l.mutex.Unlock()
if l.status != STATUS_ACTIVE {
return errors.New("link not active")
}
// Decrypt data using session key
decryptedData, err := l.decrypt(data)
if err != nil {
return err
}
l.lastInbound = time.Now()
l.lastDataReceived = time.Now()
if l.packetCallback != nil {
l.packetCallback(decryptedData, nil)
}
return nil
}
func (l *Link) encrypt(data []byte) ([]byte, error) {
if l.sessionKey == nil {
return nil, errors.New("no session key available")
}
block, err := aes.NewCipher(l.sessionKey)
if err != nil {
return nil, err
}
gcm, err := cipher.NewGCM(block)
if err != nil {
return nil, err
}
nonce := make([]byte, gcm.NonceSize())
if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
return nil, err
}
return gcm.Seal(nonce, nonce, data, nil), nil
}
func (l *Link) decrypt(data []byte) ([]byte, error) {
if l.sessionKey == nil {
return nil, errors.New("no session key available")
}
block, err := aes.NewCipher(l.sessionKey)
if err != nil {
return nil, err
}
gcm, err := cipher.NewGCM(block)
if err != nil {
return nil, err
}
nonceSize := gcm.NonceSize()
if len(data) < nonceSize {
return nil, errors.New("ciphertext too short")
}
nonce, ciphertext := data[:nonceSize], data[nonceSize:]
return gcm.Open(nil, nonce, ciphertext, nil)
}
func (l *Link) UpdatePhyStats(rssi float64, snr float64, q float64) {
if !l.trackPhyStats {
return
}
l.mutex.Lock()
defer l.mutex.Unlock()
l.rssi = rssi
l.snr = snr
l.q = q
}
func (l *Link) GetRTT() float64 {
l.mutex.RLock()
defer l.mutex.RUnlock()
return l.rtt
}
func (l *Link) SetRTT(rtt float64) {
l.mutex.Lock()
defer l.mutex.Unlock()
l.rtt = rtt
}
func (l *Link) GetStatus() byte {
l.mutex.RLock()
defer l.mutex.RUnlock()
return l.status
}
func (l *Link) IsActive() bool {
return l.GetStatus() == STATUS_ACTIVE
}

27
pkg/packet/constants.go Normal file
View File

@@ -0,0 +1,27 @@
package packet
const (
// MTU constants
EncryptedMDU = 383 // Maximum size of payload data in encrypted packet
PlainMDU = 464 // Maximum size of payload data in unencrypted packet
// Header Types
HeaderType1 = 0 // Two byte header, one 16 byte address field
HeaderType2 = 1 // Two byte header, two 16 byte address fields
// Propagation Types
PropagationBroadcast = 0
PropagationTransport = 1
// Destination Types
DestinationSingle = 0
DestinationGroup = 1
DestinationPlain = 2
DestinationLink = 3
// Packet Types
PacketData = 0
PacketAnnounce = 1
PacketLinkRequest = 2
PacketProof = 3
)

171
pkg/packet/packet.go Normal file
View File

@@ -0,0 +1,171 @@
package packet
import (
"encoding/binary"
"errors"
)
const (
HeaderSize = 2
AddressSize = 16
ContextSize = 1
MaxDataSize = 465 // Maximum size of payload data
)
// Header flags and types
const (
// First byte flags
IFACFlag = 0x80 // Interface authentication code flag
HeaderTypeFlag = 0x40 // Header type flag
ContextFlag = 0x20 // Context flag
PropagationFlags = 0x18 // Propagation type flags (bits 3-4)
DestinationFlags = 0x06 // Destination type flags (bits 1-2)
PacketTypeFlags = 0x01 // Packet type flags (bit 0)
// Second byte
HopsField = 0xFF // Number of hops (entire byte)
)
type Packet struct {
Header [2]byte
Addresses []byte // Either 16 or 32 bytes depending on header type
Context byte
Data []byte
AccessCode []byte // Optional: Only present if IFAC flag is set
}
func NewPacket(headerType, propagationType, destinationType, packetType byte, hops byte) *Packet {
p := &Packet{
Header: [2]byte{0, hops},
Addresses: make([]byte, 0),
Data: make([]byte, 0),
}
// Set header type
if headerType == HeaderType2 {
p.Header[0] |= HeaderTypeFlag
p.Addresses = make([]byte, 2*AddressSize) // Two address fields
} else {
p.Addresses = make([]byte, AddressSize) // One address field
}
// Set propagation type
p.Header[0] |= (propagationType << 3) & PropagationFlags
// Set destination type
p.Header[0] |= (destinationType << 1) & DestinationFlags
// Set packet type
p.Header[0] |= packetType & PacketTypeFlags
return p
}
func (p *Packet) SetAccessCode(code []byte) {
p.AccessCode = code
p.Header[0] |= IFACFlag
}
func (p *Packet) SetContext(context byte) {
p.Context = context
p.Header[0] |= ContextFlag
}
func (p *Packet) SetData(data []byte) error {
if len(data) > MaxDataSize {
return errors.New("data exceeds maximum allowed size")
}
p.Data = data
return nil
}
func (p *Packet) SetAddress(index int, address []byte) error {
if len(address) != AddressSize {
return errors.New("invalid address size")
}
offset := index * AddressSize
if offset+AddressSize > len(p.Addresses) {
return errors.New("address index out of range")
}
copy(p.Addresses[offset:], address)
return nil
}
func (p *Packet) Serialize() ([]byte, error) {
totalSize := HeaderSize + len(p.Addresses) + ContextSize + len(p.Data)
if p.AccessCode != nil {
totalSize += len(p.AccessCode)
}
buffer := make([]byte, totalSize)
offset := 0
// Write header
copy(buffer[offset:], p.Header[:])
offset += HeaderSize
// Write access code if present
if p.AccessCode != nil {
copy(buffer[offset:], p.AccessCode)
offset += len(p.AccessCode)
}
// Write addresses
copy(buffer[offset:], p.Addresses)
offset += len(p.Addresses)
// Write context
buffer[offset] = p.Context
offset += ContextSize
// Write data
copy(buffer[offset:], p.Data)
return buffer, nil
}
func ParsePacket(data []byte) (*Packet, error) {
if len(data) < HeaderSize {
return nil, errors.New("packet data too short")
}
p := &Packet{
Header: [2]byte{data[0], data[1]},
}
offset := HeaderSize
// Handle access code if present
if p.Header[0]&IFACFlag != 0 {
// Access code handling would go here
// For now, we'll assume no access code
return nil, errors.New("access code handling not implemented")
}
// Determine address size based on header type
addrLen := AddressSize
if p.Header[0]&HeaderTypeFlag != 0 {
addrLen = 2 * AddressSize
}
if len(data[offset:]) < addrLen+ContextSize {
return nil, errors.New("packet data too short for addresses and context")
}
// Copy addresses
p.Addresses = make([]byte, addrLen)
copy(p.Addresses, data[offset:offset+addrLen])
offset += addrLen
// Copy context
p.Context = data[offset]
offset++
// Copy remaining data
p.Data = make([]byte, len(data)-offset)
copy(p.Data, data[offset:])
return p, nil
}

386
pkg/resource/resource.go Normal file
View File

@@ -0,0 +1,386 @@
package resource
import (
"crypto/sha256"
"encoding/binary"
"errors"
"io"
"strings"
"sync"
"time"
"path/filepath"
)
const (
STATUS_PENDING = 0x00
STATUS_ACTIVE = 0x01
STATUS_COMPLETE = 0x02
STATUS_FAILED = 0x03
STATUS_CANCELLED = 0x04
DEFAULT_SEGMENT_SIZE = 384 // Based on ENCRYPTED_MDU
MAX_SEGMENTS = 65535
CLEANUP_INTERVAL = 300 // 5 minutes
// Window size constants
WINDOW = 4
WINDOW_MIN = 2
WINDOW_MAX_SLOW = 10
WINDOW_MAX_VERY_SLOW = 4
WINDOW_MAX_FAST = 75
WINDOW_MAX = WINDOW_MAX_FAST
// Rate thresholds
FAST_RATE_THRESHOLD = WINDOW_MAX_SLOW - WINDOW - 2
VERY_SLOW_RATE_THRESHOLD = 2
// Transfer rates (bytes per second)
RATE_FAST = (50 * 1000) / 8 // 50 Kbps
RATE_VERY_SLOW = (2 * 1000) / 8 // 2 Kbps
// Window flexibility
WINDOW_FLEXIBILITY = 4
// Hash and segment constants
MAPHASH_LEN = 4
RANDOM_HASH_SIZE = 4
// Size limits
MAX_EFFICIENT_SIZE = 16*1024*1024 - 1 // ~16MB
AUTO_COMPRESS_MAX_SIZE = MAX_EFFICIENT_SIZE
// Timeouts and retries
PART_TIMEOUT_FACTOR = 4
PART_TIMEOUT_FACTOR_AFTER_RTT = 2
PROOF_TIMEOUT_FACTOR = 3
MAX_RETRIES = 16
MAX_ADV_RETRIES = 4
SENDER_GRACE_TIME = 10.0
PROCESSING_GRACE = 1.0
RETRY_GRACE_TIME = 0.25
PER_RETRY_DELAY = 0.5
)
type Resource struct {
mutex sync.RWMutex
data []byte
fileHandle io.ReadWriteSeeker
hash []byte
randomHash []byte
originalHash []byte
status byte
compressed bool
autoCompress bool
encrypted bool
split bool
segments uint16
segmentIndex uint16
totalSegments uint16
completedParts map[uint16]bool
transferSize int64
dataSize int64
progress float64
window int
windowMax int
windowMin int
windowFlexibility int
rtt float64
fastRateRounds int
verySlowRateRounds int
createdAt time.Time
completedAt time.Time
callback func(*Resource)
progressCallback func(*Resource)
}
func New(data interface{}, autoCompress bool) (*Resource, error) {
r := &Resource{
status: STATUS_PENDING,
compressed: false,
autoCompress: autoCompress,
completedParts: make(map[uint16]bool),
createdAt: time.Now(),
progress: 0.0,
}
switch v := data.(type) {
case []byte:
r.data = v
r.dataSize = int64(len(v))
case io.ReadWriteSeeker:
r.fileHandle = v
size, err := v.Seek(0, io.SeekEnd)
if err != nil {
return nil, err
}
r.dataSize = size
_, err = v.Seek(0, io.SeekStart)
if err != nil {
return nil, err
}
default:
return nil, errors.New("unsupported data type")
}
// Calculate segments needed
r.segments = uint16((r.dataSize + DEFAULT_SEGMENT_SIZE - 1) / DEFAULT_SEGMENT_SIZE)
if r.segments > MAX_SEGMENTS {
return nil, errors.New("resource too large")
}
// Calculate transfer size
r.transferSize = r.dataSize
if r.autoCompress {
// Estimate compressed size based on data type and content
if r.data != nil {
// For in-memory data, we can analyze content
compressibility := estimateCompressibility(r.data)
r.transferSize = int64(float64(r.dataSize) * compressibility)
} else if r.fileHandle != nil {
// For file handles, use extension-based estimation
ext := strings.ToLower(filepath.Ext(r.fileHandle.Name()))
r.transferSize = estimateFileCompression(r.dataSize, ext)
}
// Ensure minimum size and add compression overhead
if r.transferSize < r.dataSize/10 {
r.transferSize = r.dataSize / 10
}
r.transferSize += 64 // Header overhead for compression
}
// Calculate resource hash
if err := r.calculateHash(); err != nil {
return nil, err
}
return r, nil
}
func (r *Resource) calculateHash() error {
h := sha256.New()
if r.data != nil {
h.Write(r.data)
} else if r.fileHandle != nil {
if _, err := r.fileHandle.Seek(0, io.SeekStart); err != nil {
return err
}
if _, err := io.Copy(h, r.fileHandle); err != nil {
return err
}
if _, err := r.fileHandle.Seek(0, io.SeekStart); err != nil {
return err
}
}
r.hash = h.Sum(nil)
return nil
}
func (r *Resource) GetHash() []byte {
r.mutex.RLock()
defer r.mutex.RUnlock()
return append([]byte{}, r.hash...)
}
func (r *Resource) GetStatus() byte {
r.mutex.RLock()
defer r.mutex.RUnlock()
return r.status
}
func (r *Resource) GetProgress() float64 {
r.mutex.RLock()
defer r.mutex.RUnlock()
return r.progress
}
func (r *Resource) GetTransferSize() int64 {
r.mutex.RLock()
defer r.mutex.RUnlock()
return r.transferSize
}
func (r *Resource) GetDataSize() int64 {
r.mutex.RLock()
defer r.mutex.RUnlock()
return r.dataSize
}
func (r *Resource) GetSegments() uint16 {
r.mutex.RLock()
defer r.mutex.RUnlock()
return r.segments
}
func (r *Resource) IsCompressed() bool {
r.mutex.RLock()
defer r.mutex.RUnlock()
return r.compressed
}
func (r *Resource) Cancel() {
r.mutex.Lock()
defer r.mutex.Unlock()
if r.status == STATUS_PENDING || r.status == STATUS_ACTIVE {
r.status = STATUS_CANCELLED
r.completedAt = time.Now()
if r.callback != nil {
r.callback(r)
}
}
}
func (r *Resource) SetCallback(callback func(*Resource)) {
r.mutex.Lock()
defer r.mutex.Unlock()
r.callback = callback
}
func (r *Resource) SetProgressCallback(callback func(*Resource)) {
r.mutex.Lock()
defer r.mutex.Unlock()
r.progressCallback = callback
}
// GetSegmentData returns the data for a specific segment
func (r *Resource) GetSegmentData(segment uint16) ([]byte, error) {
r.mutex.RLock()
defer r.mutex.RUnlock()
if segment >= r.segments {
return nil, errors.New("invalid segment number")
}
start := int64(segment) * DEFAULT_SEGMENT_SIZE
size := DEFAULT_SEGMENT_SIZE
if segment == r.segments-1 {
size = int(r.dataSize - start)
}
data := make([]byte, size)
if r.data != nil {
copy(data, r.data[start:start+int64(size)])
return data, nil
}
if r.fileHandle != nil {
if _, err := r.fileHandle.Seek(start, io.SeekStart); err != nil {
return nil, err
}
if _, err := io.ReadFull(r.fileHandle, data); err != nil {
return nil, err
}
return data, nil
}
return nil, errors.New("no data source available")
}
// MarkSegmentComplete marks a segment as completed and updates progress
func (r *Resource) MarkSegmentComplete(segment uint16) {
r.mutex.Lock()
defer r.mutex.Unlock()
if segment >= r.segments {
return
}
r.completedParts[segment] = true
completed := len(r.completedParts)
r.progress = float64(completed) / float64(r.segments)
if r.progressCallback != nil {
r.progressCallback(r)
}
// Check if all segments are complete
if completed == int(r.segments) {
r.status = STATUS_COMPLETE
r.completedAt = time.Now()
if r.callback != nil {
r.callback(r)
}
}
}
// IsSegmentComplete checks if a specific segment is complete
func (r *Resource) IsSegmentComplete(segment uint16) bool {
r.mutex.RLock()
defer r.mutex.RUnlock()
return r.completedParts[segment]
}
// Activate marks the resource as active
func (r *Resource) Activate() {
r.mutex.Lock()
defer r.mutex.Unlock()
if r.status == STATUS_PENDING {
r.status = STATUS_ACTIVE
}
}
// SetFailed marks the resource as failed
func (r *Resource) SetFailed() {
r.mutex.Lock()
defer r.mutex.Unlock()
if r.status != STATUS_COMPLETE {
r.status = STATUS_FAILED
r.completedAt = time.Now()
if r.callback != nil {
r.callback(r)
}
}
}
// Helper functions for compression estimation
func estimateCompressibility(data []byte) float64 {
// Sample the data to estimate compressibility
sampleSize := 4096
if len(data) < sampleSize {
sampleSize = len(data)
}
// Count unique bytes in sample
uniqueBytes := make(map[byte]struct{})
for i := 0; i < sampleSize; i++ {
uniqueBytes[data[i]] = struct{}{}
}
// Calculate entropy-based compression estimate
uniqueRatio := float64(len(uniqueBytes)) / float64(sampleSize)
return 0.3 + (0.7 * uniqueRatio) // Base compression ratio between 0.3 and 1.0
}
func estimateFileCompression(size int64, extension string) int64 {
// Compression ratio estimates based on common file types
compressionRatios := map[string]float64{
".txt": 0.4, // Text compresses well
".log": 0.4,
".json": 0.4,
".xml": 0.4,
".html": 0.4,
".csv": 0.5,
".doc": 0.8, // Already compressed
".docx": 0.95,
".pdf": 0.95,
".jpg": 0.99, // Already compressed
".jpeg": 0.99,
".png": 0.99,
".gif": 0.99,
".mp3": 0.99,
".mp4": 0.99,
".zip": 0.99,
".gz": 0.99,
".rar": 0.99,
}
ratio, exists := compressionRatios[extension]
if !exists {
ratio = 0.7 // Default compression ratio for unknown types
}
return int64(float64(size) * ratio)
}

459
pkg/transport/transport.go Normal file
View File

@@ -0,0 +1,459 @@
package transport
import (
"encoding/binary"
"errors"
"net"
"sync"
"time"
"github.com/Sudo-Ivan/reticulum-go/pkg/common"
)
var (
transportInstance *Transport
transportMutex sync.Mutex
)
const (
PathfinderM = 128 // Maximum number of hops
PathRequestTTL = 300 // Time to live for path requests in seconds
AnnounceTimeout = 15 // Timeout for announce responses in seconds
// Link constants
EstablishmentTimeoutPerHop = 6 // Timeout for link establishment in seconds per hop
KeepaliveTimeoutFactor = 4 // RTT timeout factor for link timeout calculation
StaleGrace = 2 // Grace period in seconds for link timeout
Keepalive = 360 // Interval for sending keep-alive packets in seconds
StaleTime = 720 // Time after which link is considered stale
// Direction constants
OUT = 0x02
IN = 0x01
// Destination type constants
SINGLE = 0x00
GROUP = 0x01
PLAIN = 0x02
)
type PathInfo struct {
NextHop []byte
Interface string
Hops uint8
LastUpdated time.Time
}
type Transport struct {
config *common.ReticulumConfig
interfaces map[string]common.NetworkInterface
paths map[string]*common.Path
announceHandlers []AnnounceHandler
mutex sync.RWMutex
handlerLock sync.RWMutex
pathLock sync.RWMutex
}
func NewTransport(config *common.ReticulumConfig) (*Transport, error) {
t := &Transport{
config: config,
interfaces: make(map[string]common.NetworkInterface),
paths: make(map[string]*common.Path),
}
transportMutex.Lock()
transportInstance = t
transportMutex.Unlock()
return t, nil
}
// Add GetTransportInstance function
func GetTransportInstance() *Transport {
transportMutex.Lock()
defer transportMutex.Unlock()
return transportInstance
}
// Update the interface methods
func (t *Transport) RegisterInterface(name string, iface common.NetworkInterface) error {
t.mutex.Lock()
defer t.mutex.Unlock()
if _, exists := t.interfaces[name]; exists {
return errors.New("interface already registered")
}
t.interfaces[name] = iface
return nil
}
func (t *Transport) GetInterface(name string) (common.NetworkInterface, error) {
t.mutex.RLock()
defer t.mutex.RUnlock()
iface, exists := t.interfaces[name]
if !exists {
return nil, errors.New("interface not found")
}
return iface, nil
}
// Update the Close method
func (t *Transport) Close() error {
t.mutex.Lock()
defer t.mutex.Unlock()
for _, iface := range t.interfaces {
iface.Detach()
}
return nil
}
type Link struct {
destination []byte
establishedAt time.Time
lastInbound time.Time
lastOutbound time.Time
lastData time.Time
rtt time.Duration
establishedCb func()
closedCb func()
packetCb func([]byte)
resourceCb func(interface{}) bool
resourceStrategy int
}
type Destination struct {
Identity interface{}
Direction int
Type int
AppName string
Aspects []string
}
func NewLink(dest []byte, establishedCallback func(), closedCallback func()) *Link {
return &Link{
destination: dest,
establishedAt: time.Now(),
lastInbound: time.Now(),
lastOutbound: time.Now(),
lastData: time.Now(),
establishedCb: establishedCallback,
closedCb: closedCallback,
}
}
// Link methods
func (l *Link) GetAge() time.Duration {
return time.Since(l.establishedAt)
}
func (l *Link) NoInboundFor() time.Duration {
return time.Since(l.lastInbound)
}
func (l *Link) NoOutboundFor() time.Duration {
return time.Since(l.lastOutbound)
}
func (l *Link) NoDataFor() time.Duration {
return time.Since(l.lastData)
}
func (l *Link) InactiveFor() time.Duration {
inbound := l.NoInboundFor()
outbound := l.NoOutboundFor()
if inbound < outbound {
return inbound
}
return outbound
}
func (l *Link) SetPacketCallback(cb func([]byte)) {
l.packetCb = cb
}
func (l *Link) SetResourceCallback(cb func(interface{}) bool) {
l.resourceCb = cb
}
func (l *Link) Teardown() {
if l.closedCb != nil {
l.closedCb()
}
}
func (l *Link) Send(data []byte) error {
l.lastOutbound = time.Now()
l.lastData = time.Now()
packet := &LinkPacket{
Destination: l.destination,
Data: data,
Timestamp: time.Now(),
}
if l.rtt == 0 {
l.rtt = l.InactiveFor()
}
return packet.send()
}
type AnnounceHandler interface {
ReceivedAnnounce(destinationHash []byte, identity interface{}, appData []byte)
}
func (t *Transport) RegisterAnnounceHandler(handler AnnounceHandler) {
t.handlerLock.Lock()
defer t.handlerLock.Unlock()
t.announceHandlers = append(t.announceHandlers, handler)
}
func (t *Transport) DeregisterAnnounceHandler(handler AnnounceHandler) {
t.handlerLock.Lock()
defer t.handlerLock.Unlock()
for i, h := range t.announceHandlers {
if h == handler {
t.announceHandlers = append(t.announceHandlers[:i], t.announceHandlers[i+1:]...)
return
}
}
}
func (t *Transport) HasPath(destinationHash []byte) bool {
t.pathLock.RLock()
defer t.pathLock.RUnlock()
path, exists := t.paths[string(destinationHash)]
if !exists {
return false
}
// Check if path is still valid (not expired)
if time.Since(path.LastUpdated) > time.Duration(PathRequestTTL)*time.Second {
delete(t.paths, string(destinationHash))
return false
}
return true
}
func (t *Transport) HopsTo(destinationHash []byte) uint8 {
t.pathLock.RLock()
defer t.pathLock.RUnlock()
path, exists := t.paths[string(destinationHash)]
if !exists {
return PathfinderM
}
return path.Hops
}
func (t *Transport) NextHop(destinationHash []byte) []byte {
t.pathLock.RLock()
defer t.pathLock.RUnlock()
path, exists := t.paths[string(destinationHash)]
if !exists {
return nil
}
return path.NextHop
}
func (t *Transport) NextHopInterface(destinationHash []byte) string {
t.pathLock.RLock()
defer t.pathLock.RUnlock()
path, exists := t.paths[string(destinationHash)]
if !exists {
return ""
}
return path.Interface.GetName()
}
func (t *Transport) RequestPath(destinationHash []byte, onInterface string, tag []byte, recursive bool) error {
packet := &PathRequest{
DestinationHash: destinationHash,
Tag: tag,
TTL: PathRequestTTL,
Recursive: recursive,
}
if onInterface != "" {
return t.sendPathRequest(packet, onInterface)
}
return t.broadcastPathRequest(packet)
}
func (t *Transport) UpdatePath(destinationHash []byte, nextHop []byte, interfaceName string, hops uint8) {
t.pathLock.Lock()
defer t.pathLock.Unlock()
iface, err := t.GetInterface(interfaceName)
if err != nil {
return
}
t.paths[string(destinationHash)] = &common.Path{
Interface: iface,
NextHop: nextHop,
Hops: hops,
LastUpdated: time.Now(),
}
}
func (t *Transport) HandleAnnounce(destinationHash []byte, identity []byte, appData []byte) {
t.handlerLock.RLock()
defer t.handlerLock.RUnlock()
for _, handler := range t.announceHandlers {
handler.ReceivedAnnounce(destinationHash, identity, appData)
}
}
func (t *Transport) NewDestination(identity interface{}, direction int, destType int, appName string, aspects ...string) *Destination {
return &Destination{
Identity: identity,
Direction: direction,
Type: destType,
AppName: appName,
Aspects: aspects,
}
}
func (t *Transport) NewLink(dest []byte, establishedCallback func(), closedCallback func()) *Link {
return NewLink(dest, establishedCallback, closedCallback)
}
type PathRequest struct {
DestinationHash []byte
Tag []byte
TTL int
Recursive bool
}
type LinkPacket struct {
Destination []byte
Data []byte
Timestamp time.Time
}
func (p *LinkPacket) send() error {
// Get transport instance
t := GetTransportInstance()
// Create packet header
header := make([]byte, 0, 64)
header = append(header, 0x02) // Link packet type
header = append(header, p.Destination...)
// Add timestamp
ts := make([]byte, 8)
binary.BigEndian.PutUint64(ts, uint64(p.Timestamp.Unix()))
header = append(header, ts...)
// Combine header and data
packet := append(header, p.Data...)
// Get next hop info
nextHop := t.NextHop(p.Destination)
if nextHop == nil {
return errors.New("no path to destination")
}
// Get interface for next hop
ifaceName := t.NextHopInterface(p.Destination)
iface, ok := t.interfaces[ifaceName]
if !ok {
return errors.New("interface not found")
}
// Send packet using interface's Send method
return iface.Send(packet, "")
}
func (t *Transport) sendPathRequest(req *PathRequest, interfaceName string) error {
// Create path request packet
packet := &PathRequestPacket{
Type: 0x01,
DestinationHash: req.DestinationHash,
Tag: req.Tag,
TTL: byte(req.TTL),
Recursive: req.Recursive,
}
// Serialize packet
buf := make([]byte, 0, 128)
buf = append(buf, packet.Type)
buf = append(buf, packet.DestinationHash...)
buf = append(buf, packet.Tag...)
buf = append(buf, packet.TTL)
if packet.Recursive {
buf = append(buf, 0x01)
} else {
buf = append(buf, 0x00)
}
// Get interface
iface, ok := t.interfaces[interfaceName]
if !ok {
return errors.New("interface not found")
}
return iface.Send(buf, "")
}
func (t *Transport) broadcastPathRequest(req *PathRequest) error {
var lastErr error
for _, iface := range t.interfaces {
if !iface.IsEnabled() {
continue
}
if err := t.sendPathRequest(req, iface.GetName()); err != nil {
lastErr = err
}
}
return lastErr
}
type PathRequestPacket struct {
Type byte // 0x01 for path request
DestinationHash []byte // 32 bytes
Tag []byte // Variable length
TTL byte
Recursive bool
}
type NetworkInterface struct {
Name string
Addr *net.UDPAddr
Conn *net.UDPConn
MTU int
Enabled bool
}
func SendAnnounce(packet []byte) error {
t := GetTransportInstance()
if t == nil {
return errors.New("transport not initialized")
}
// Send announce packet to all interfaces
var lastErr error
for _, iface := range t.interfaces {
if err := iface.Send(packet, ""); err != nil {
lastErr = err
}
}
return lastErr
}