From 7a7ce847788f9189027a06cfd4e12efbd99f8bee Mon Sep 17 00:00:00 2001 From: Sudo-Ivan Date: Mon, 30 Dec 2024 12:58:43 -0600 Subject: [PATCH] 0.2.8 --- cmd/client-ftp/main.go | 266 ++++++++++++++++++++++++++++++++++ cmd/client/client.go | 282 +++++++++++++++++++++++++++++------- cmd/reticulum/main.go | 33 ++++- configs/test-client1.toml | 8 +- configs/test-client2.toml | 8 +- internal/config/config.go | 2 +- pkg/announce/announce.go | 36 ++++- pkg/common/config.go | 23 +-- pkg/identity/identity.go | 40 +++-- pkg/interfaces/interface.go | 34 ++++- pkg/interfaces/tcp.go | 168 ++++++++++++--------- pkg/interfaces/udp.go | 99 +++++++++---- pkg/transport/transport.go | 41 +++++- scripts/run_clients.sh | 86 +++++++++-- 14 files changed, 919 insertions(+), 207 deletions(-) create mode 100644 cmd/client-ftp/main.go diff --git a/cmd/client-ftp/main.go b/cmd/client-ftp/main.go new file mode 100644 index 0000000..4afb41c --- /dev/null +++ b/cmd/client-ftp/main.go @@ -0,0 +1,266 @@ +package main + +import ( + "flag" + "fmt" + "io" + "log" + "os" + "os/signal" + "path/filepath" + "syscall" + "time" + + "github.com/Sudo-Ivan/reticulum-go/internal/config" + "github.com/Sudo-Ivan/reticulum-go/pkg/common" + "github.com/Sudo-Ivan/reticulum-go/pkg/destination" + "github.com/Sudo-Ivan/reticulum-go/pkg/identity" + "github.com/Sudo-Ivan/reticulum-go/pkg/link" + "github.com/Sudo-Ivan/reticulum-go/pkg/packet" + "github.com/Sudo-Ivan/reticulum-go/pkg/resource" + "github.com/Sudo-Ivan/reticulum-go/pkg/transport" +) + +const ( + APP_NAME = "example_utilities" + APP_ASPECT = "filetransfer" +) + +var ( + configPath = flag.String("config", "", "Path to config file") + servePath = flag.String("serve", "", "Directory to serve files from") +) + +type FileServer struct { + config *common.ReticulumConfig + transport *transport.Transport + interfaces []common.NetworkInterface + identity *identity.Identity + servePath string +} + +func NewFileServer(cfg *common.ReticulumConfig, servePath string) (*FileServer, error) { + if cfg == nil { + var err error + cfg, err = config.InitConfig() + if err != nil { + return nil, fmt.Errorf("failed to initialize config: %v", err) + } + } + + t, err := transport.NewTransport(cfg) + if err != nil { + return nil, fmt.Errorf("failed to initialize transport: %v", err) + } + + id, err := identity.New() + if err != nil { + return nil, fmt.Errorf("failed to create identity: %v", err) + } + + return &FileServer{ + config: cfg, + transport: t, + interfaces: make([]common.NetworkInterface, 0), + identity: id, + servePath: servePath, + }, nil +} + +func (s *FileServer) OnLinkEstablished(l *link.Link) { + s.handleLinkEstablished(l) +} + +func (s *FileServer) Start() error { + dest, err := destination.New( + s.identity, + destination.OUT, + destination.SINGLE, + APP_NAME, + APP_ASPECT, + ) + if err != nil { + return fmt.Errorf("failed to create destination: %v", err) + } + + callback := func(l interface{}) { + if link, ok := l.(*link.Link); ok { + s.OnLinkEstablished(link) + } + } + + dest.SetLinkEstablishedCallback(callback) + + log.Printf("File server started. Server hash: %s", s.identity.Hex()) + log.Printf("Serving directory: %s", s.servePath) + return nil +} + +func (s *FileServer) handleLinkEstablished(l *link.Link) { + log.Printf("Client connected") + + l.SetPacketCallback(func(data []byte, p *packet.Packet) { + s.handlePacket(data, l) + }) + + l.SetResourceCallback(func(r interface{}) bool { + if res, ok := r.(*resource.Resource); ok { + return s.handleResource(res) + } + return false + }) +} + +func (s *FileServer) handlePacket(data []byte, l *link.Link) { + if string(data) == "LIST" { + files, err := s.getFileList() + if err != nil { + log.Printf("Error getting file list: %v", err) + l.Teardown() + return + } + + if err := l.SendPacket(files); err != nil { + log.Printf("Error sending file list: %v", err) + l.Teardown() + } + } +} + +func (s *FileServer) handleResource(r *resource.Resource) bool { + filename := filepath.Join(s.servePath, r.GetName()) + file, err := os.Create(filename) + if err != nil { + log.Printf("Failed to create file: %v", err) + return false + } + defer file.Close() + + written, err := io.Copy(file, r) + if err != nil { + log.Printf("Failed to write file: %v", err) + return false + } + + log.Printf("Received file: %s (%d bytes)", filename, written) + return true +} + +func (s *FileServer) getFileList() ([]byte, error) { + files, err := os.ReadDir(s.servePath) + if err != nil { + return nil, err + } + + var fileList []string + for _, file := range files { + if !file.IsDir() { + fileList = append(fileList, file.Name()) + } + } + + return []byte(fmt.Sprintf("%v", fileList)), nil +} + +func main() { + flag.Parse() + + if *servePath == "" { + log.Fatal("Please specify a directory to serve with -serve") + } + + var cfg *common.ReticulumConfig + var err error + + if *configPath == "" { + cfg, err = config.InitConfig() + } else { + cfg, err = config.LoadConfig(*configPath) + } + if err != nil { + log.Fatalf("Failed to load config: %v", err) + } + + server, err := NewFileServer(cfg, *servePath) + if err != nil { + log.Fatalf("Failed to create server: %v", err) + } + + if err := server.Start(); err != nil { + log.Fatalf("Failed to start server: %v", err) + } + + // Start watching the directory for changes + go server.watchDirectory() + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + <-sigChan +} + +func (s *FileServer) watchDirectory() { + for { + time.Sleep(1 * time.Second) + files, err := os.ReadDir(s.servePath) + if err != nil { + log.Printf("Error reading directory: %v", err) + continue + } + + for _, file := range files { + if !file.IsDir() { + // Try to send file to connected peers + filePath := filepath.Join(s.servePath, file.Name()) + if err := s.sendFile(filePath); err != nil { + log.Printf("Error sending file %s: %v", file.Name(), err) + } + } + } + } +} + +func (s *FileServer) sendFile(filePath string) error { + file, err := os.Open(filePath) + if err != nil { + return fmt.Errorf("failed to open file: %v", err) + } + defer file.Close() + + // Create a destination for the file transfer + dest, err := destination.New( + s.identity, + destination.OUT, + destination.SINGLE, + APP_NAME, + APP_ASPECT, + ) + if err != nil { + return fmt.Errorf("failed to create destination: %v", err) + } + + // Set up link for file transfer + callback := func(l interface{}) { + if link, ok := l.(*link.Link); ok { + // Create a new resource with auto-compression enabled + res, err := resource.New(file, true) + if err != nil { + log.Printf("Error creating resource: %v", err) + return + } + + // The filename is automatically set from the file handle + // in resource.New when using an io.ReadWriteSeeker + + // Send the resource through the link + if err := link.SendResource(res); err != nil { + log.Printf("Error sending resource: %v", err) + return + } + log.Printf("File %s sent successfully", filepath.Base(filePath)) + } + } + + dest.SetLinkEstablishedCallback(callback) + + return nil +} diff --git a/cmd/client/client.go b/cmd/client/client.go index d636b48..9e62100 100644 --- a/cmd/client/client.go +++ b/cmd/client/client.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "flag" "fmt" + "io" "log" "os" "os/signal" @@ -64,53 +65,90 @@ func NewClient(cfg *common.ReticulumConfig) (*Client, error) { } func (c *Client) Start() error { - // Initialize interfaces - for _, ifaceConfig := range c.config.Interfaces { + log.Printf("Starting Reticulum client...") + log.Printf("Configuration: %+v", c.config) + + // Initialize transport + t, err := transport.NewTransport(c.config) + if err != nil { + return fmt.Errorf("failed to initialize transport: %v", err) + } + c.transport = t + log.Printf("Transport initialized") + + log.Printf("Initializing network interfaces...") + for name, ifaceConfig := range c.config.Interfaces { + if !ifaceConfig.Enabled { + log.Printf("Skipping disabled interface %s", name) + continue + } + + log.Printf("Configuring interface %s (%s)", name, ifaceConfig.Type) var iface common.NetworkInterface switch ifaceConfig.Type { case "TCPClientInterface": + log.Printf("Connecting to %s:%d via TCP...", ifaceConfig.TargetHost, ifaceConfig.TargetPort) client, err := interfaces.NewTCPClient( - ifaceConfig.Name, + name, ifaceConfig.TargetHost, ifaceConfig.TargetPort, ifaceConfig.KISSFraming, ifaceConfig.I2PTunneled, + ifaceConfig.Enabled, ) if err != nil { - return fmt.Errorf("failed to create TCP interface %s: %v", ifaceConfig.Name, err) + return fmt.Errorf("failed to create TCP interface %s: %v", name, err) } + + if err := client.Start(); err != nil { + return fmt.Errorf("failed to start TCP interface %s: %v", name, err) + } + iface = client + log.Printf("Successfully connected to %s:%d", ifaceConfig.TargetHost, ifaceConfig.TargetPort) case "UDPInterface": addr := fmt.Sprintf("%s:%d", ifaceConfig.Address, ifaceConfig.Port) + target := fmt.Sprintf("%s:%d", ifaceConfig.TargetHost, ifaceConfig.TargetPort) + log.Printf("Starting UDP interface on %s...", addr) udp, err := interfaces.NewUDPInterface( - ifaceConfig.Name, + name, addr, - "", // No target address for client initially + target, + ifaceConfig.Enabled, ) if err != nil { - return fmt.Errorf("failed to create UDP interface %s: %v", ifaceConfig.Name, err) + return fmt.Errorf("failed to create UDP interface %s: %v", name, err) } - iface = udp - default: - return fmt.Errorf("unsupported interface type: %s", ifaceConfig.Type) + if err := udp.Start(); err != nil { + return fmt.Errorf("failed to start UDP interface %s: %v", name, err) + } + + iface = udp + log.Printf("UDP interface listening on %s", addr) } - c.interfaces = append(c.interfaces, iface) - log.Printf("Created interface %s", iface.GetName()) + if iface != nil { + // Set packet callback + iface.SetPacketCallback(c.transport.HandlePacket) + c.interfaces = append(c.interfaces, iface) + log.Printf("Created and started interface %s (type=%v, enabled=%v)", + name, iface.GetType(), iface.IsEnabled()) + } } - // Start periodic announces - go func() { - for { - c.sendAnnounce() - time.Sleep(30 * time.Second) - } - }() + // Register announce handler with explicit type + var handler transport.AnnounceHandler = &ClientAnnounceHandler{client: c} + c.transport.RegisterAnnounceHandler(handler) + + // Send initial announce + log.Printf("Sending initial announce...") + if err := c.sendAnnounce(); err != nil { + log.Printf("Warning: Failed to send initial announce: %v", err) + } - log.Printf("Client started with %d interfaces", len(c.interfaces)) return nil } @@ -119,17 +157,33 @@ func (c *Client) handlePacket(data []byte, p *packet.Packet) { return } - packetType := data[0] + header := data[0] + packetType := header & 0x03 // Extract packet type from header + switch packetType { - case 0x04: // Announce packet - c.handleAnnounce(data[1:]) + case announce.PACKET_TYPE_ANNOUNCE: + log.Printf("Received announce packet:") + log.Printf(" Raw data: %x", data) + + // Create announce instance + a, err := announce.New(c.identity, []byte("RNS.Go.Client"), false) + if err != nil { + log.Printf("Failed to create announce handler: %v", err) + return + } + + // Handle the announce + if err := a.HandleAnnounce(data[1:]); err != nil { + log.Printf("Failed to handle announce: %v", err) + } + default: c.transport.HandlePacket(data, p) } } func (c *Client) handleAnnounce(data []byte) { - if len(data) < 42 { // 32 bytes hash + 8 bytes timestamp + 1 byte hops + 1 byte flags + if len(data) < 42 { log.Printf("Received malformed announce packet (too short)") return } @@ -144,33 +198,41 @@ func (c *Client) handleAnnounce(data []byte) { log.Printf(" Hops: %d", hops) log.Printf(" Flags: %x", flags) + // Extract public key if present (after flags) if len(data) > 42 { + pubKeyLen := 32 // Ed25519 public key length + pubKey := data[42 : 42+pubKeyLen] + log.Printf(" Public Key: %x", pubKey) + // Extract app data if present - dataLen := binary.BigEndian.Uint16(data[42:44]) - if len(data) >= 44+int(dataLen) { - appData := data[44 : 44+dataLen] - log.Printf(" App Data: %s", string(appData)) + var appData []byte + if len(data) > 42+pubKeyLen+2 { + dataLen := binary.BigEndian.Uint16(data[42+pubKeyLen : 42+pubKeyLen+2]) + if len(data) >= 42+pubKeyLen+2+int(dataLen) { + appData = data[42+pubKeyLen+2 : 42+pubKeyLen+2+int(dataLen)] + log.Printf(" App Data: %s", string(appData)) + } } + + // Store the identity for future use with all required parameters + if !identity.ValidateAnnounce(data, destHash, pubKey, data[len(data)-64:], appData) { + log.Printf("Failed to validate announce") + return + } + log.Printf("Successfully validated and stored announce") } } -func (c *Client) sendAnnounce() { +func (c *Client) sendAnnounce() error { + // Create announce packet + identityHash := c.identity.Hash() announceData := make([]byte, 0) - // Create header - header := announce.CreateHeader( - announce.IFAC_NONE, - announce.HEADER_TYPE_1, - 0x00, - announce.PROP_TYPE_BROADCAST, - announce.DEST_TYPE_SINGLE, - announce.PACKET_TYPE_ANNOUNCE, - 0x00, - ) + // Add header + header := []byte{0x01, 0x00} // Announce packet type announceData = append(announceData, header...) - // Add destination hash (16 bytes truncated) - identityHash := c.identity.Hash() + // Add destination hash announceData = append(announceData, identityHash...) // Add context byte @@ -197,17 +259,32 @@ func (c *Client) sendAnnounce() { log.Printf(" Packet Length: %d bytes", len(announceData)) log.Printf(" Full Packet: %x", announceData) + sentCount := 0 // Send on all interfaces for _, iface := range c.interfaces { - log.Printf("Sending on interface %s (%s):", iface.GetName(), iface.GetType()) + log.Printf("Attempting to send on interface %s:", iface.GetName()) + log.Printf(" Type: %v", iface.GetType()) log.Printf(" MTU: %d bytes", iface.GetMTU()) + log.Printf(" Status: enabled=%v", iface.IsEnabled()) + + if !iface.IsEnabled() { + log.Printf(" Skipping disabled interface") + continue + } if err := iface.Send(announceData, ""); err != nil { log.Printf(" Failed to send: %v", err) } else { log.Printf(" Successfully sent announce") + sentCount++ } } + + if sentCount == 0 { + return fmt.Errorf("no interfaces available to send announce") + } + + return nil } func (c *Client) Stop() { @@ -265,22 +342,67 @@ func (c *Client) handleLinkClosed(l *link.Link) { log.Printf("Link closed") } +type ClientAnnounceHandler struct { + client *Client +} + +func (h *ClientAnnounceHandler) AspectFilter() []string { + return []string{"RNS.Go.Client"} +} + +func (h *ClientAnnounceHandler) ReceivedAnnounce(destinationHash []byte, announcedIdentity interface{}, appData []byte) error { + log.Printf("=== Received Announce Details ===") + log.Printf("Destination Hash: %x", destinationHash) + log.Printf("App Data: %s", string(appData)) + + // Type assert the identity + if id, ok := announcedIdentity.(*identity.Identity); ok { + log.Printf("Identity Public Key: %x", id.GetPublicKey()) + + // Create packet hash for storage + packetHash := identity.TruncatedHash(append(destinationHash, id.GetPublicKey()...)) + log.Printf("Generated Packet Hash: %x", packetHash) + + // Store the peer identity with all required parameters + identity.Remember(packetHash, destinationHash, id.GetPublicKey(), appData) + log.Printf("Identity stored successfully") + log.Printf("===========================") + return nil + } + + log.Printf("Error: Invalid identity type") + log.Printf("===========================") + return fmt.Errorf("invalid identity type") +} + +func (h *ClientAnnounceHandler) ReceivePathResponses() bool { + return true +} + func main() { flag.Parse() + log.Printf("Starting Reticulum Go client...") + log.Printf("Config path: %s", *configPath) + log.Printf("Target hash: %s", *targetHash) + var cfg *common.ReticulumConfig var err error if *configPath == "" { + log.Printf("No config path specified, using default configuration") cfg, err = config.InitConfig() } else { + log.Printf("Loading configuration from: %s", *configPath) cfg, err = config.LoadConfig(*configPath) } if err != nil { log.Fatalf("Failed to load config: %v", err) } + log.Printf("Configuration loaded successfully") if *generateIdentity { + log.Printf("Generating new identity...") id, err := identity.New() if err != nil { log.Fatalf("Failed to generate identity: %v", err) @@ -299,29 +421,79 @@ func main() { log.Fatalf("Failed to start client: %v", err) } - // Wait for interrupt + log.Printf("Client running, press Ctrl+C to exit") + + // If target is specified, start interactive mode + if *targetHash != "" { + targetBytes, err := identity.HashFromString(*targetHash) + if err != nil { + log.Fatalf("Invalid target hash: %v", err) + } + link, err := client.transport.GetLink(targetBytes) + if err != nil { + log.Fatalf("Failed to get link: %v", err) + } + log.Printf("Starting interactive mode...") + interactiveLoop(link) + return + } + + // Wait for interrupt if no target specified sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) <-sigChan + log.Printf("Received interrupt signal, shutting down...") } func interactiveLoop(link *transport.Link) { reader := bufio.NewReader(os.Stdin) + connected := make(chan struct{}) + disconnected := make(chan struct{}) + + // Set up connection status handlers + link.OnConnected(func() { + connected <- struct{}{} + }) + + link.OnDisconnected(func() { + disconnected <- struct{}{} + }) + + // Wait for initial connection + select { + case <-connected: + log.Println("Connected to target") + case <-time.After(10 * time.Second): + log.Fatal("Connection timeout") + return + } + + // Start input loop for { - fmt.Print("> ") - input, err := reader.ReadString('\n') - if err != nil { - fmt.Printf("Error reading input: %v\n", err) - continue - } - - input = strings.TrimSpace(input) - if input == "quit" || input == "exit" { + select { + case <-disconnected: + log.Println("Connection lost") return - } + default: + fmt.Print("> ") + input, err := reader.ReadString('\n') + if err != nil { + if err == io.EOF { + return + } + log.Printf("Error reading input: %v", err) + continue + } - if err := link.Send([]byte(input)); err != nil { - fmt.Printf("Failed to send: %v\n", err) + input = strings.TrimSpace(input) + if input == "quit" || input == "exit" { + return + } + + if err := link.Send([]byte(input)); err != nil { + log.Printf("Failed to send: %v", err) + return + } } } } diff --git a/cmd/reticulum/main.go b/cmd/reticulum/main.go index 39b9b58..5a06913 100644 --- a/cmd/reticulum/main.go +++ b/cmd/reticulum/main.go @@ -7,15 +7,15 @@ import ( "os/signal" "syscall" - "github.com/Sudo-Ivan/reticulum-go/pkg/common" "github.com/Sudo-Ivan/reticulum-go/internal/config" - "github.com/Sudo-Ivan/reticulum-go/pkg/transport" + "github.com/Sudo-Ivan/reticulum-go/pkg/common" "github.com/Sudo-Ivan/reticulum-go/pkg/interfaces" + "github.com/Sudo-Ivan/reticulum-go/pkg/transport" ) type Reticulum struct { - config *common.ReticulumConfig - transport *transport.Transport + config *common.ReticulumConfig + transport *transport.Transport interfaces []interfaces.Interface } @@ -48,11 +48,18 @@ func (r *Reticulum) Start() error { ifaceConfig.TargetPort, ifaceConfig.KISSFraming, ifaceConfig.I2PTunneled, + ifaceConfig.Enabled, ) if err != nil { log.Printf("Failed to create TCP interface %s: %v", ifaceConfig.Name, err) continue } + + if err := client.Start(); err != nil { + log.Printf("Failed to start TCP interface %s: %v", ifaceConfig.Name, err) + continue + } + iface = client case "TCPServerInterface": @@ -68,19 +75,33 @@ func (r *Reticulum) Start() error { log.Printf("Failed to create TCP server interface %s: %v", ifaceConfig.Name, err) continue } + + if err := server.Start(); err != nil { + log.Printf("Failed to start TCP server interface %s: %v", ifaceConfig.Name, err) + continue + } + iface = server case "UDPInterface": addr := fmt.Sprintf("%s:%d", ifaceConfig.Address, ifaceConfig.Port) + udp, err := interfaces.NewUDPInterface( ifaceConfig.Name, addr, "", // No target address for server initially + ifaceConfig.Enabled, ) if err != nil { log.Printf("Failed to create UDP interface %s: %v", ifaceConfig.Name, err) continue } + + if err := udp.Start(); err != nil { + log.Printf("Failed to start UDP interface %s: %v", ifaceConfig.Name, err) + continue + } + iface = udp case "AutoInterface": @@ -96,6 +117,8 @@ func (r *Reticulum) Start() error { // Set packet callback to transport iface.SetPacketCallback(r.transport.HandlePacket) r.interfaces = append(r.interfaces, iface) + log.Printf("Created and started interface %s (type=%v, enabled=%v)", + iface.GetName(), iface.GetType(), iface.IsEnabled()) } } @@ -137,4 +160,4 @@ func main() { if err := r.Stop(); err != nil { log.Printf("Error during shutdown: %v", err) } -} \ No newline at end of file +} diff --git a/configs/test-client1.toml b/configs/test-client1.toml index 62b672f..3d21b3b 100644 --- a/configs/test-client1.toml +++ b/configs/test-client1.toml @@ -9,10 +9,14 @@ loglevel = 4 [interfaces."Local TCP"] type = "TCPClientInterface" enabled = true - target_host = "127.0.0.1" + target_host = "rns.quad4.io" target_port = 4242 [interfaces."Local UDP"] type = "UDPInterface" enabled = true - interface = "lo" \ No newline at end of file + interface = "lo" + address = "127.0.0.1" + port = 37428 + target_address = "127.0.0.1" + target_port = 37430 \ No newline at end of file diff --git a/configs/test-client2.toml b/configs/test-client2.toml index f289188..4dc7b41 100644 --- a/configs/test-client2.toml +++ b/configs/test-client2.toml @@ -9,10 +9,14 @@ loglevel = 4 [interfaces."Local TCP"] type = "TCPClientInterface" enabled = true - target_host = "127.0.0.1" + target_host = "rns.quad4.io" target_port = 4243 [interfaces."Local UDP"] type = "UDPInterface" enabled = true - interface = "lo" \ No newline at end of file + interface = "lo" + address = "127.0.0.1" + port = 37430 + target_address = "127.0.0.1" + target_port = 37428 \ No newline at end of file diff --git a/internal/config/config.go b/internal/config/config.go index c3b714c..690bf3a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -93,7 +93,7 @@ func CreateDefaultConfig(path string) error { Type: "UDPInterface", Enabled: true, Address: "0.0.0.0", - Port: 37428, // Default RNS port + Port: 37696, // Default RNS port } data, err := toml.Marshal(cfg) diff --git a/pkg/announce/announce.go b/pkg/announce/announce.go index 0942af9..97d5368 100644 --- a/pkg/announce/announce.go +++ b/pkg/announce/announce.go @@ -93,16 +93,19 @@ func New(dest *identity.Identity, appData []byte, pathResponse bool) (*Announce, func (a *Announce) Propagate(interfaces []common.NetworkInterface) error { packet := a.CreatePacket() - log.Printf("Propagating announce:") + // Enhanced logging + log.Printf("Creating announce packet:") log.Printf(" Destination Hash: %x", a.destinationHash) - log.Printf(" Public Key: %x", a.identity.GetPublicKey()) + log.Printf(" Identity Public Key: %x", a.identity.GetPublicKey()) log.Printf(" App Data: %s", string(a.appData)) - log.Printf(" Packet Size: %d bytes", len(packet)) - log.Printf(" Full Packet: %x", packet) + log.Printf(" Signature: %x", a.signature) + log.Printf(" Total Packet Size: %d bytes", len(packet)) + log.Printf(" Raw Packet: %x", packet) // Propagate to interfaces for _, iface := range interfaces { - log.Printf("Propagating on interface %s (%s):", iface.GetName(), iface.GetType()) + log.Printf("Propagating on interface %s:", iface.GetName()) + log.Printf(" Interface Type: %s", iface.GetType()) log.Printf(" MTU: %d bytes", iface.GetMTU()) if err := iface.Send(packet, ""); err != nil { @@ -136,18 +139,28 @@ func (a *Announce) HandleAnnounce(data []byte) error { a.mutex.Lock() defer a.mutex.Unlock() + // Enhanced validation logging + log.Printf("Received announce data (%d bytes):", len(data)) + log.Printf(" Raw Data: %x", data) + // Validate announce data if len(data) < 16+32+1 { // Min size: hash + pubkey + hops + log.Printf(" Error: Invalid announce data length (got %d, need at least %d)", + len(data), 16+32+1) return errors.New("invalid announce data") } - // Extract fields + // Extract and log fields destHash := data[:16] publicKey := data[16:48] hopCount := data[48] - // Validate hop count + log.Printf(" Destination Hash: %x", destHash) + log.Printf(" Public Key: %x", publicKey) + log.Printf(" Hop Count: %d", hopCount) + if hopCount > MAX_HOPS { + log.Printf(" Error: Exceeded maximum hop count (%d > %d)", hopCount, MAX_HOPS) return errors.New("announce exceeded maximum hop count") } @@ -155,27 +168,36 @@ func (a *Announce) HandleAnnounce(data []byte) error { appData := data[49 : len(data)-64] signature := data[len(data)-64:] + log.Printf(" App Data (%d bytes): %s", len(appData), string(appData)) + log.Printf(" Signature: %x", signature) + // Create announced identity from public key announcedIdentity := identity.FromPublicKey(publicKey) if announcedIdentity == nil { + log.Printf(" Error: Invalid identity public key") return errors.New("invalid identity public key") } // Verify signature signData := append(destHash, appData...) if !announcedIdentity.Verify(signData, signature) { + log.Printf(" Error: Invalid announce signature") return errors.New("invalid announce signature") } + log.Printf(" Signature verification successful") + // Process announce with registered handlers for _, handler := range a.handlers { if handler.ReceivePathResponses() || !a.pathResponse { if err := handler.ReceivedAnnounce(destHash, announcedIdentity, appData); err != nil { + log.Printf(" Handler error: %v", err) return err } } } + log.Printf(" Successfully processed announce") return nil } diff --git a/pkg/common/config.go b/pkg/common/config.go index 6c35843..13577bb 100644 --- a/pkg/common/config.go +++ b/pkg/common/config.go @@ -9,17 +9,18 @@ type ConfigProvider interface { // InterfaceConfig represents interface configuration type InterfaceConfig struct { - Type string `toml:"type"` - Name string `toml:"name"` - Enabled bool `toml:"enabled"` - TargetHost string `toml:"target_host,omitempty"` - TargetPort int `toml:"target_port,omitempty"` - Interface string `toml:"interface,omitempty"` - Address string `toml:"address,omitempty"` - Port int `toml:"port,omitempty"` - KISSFraming bool `toml:"kiss_framing,omitempty"` - I2PTunneled bool `toml:"i2p_tunneled,omitempty"` - PreferIPv6 bool `toml:"prefer_ipv6,omitempty"` + Name string `toml:"name"` + Type string `toml:"type"` + Enabled bool `toml:"enabled"` + Address string `toml:"address"` + Port int `toml:"port"` + TargetHost string `toml:"target_host"` + TargetPort int `toml:"target_port"` + TargetAddress string `toml:"target_address"` + Interface string `toml:"interface"` + KISSFraming bool `toml:"kiss_framing"` + I2PTunneled bool `toml:"i2p_tunneled"` + PreferIPv6 bool `toml:"prefer_ipv6"` } // ReticulumConfig represents the main configuration structure diff --git a/pkg/identity/identity.go b/pkg/identity/identity.go index b299277..9fe9f33 100644 --- a/pkg/identity/identity.go +++ b/pkg/identity/identity.go @@ -15,6 +15,8 @@ import ( "sync" "time" + "encoding/hex" + "golang.org/x/crypto/curve25519" "golang.org/x/crypto/hkdf" ) @@ -448,19 +450,19 @@ func (i *Identity) DecryptWithHMAC(data []byte, key []byte) ([]byte, error) { func (i *Identity) ToFile(path string) error { data := map[string]interface{}{ - "private_key": i.privateKey, - "public_key": i.publicKey, - "signing_key": i.signingKey, + "private_key": i.privateKey, + "public_key": i.publicKey, + "signing_key": i.signingKey, "verification_key": i.verificationKey, - "app_data": i.appData, + "app_data": i.appData, } - + file, err := os.Create(path) if err != nil { return err } defer file.Close() - + return json.NewEncoder(file).Encode(data) } @@ -470,22 +472,30 @@ func RecallIdentity(path string) (*Identity, error) { return nil, err } defer file.Close() - + var data map[string]interface{} if err := json.NewDecoder(file).Decode(&data); err != nil { return nil, err } - + // Reconstruct identity from saved data id := &Identity{ - privateKey: data["private_key"].([]byte), - publicKey: data["public_key"].([]byte), - signingKey: data["signing_key"].(ed25519.PrivateKey), + privateKey: data["private_key"].([]byte), + publicKey: data["public_key"].([]byte), + signingKey: data["signing_key"].(ed25519.PrivateKey), verificationKey: data["verification_key"].(ed25519.PublicKey), - appData: data["app_data"].([]byte), - ratchets: make(map[string][]byte), - ratchetExpiry: make(map[string]int64), + appData: data["app_data"].([]byte), + ratchets: make(map[string][]byte), + ratchetExpiry: make(map[string]int64), } - + return id, nil } + +func HashFromString(hash string) ([]byte, error) { + if len(hash) != 32 { + return nil, fmt.Errorf("invalid hash length: expected 32, got %d", len(hash)) + } + + return hex.DecodeString(hash) +} diff --git a/pkg/interfaces/interface.go b/pkg/interfaces/interface.go index ba17711..715bab0 100644 --- a/pkg/interfaces/interface.go +++ b/pkg/interfaces/interface.go @@ -21,7 +21,10 @@ type Interface interface { GetMode() common.InterfaceMode IsOnline() bool IsDetached() bool + IsEnabled() bool Detach() + Enable() + Disable() Send(data []byte, addr string) error SetPacketCallback(common.PacketCallback) GetPacketCallback() common.PacketCallback @@ -33,12 +36,25 @@ type BaseInterface struct { mode common.InterfaceMode ifType common.InterfaceType online bool + enabled bool detached bool mtu int mutex sync.RWMutex packetCallback common.PacketCallback } +func NewBaseInterface(name string, ifType common.InterfaceType, enabled bool) BaseInterface { + return BaseInterface{ + name: name, + mode: common.IF_MODE_FULL, + ifType: ifType, + online: false, + enabled: enabled, + detached: false, + mtu: common.DEFAULT_MTU, + } +} + func (i *BaseInterface) SetPacketCallback(callback common.PacketCallback) { i.Mutex.Lock() defer i.Mutex.Unlock() @@ -136,5 +152,21 @@ func (i *BaseInterface) GetConn() net.Conn { } func (i *BaseInterface) IsEnabled() bool { - return i.Online && !i.Detached + i.mutex.RLock() + defer i.mutex.RUnlock() + return i.enabled && i.online && !i.detached +} + +func (i *BaseInterface) Enable() { + i.mutex.Lock() + defer i.mutex.Unlock() + i.enabled = true + i.online = true +} + +func (i *BaseInterface) Disable() { + i.mutex.Lock() + defer i.mutex.Unlock() + i.enabled = false + i.online = false } diff --git a/pkg/interfaces/tcp.go b/pkg/interfaces/tcp.go index e403a91..267abec 100644 --- a/pkg/interfaces/tcp.go +++ b/pkg/interfaces/tcp.go @@ -44,85 +44,56 @@ type TCPClientInterface struct { packetCallback common.PacketCallback mutex sync.RWMutex detached bool + enabled bool } -func NewTCPClient(name string, targetAddr string, targetPort int, kissFraming bool, i2pTunneled bool) (*TCPClientInterface, error) { +func NewTCPClient(name string, targetHost string, targetPort int, kissFraming bool, i2pTunneled bool, enabled bool) (*TCPClientInterface, error) { tc := &TCPClientInterface{ - BaseInterface: BaseInterface{ - name: name, - mode: common.IF_MODE_FULL, - ifType: common.IF_TYPE_TCP, - online: false, - mtu: 1064, - detached: false, - }, - targetAddr: targetAddr, - targetPort: targetPort, - kissFraming: kissFraming, - i2pTunneled: i2pTunneled, - initiator: true, + BaseInterface: NewBaseInterface(name, common.IF_TYPE_TCP, enabled), + targetAddr: targetHost, + targetPort: targetPort, + kissFraming: kissFraming, + i2pTunneled: i2pTunneled, + initiator: true, + enabled: enabled, } - if err := tc.connect(true); err != nil { - go tc.reconnect() - } else { - go tc.readLoop() + if enabled { + addr := fmt.Sprintf("%s:%d", targetHost, targetPort) + conn, err := net.Dial("tcp", addr) + if err != nil { + return nil, err + } + tc.conn = conn + tc.online = true } return tc, nil } -func (tc *TCPClientInterface) connect(initial bool) error { +func (tc *TCPClientInterface) Start() error { + tc.mutex.Lock() + defer tc.mutex.Unlock() + + if !tc.enabled { + return fmt.Errorf("interface not enabled") + } + + if tc.conn != nil { + tc.online = true + return nil + } + addr := fmt.Sprintf("%s:%d", tc.targetAddr, tc.targetPort) - conn, err := net.DialTimeout("tcp", addr, time.Second*INITIAL_TIMEOUT) + conn, err := net.Dial("tcp", addr) if err != nil { - if initial { - return fmt.Errorf("initial connection failed: %v", err) - } return err } - tc.conn = conn - tc.Online = true - tc.writing = false - tc.neverConnected = false - - // Set TCP options - if tcpConn, ok := conn.(*net.TCPConn); ok { - tcpConn.SetNoDelay(true) - tcpConn.SetKeepAlive(true) - tcpConn.SetKeepAlivePeriod(time.Second * TCP_PROBE_INTERVAL) - } - + tc.online = true return nil } -func (tc *TCPClientInterface) reconnect() { - if tc.initiator && !tc.reconnecting { - tc.reconnecting = true - attempts := 0 - - for !tc.Online { - time.Sleep(time.Second * RECONNECT_WAIT) - attempts++ - - if tc.maxReconnectTries > 0 && attempts > tc.maxReconnectTries { - tc.teardown() - break - } - - if err := tc.connect(false); err != nil { - continue - } - - go tc.readLoop() - break - } - - tc.reconnecting = false - } -} - func (tc *TCPClientInterface) readLoop() { buffer := make([]byte, tc.MTU) inFrame := false @@ -281,7 +252,9 @@ func (tc *TCPClientInterface) SetPacketCallback(cb common.PacketCallback) { } func (tc *TCPClientInterface) IsEnabled() bool { - return tc.Online + tc.mutex.RLock() + defer tc.mutex.RUnlock() + return tc.enabled && tc.online && !tc.detached } func (tc *TCPClientInterface) GetName() string { @@ -306,17 +279,68 @@ func (tc *TCPClientInterface) IsOnline() bool { return tc.online } +func (tc *TCPClientInterface) reconnect() { + tc.mutex.Lock() + if tc.reconnecting { + tc.mutex.Unlock() + return + } + tc.reconnecting = true + tc.mutex.Unlock() + + retries := 0 + for retries < tc.maxReconnectTries { + tc.teardown() + + addr := fmt.Sprintf("%s:%d", tc.targetAddr, tc.targetPort) + conn, err := net.Dial("tcp", addr) + if err == nil { + tc.mutex.Lock() + tc.conn = conn + tc.online = true + tc.neverConnected = false + tc.reconnecting = false + tc.mutex.Unlock() + + // Restart read loop + go tc.readLoop() + return + } + + retries++ + // Wait before retrying + select { + case <-time.After(RECONNECT_WAIT * time.Second): + continue + } + } + + // Failed to reconnect after max retries + tc.mutex.Lock() + tc.reconnecting = false + tc.mutex.Unlock() + tc.teardown() +} + +func (tc *TCPClientInterface) Enable() { + tc.mutex.Lock() + defer tc.mutex.Unlock() + tc.online = true +} + +func (tc *TCPClientInterface) Disable() { + tc.mutex.Lock() + defer tc.mutex.Unlock() + tc.online = false +} + type TCPServerInterface struct { BaseInterface - listener net.Listener connections map[string]net.Conn mutex sync.RWMutex bindAddr string bindPort int preferIPv6 bool - spawned bool - port int - host string kissFraming bool i2pTunneled bool packetCallback common.PacketCallback @@ -387,3 +411,15 @@ func (ts *TCPServerInterface) IsOnline() bool { defer ts.mutex.RUnlock() return ts.online } + +func (ts *TCPServerInterface) Enable() { + ts.mutex.Lock() + defer ts.mutex.Unlock() + ts.online = true +} + +func (ts *TCPServerInterface) Disable() { + ts.mutex.Lock() + defer ts.mutex.Unlock() + ts.online = false +} diff --git a/pkg/interfaces/udp.go b/pkg/interfaces/udp.go index 0380c6b..e6cb3f8 100644 --- a/pkg/interfaces/udp.go +++ b/pkg/interfaces/udp.go @@ -2,6 +2,7 @@ package interfaces import ( "fmt" + "log" "net" "sync" @@ -19,9 +20,10 @@ type UDPInterface struct { rxBytes uint64 mtu int bitrate int + enabled bool } -func NewUDPInterface(name string, addr string, target string) (*UDPInterface, error) { +func NewUDPInterface(name string, addr string, target string, enabled bool) (*UDPInterface, error) { udpAddr, err := net.ResolveUDPAddr("udp", addr) if err != nil { return nil, err @@ -36,17 +38,12 @@ func NewUDPInterface(name string, addr string, target string) (*UDPInterface, er } ui := &UDPInterface{ - BaseInterface: BaseInterface{ - name: name, - mode: common.IF_MODE_FULL, - ifType: common.IF_TYPE_UDP, - online: false, - mtu: common.DEFAULT_MTU, - detached: false, - }, - addr: udpAddr, - targetAddr: targetAddr, - readBuffer: make([]byte, common.DEFAULT_MTU), + BaseInterface: NewBaseInterface(name, common.IF_TYPE_UDP, enabled), + addr: udpAddr, + targetAddr: targetAddr, + readBuffer: make([]byte, common.DEFAULT_MTU), + mtu: common.DEFAULT_MTU, + enabled: enabled, } return ui, nil @@ -86,29 +83,16 @@ func (ui *UDPInterface) Detach() { } func (ui *UDPInterface) Send(data []byte, addr string) error { - if !ui.IsOnline() { - return fmt.Errorf("interface offline") + if !ui.IsEnabled() { + return fmt.Errorf("interface not enabled") } - targetAddr := ui.targetAddr - if addr != "" { - var err error - targetAddr, err = net.ResolveUDPAddr("udp", addr) - if err != nil { - return fmt.Errorf("invalid target address: %v", err) - } - } - - if targetAddr == nil { + if ui.targetAddr == nil { return fmt.Errorf("no target address configured") } - _, err := ui.conn.WriteToUDP(data, targetAddr) - if err != nil { - return fmt.Errorf("UDP write failed: %v", err) - } - - return nil + _, err := ui.conn.WriteTo(data, ui.targetAddr) + return err } func (ui *UDPInterface) SetPacketCallback(callback common.PacketCallback) { @@ -173,3 +157,58 @@ func (ui *UDPInterface) GetMTU() int { func (ui *UDPInterface) GetBitrate() int { return ui.bitrate } + +func (ui *UDPInterface) Enable() { + ui.mutex.Lock() + defer ui.mutex.Unlock() + ui.online = true +} + +func (ui *UDPInterface) Disable() { + ui.mutex.Lock() + defer ui.mutex.Unlock() + ui.online = false +} + +func (ui *UDPInterface) Start() error { + conn, err := net.ListenUDP("udp", ui.addr) + if err != nil { + return err + } + ui.conn = conn + ui.online = true + return nil +} + +func (ui *UDPInterface) readLoop() { + buffer := make([]byte, ui.mtu) + for { + if ui.IsDetached() { + return + } + + n, addr, err := ui.conn.ReadFromUDP(buffer) + if err != nil { + if !ui.IsDetached() { + log.Printf("UDP read error: %v", err) + } + return + } + + ui.mutex.Lock() + ui.rxBytes += uint64(n) + ui.mutex.Unlock() + + log.Printf("Received %d bytes from %s", n, addr.String()) + + if callback := ui.GetPacketCallback(); callback != nil { + callback(buffer[:n], ui) + } + } +} + +func (ui *UDPInterface) IsEnabled() bool { + ui.mutex.RLock() + defer ui.mutex.RUnlock() + return ui.enabled && ui.online && !ui.detached +} diff --git a/pkg/transport/transport.go b/pkg/transport/transport.go index a7dd9dd..a0c5b88 100644 --- a/pkg/transport/transport.go +++ b/pkg/transport/transport.go @@ -54,6 +54,7 @@ type Transport struct { mutex sync.RWMutex handlerLock sync.RWMutex pathLock sync.RWMutex + links map[string]*Link } func NewTransport(config *common.ReticulumConfig) (*Transport, error) { @@ -61,6 +62,7 @@ func NewTransport(config *common.ReticulumConfig) (*Transport, error) { config: config, interfaces: make(map[string]common.NetworkInterface), paths: make(map[string]*common.Path), + links: make(map[string]*Link), } transportMutex.Lock() @@ -126,6 +128,8 @@ type Link struct { packetCb func([]byte) resourceCb func(interface{}) bool resourceStrategy int + connectedCb func() + disconnectedCb func() } type Destination struct { @@ -183,6 +187,9 @@ func (l *Link) SetResourceCallback(cb func(interface{}) bool) { } func (l *Link) Teardown() { + if l.disconnectedCb != nil { + l.disconnectedCb() + } if l.closedCb != nil { l.closedCb() } @@ -206,7 +213,9 @@ func (l *Link) Send(data []byte) error { } type AnnounceHandler interface { - ReceivedAnnounce(destinationHash []byte, identity interface{}, appData []byte) + AspectFilter() []string + ReceivedAnnounce(destinationHash []byte, announcedIdentity interface{}, appData []byte) error + ReceivePathResponses() bool } func (t *Transport) RegisterAnnounceHandler(handler AnnounceHandler) { @@ -630,3 +639,33 @@ func (t *Transport) SendPacket(p *packet.Packet) error { return nil } + +func (t *Transport) GetLink(destHash []byte) (*Link, error) { + t.mutex.RLock() + defer t.mutex.RUnlock() + + link, exists := t.links[string(destHash)] + if !exists { + // Create new link if it doesn't exist + link = NewLink( + destHash, + nil, // established callback + nil, // closed callback + ) + t.links[string(destHash)] = link + } + + return link, nil +} + +func (l *Link) OnConnected(cb func()) { + l.connectedCb = cb + // If already established, trigger callback immediately + if !l.establishedAt.IsZero() && cb != nil { + cb() + } +} + +func (l *Link) OnDisconnected(cb func()) { + l.disconnectedCb = cb +} diff --git a/scripts/run_clients.sh b/scripts/run_clients.sh index 4b3c22b..65671b6 100755 --- a/scripts/run_clients.sh +++ b/scripts/run_clients.sh @@ -1,10 +1,47 @@ #!/bin/bash -# Build the client and server -echo "Building Reticulum client..." -go build -o bin/reticulum-client ./cmd/client +# Function to show usage +show_usage() { + echo "Usage: $0 [--type TYPE]" + echo " --type Type of client to run (default: client, options: client, ftp)" + exit 1 +} + +# Parse command line arguments +CLIENT_TYPE="client" +while [[ $# -gt 0 ]]; do + case $1 in + --type) + CLIENT_TYPE="$2" + shift 2 + ;; + *) + show_usage + ;; + esac +done + +# Validate client type +if [[ "$CLIENT_TYPE" != "client" && "$CLIENT_TYPE" != "ftp" ]]; then + echo "Error: Invalid client type. Must be 'client' or 'ftp'" + show_usage +fi + +# Build the appropriate binaries +echo "Building Reticulum binaries..." go build -o bin/reticulum ./cmd/reticulum +case $CLIENT_TYPE in + "client") + go build -o bin/reticulum-client ./cmd/client + CLIENT_BIN="reticulum-client" + ;; + "ftp") + go build -o bin/reticulum-client-ftp ./cmd/client-ftp + CLIENT_BIN="reticulum-client-ftp" + ;; +esac + # Check if build was successful if [ $? -ne 0 ]; then echo "Build failed!" @@ -23,8 +60,8 @@ sleep 2 # Give server time to start # Generate identities for both clients echo "Generating identities..." -CLIENT1_HASH=$(./bin/reticulum-client -config configs/test-client1.toml -generate-identity 2>&1 | grep "Identity hash:" | cut -d' ' -f3) -CLIENT2_HASH=$(./bin/reticulum-client -config configs/test-client2.toml -generate-identity 2>&1 | grep "Identity hash:" | cut -d' ' -f3) +CLIENT1_HASH=$(./bin/"$CLIENT_BIN" -config configs/test-client1.toml -generate-identity 2>&1 | grep "Identity hash:" | cut -d' ' -f3) +CLIENT2_HASH=$(./bin/"$CLIENT_BIN" -config configs/test-client2.toml -generate-identity 2>&1 | grep "Identity hash:" | cut -d' ' -f3) echo "Client 1 Hash: $CLIENT1_HASH" echo "Client 2 Hash: $CLIENT2_HASH" @@ -34,15 +71,35 @@ run_client() { local config=$1 local target=$2 local logfile=$3 - echo "Starting client with config: $config targeting: $target" - ./bin/reticulum-client -config "$config" -target "$target" > "$logfile" 2>&1 & + + case $CLIENT_TYPE in + "client") + echo "Starting regular client with config: $config targeting: $target" + ./bin/"$CLIENT_BIN" -config "$config" -target "$target" > "$logfile" 2>&1 & + ;; + "ftp") + echo "Starting FTP client with config: $config serving directory: $target" + ./bin/"$CLIENT_BIN" -config "$config" -serve "$target" > "$logfile" 2>&1 & + ;; + esac + echo $! > "$logfile.pid" echo "Client started with PID: $(cat $logfile.pid)" } -# Run both clients targeting each other -run_client "configs/test-client1.toml" "$CLIENT2_HASH" "logs/client1.log" -run_client "configs/test-client2.toml" "$CLIENT1_HASH" "logs/client2.log" +# Run both clients with appropriate parameters +case $CLIENT_TYPE in + "client") + run_client "configs/test-client1.toml" "$CLIENT2_HASH" "logs/client1.log" + run_client "configs/test-client2.toml" "$CLIENT1_HASH" "logs/client2.log" + ;; + "ftp") + # Create shared directories for FTP clients + mkdir -p ./shared/client1 ./shared/client2 + run_client "configs/test-client1.toml" "./shared/client1" "logs/client1.log" "$CLIENT2_HASH" + run_client "configs/test-client2.toml" "./shared/client2" "logs/client2.log" "$CLIENT1_HASH" + ;; +esac echo echo "Both clients are running. To stop everything:" @@ -50,4 +107,11 @@ echo "kill \$(cat logs/*.pid)" echo echo "To view logs:" echo "tail -f logs/client1.log" -echo "tail -f logs/client2.log" \ No newline at end of file +echo "tail -f logs/client2.log" + +if [ "$CLIENT_TYPE" = "ftp" ]; then + echo + echo "FTP shared directories:" + echo "./shared/client1" + echo "./shared/client2" +fi \ No newline at end of file