#!/usr/bin/env python3 """RNS FileSync - Peer-to-Peer File Synchronization over Reticulum. This module provides a file synchronization system that allows peers to synchronize files over the Reticulum Network Stack (RNS). It includes a terminal user interface (TUI) for monitoring and managing file sync operations, permission-based access control, and delta synchronization for efficient file transfers. """ import argparse import hashlib import json import os import shutil import sys import threading import time from collections import deque import RNS from RNS.vendor import umsgpack APP_NAME = "rns_filesync" APP_TIMEOUT = 30.0 BLOCK_SIZE = 4096 CHUNK_SIZE = 7000 SCAN_INTERVAL = 5.0 MAX_PACKET_DATA_SIZE = 256 peer_identity = None peer_destination = None connected_peers = [] connected_peers_lock = threading.Lock() file_monitor_active = False sync_directory = None file_hashes = {} file_blocks = {} file_hashes_lock = threading.Lock() known_peers = set() peer_permissions = {} permissions_lock = threading.Lock() whitelist_enabled = False transfer_stats = { "last_transfer_bytes": 0, "last_transfer_time": 0, "last_transfer_start": 0, "current_speed": 0, } transfer_stats_lock = threading.Lock() active_outgoing_transfers = {} active_outgoing_transfers_lock = threading.Lock() active_resources = {} active_resources_lock = threading.Lock() class Colors: """ANSI color codes for terminal output.""" RESET = "\033[0m" BOLD = "\033[1m" DIM = "\033[2m" BLACK = "\033[30m" RED = "\033[31m" GREEN = "\033[32m" YELLOW = "\033[33m" BLUE = "\033[34m" MAGENTA = "\033[35m" CYAN = "\033[36m" WHITE = "\033[37m" BG_BLACK = "\033[40m" BG_RED = "\033[41m" BG_GREEN = "\033[42m" BG_YELLOW = "\033[43m" BG_BLUE = "\033[44m" BG_MAGENTA = "\033[45m" BG_CYAN = "\033[46m" BG_WHITE = "\033[47m" BRIGHT_BLACK = "\033[90m" BRIGHT_RED = "\033[91m" BRIGHT_GREEN = "\033[92m" BRIGHT_YELLOW = "\033[93m" BRIGHT_BLUE = "\033[94m" BRIGHT_MAGENTA = "\033[95m" BRIGHT_CYAN = "\033[96m" BRIGHT_WHITE = "\033[97m" class SimpleTUI: """Terminal User Interface for RNS FileSync. Provides a text-based interface for monitoring file synchronization, viewing connected peers, browsing remote files, and managing sync operations. """ def __init__(self): """Initialize the TUI with default settings.""" self.enabled = True self.terminal_height = 0 self.terminal_width = 0 self.log_lines = deque(maxlen=1000) self.log_lock = threading.Lock() self.status_info = { "files": 0, "peers": 0, "identity": "", "destination": "", "directory": "", "permissions": False, "speed": 0, } self.status_lock = threading.Lock() self.update_terminal_size() self.refresh_timer = None self.current_input = "" self.input_lock = threading.Lock() self.view_mode = "files" self.view_lock = threading.Lock() self.file_list = [] self.file_list_lock = threading.Lock() self.scroll_offset = 0 self.browser_peer = None self.remote_files = [] self.remote_files_lock = threading.Lock() def update_terminal_size(self): """Update terminal dimensions, falling back to defaults on error.""" try: size = shutil.get_terminal_size() self.terminal_width = size.columns self.terminal_height = size.lines except Exception: self.terminal_width = 80 self.terminal_height = 24 def clear_screen(self): """Clear the terminal screen.""" sys.stdout.write("\033[2J") sys.stdout.write("\033[H") sys.stdout.flush() def move_cursor(self, row, col): """Move cursor to specified position. Args: row: Row position (1-indexed). col: Column position (1-indexed). """ sys.stdout.write(f"\033[{row};{col}H") def clear_line(self): """Clear the current line.""" sys.stdout.write("\033[2K") def hide_cursor(self): """Hide the terminal cursor.""" sys.stdout.write("\033[?25l") def show_cursor(self): """Show the terminal cursor.""" sys.stdout.write("\033[?25h") def add_log(self, message, level="INFO"): """Add a log message to the TUI log buffer. Args: message: Log message text. level: Log level (string or RNS log constant). """ with self.log_lock: timestamp = time.strftime("%H:%M:%S") if level in ("ERROR", RNS.LOG_ERROR): color = Colors.RED level_str = "ERR" elif level in ("WARNING", RNS.LOG_WARNING): color = Colors.YELLOW level_str = "WARN" elif level in ("NOTICE", RNS.LOG_NOTICE): color = Colors.BRIGHT_CYAN level_str = "NOTE" elif level in ("INFO", RNS.LOG_INFO): color = Colors.GREEN level_str = "INFO" elif level in ("VERBOSE", RNS.LOG_VERBOSE): color = Colors.BRIGHT_BLACK level_str = "VERB" elif level in ("DEBUG", RNS.LOG_DEBUG): color = Colors.BRIGHT_BLACK level_str = "DBG" else: color = Colors.WHITE level_str = "LOG" formatted = f"{Colors.BRIGHT_BLACK}[{timestamp}]{Colors.RESET} {color}{level_str:4}{Colors.RESET} {message}" self.log_lines.append(formatted) def update_status(self, **kwargs): """Update status information displayed in the TUI. Args: **kwargs: Status fields to update (files, peers, identity, etc.). """ with self.status_lock: self.status_info.update(kwargs) def format_speed(self, bytes_per_sec): """Format transfer speed in human-readable units. Args: bytes_per_sec: Speed in bytes per second. Returns: Formatted speed string (Gbps, Mbps, Kbps, or bps), or None if zero. """ if bytes_per_sec == 0: return None bits_per_sec = bytes_per_sec * 8 if bits_per_sec >= 1_000_000_000: return f"{bits_per_sec / 1_000_000_000:.2f} Gbps" if bits_per_sec >= 1_000_000: return f"{bits_per_sec / 1_000_000:.2f} Mbps" if bits_per_sec >= 1_000: return f"{bits_per_sec / 1_000:.2f} Kbps" return f"{bits_per_sec:.0f} bps" def format_size(self, size): """Format file size in human-readable units. Args: size: Size in bytes. Returns: Formatted size string with appropriate unit. """ for unit in ["B", "KB", "MB", "GB", "TB"]: if size < 1024.0: return f"{size:.1f}{unit}" size /= 1024.0 return f"{size:.1f}PB" def get_file_type(self, filepath): """Determine file type category based on extension. Args: filepath: Path to the file. Returns: File type category string (e.g., "[Audio]", "[Video]", "[File]"). """ ext = os.path.splitext(filepath)[1].lower() if ext in [".mp3", ".wav", ".flac", ".ogg", ".m4a", ".aac"]: return "[Audio]" if ext in [".mp4", ".mkv", ".avi", ".mov", ".webm"]: return "[Video]" if ext in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp"]: return "[Image]" if ext in [".txt", ".md", ".log", ".mu"]: return "[Text]" if ext in [".pdf"]: return "[PDF]" if ext in [".zip", ".tar", ".gz", ".7z", ".rar"]: return "[Archive]" if ext in [".py", ".js", ".c", ".cpp", ".java", ".go"]: return "[Code]" return "[File]" def update_file_list(self, directory): """Scan directory and update the file list for display. Args: directory: Directory path to scan. """ if not directory or not os.path.exists(directory): return files = [] try: for root, dirs, filenames in os.walk(directory): for filename in filenames: if filename.startswith(".rns-filesync"): continue full_path = os.path.join(root, filename) rel_path = os.path.relpath(full_path, directory) try: size = os.path.getsize(full_path) files.append( { "path": rel_path, "size": size, "type": self.get_file_type(filename), }, ) except Exception as e: RNS.log(f"Error reading file {rel_path}: {e}", RNS.LOG_DEBUG) except Exception as e: RNS.log(f"Error scanning directory {directory}: {e}", RNS.LOG_DEBUG) files.sort(key=lambda x: x["path"]) with self.file_list_lock: self.file_list = files def set_view_mode(self, mode): """Set the current view mode (files, logs, browser). Args: mode: View mode string. """ with self.view_lock: self.view_mode = mode self.scroll_offset = 0 def draw_box(self, row, col, width, height, title=None): """Draw a box with optional title using box-drawing characters. Args: row: Top row position. col: Left column position. width: Box width. height: Box height. title: Optional title text for the box. """ for i in range(height): self.move_cursor(row + i, col) if i == 0: if title: title_text = f" {title} " left_pad = (width - len(title_text) - 2) // 2 right_pad = width - len(title_text) - left_pad - 2 sys.stdout.write(f"┌{'─' * left_pad}{title_text}{'─' * right_pad}┐") else: sys.stdout.write(f"┌{'─' * (width - 2)}┐") elif i == height - 1: sys.stdout.write(f"└{'─' * (width - 2)}┘") else: sys.stdout.write(f"│{' ' * (width - 2)}│") def draw_status_area(self): """Draw the status information area at the top of the screen. Returns: Height of the status area in rows. """ with self.status_lock: info = self.status_info.copy() status_height = 8 self.draw_box( 1, 1, self.terminal_width - 2, status_height, "RNS FileSync Status", ) self.move_cursor(2, 3) sys.stdout.write( f"{Colors.CYAN}Directory:{Colors.RESET} {info['directory'][: self.terminal_width - 20]}", ) self.move_cursor(3, 3) sys.stdout.write( f"{Colors.CYAN}Identity:{Colors.RESET} {Colors.BRIGHT_YELLOW}{info['identity'][:40]}{Colors.RESET}", ) self.move_cursor(4, 3) sys.stdout.write( f"{Colors.CYAN}Destination:{Colors.RESET} {Colors.BRIGHT_GREEN}{info['destination'][:40]}{Colors.RESET}", ) self.move_cursor(5, 3) sys.stdout.write( f"{Colors.CYAN}Files:{Colors.RESET} {Colors.BRIGHT_WHITE}{info['files']}{Colors.RESET} ", ) sys.stdout.write( f"{Colors.CYAN}Peers:{Colors.RESET} {Colors.BRIGHT_WHITE}{info['peers']}{Colors.RESET} ", ) if info["permissions"]: sys.stdout.write( f"{Colors.CYAN}Permissions:{Colors.RESET} {Colors.GREEN}Enabled{Colors.RESET} ", ) speed_str = self.format_speed(info["speed"]) if speed_str: sys.stdout.write( f"{Colors.CYAN}Speed:{Colors.RESET} {Colors.BRIGHT_MAGENTA}{speed_str}{Colors.RESET}", ) self.move_cursor(6, 3) sys.stdout.write( f"{Colors.BRIGHT_BLACK}Commands: status | peers | browse | logs | download | quit{Colors.RESET}", ) return status_height def draw_files_area(self, start_row): """Draw the file list area. Args: start_row: Starting row position. Returns: Ending row position. """ usable_height = self.terminal_height - 1 view_height = max(1, usable_height - start_row - 2) with self.view_lock: mode = self.view_mode if mode == "browser" and self.browser_peer: title = f"Remote Files - Peer {RNS.prettyhexrep(self.browser_peer)[:16]}..." with self.remote_files_lock: files = self.remote_files[:] else: title = "Local Files" with self.file_list_lock: files = self.file_list[:] self.draw_box(start_row, 1, self.terminal_width - 2, view_height + 2, title) if not files: self.move_cursor(start_row + 1, 3) sys.stdout.write(f"{Colors.DIM}No files to display{Colors.RESET}") else: start_idx = self.scroll_offset end_idx = min(start_idx + view_height, len(files)) for i, file_info in enumerate(files[start_idx:end_idx]): if i >= view_height: break self.move_cursor(start_row + 1 + i, 3) self.clear_line() file_path = file_info["path"] file_size = self.format_size(file_info["size"]) file_type = file_info["type"] max_path_len = self.terminal_width - 40 if len(file_path) > max_path_len: file_path = "..." + file_path[-(max_path_len - 3) :] line = f"{Colors.CYAN}{file_type:<12}{Colors.RESET} {Colors.WHITE}{file_size:>10}{Colors.RESET} {file_path}" sys.stdout.write(line[: self.terminal_width - 6]) for i in range( len(files[self.scroll_offset : self.scroll_offset + view_height]) if files else 0, view_height, ): self.move_cursor(start_row + 1 + i, 3) self.clear_line() return start_row + view_height + 2 def draw_log_area(self, start_row): """Draw the log area. Args: start_row: Starting row position. Returns: Ending row position. """ usable_height = self.terminal_height - 1 log_height = max(1, usable_height - start_row - 2) self.draw_box(start_row, 1, self.terminal_width - 2, log_height + 2, "Logs") with self.log_lock: recent_logs = list(self.log_lines)[-log_height:] for i, log_line in enumerate(recent_logs): self.move_cursor(start_row + 1 + i, 3) max_len = self.terminal_width - 6 if len(log_line) > max_len: sys.stdout.write(log_line[:max_len]) else: sys.stdout.write(log_line) self.clear_line() for i in range(len(recent_logs), log_height): self.move_cursor(start_row + 1 + i, 3) self.clear_line() return start_row + log_height + 2 def draw_input_area(self, restore_input=True): """Draw the input area at the bottom of the screen. Args: restore_input: Whether to restore previously entered input. """ self.update_terminal_size() input_row = self.terminal_height self.move_cursor(input_row, 1) self.clear_line() prompt = f"{Colors.BOLD}{Colors.BRIGHT_CYAN}>{Colors.RESET} " sys.stdout.write(prompt) sys.stdout.flush() if restore_input: with self.input_lock: current_input = self.current_input if current_input: sys.stdout.write(current_input) sys.stdout.flush() def save_current_input(self, text): """Save the current input text. Args: text: Input text to save. """ with self.input_lock: self.current_input = text def clear_current_input(self): """Clear the saved input text.""" with self.input_lock: self.current_input = "" def refresh_display(self, full_clear=False, refresh_input=False): """Refresh the entire TUI display. Args: full_clear: Whether to clear the screen before refreshing. refresh_input: Whether to refresh the input area (default: False to avoid interrupting typing). """ if not self.enabled: return if full_clear: refresh_input = True saved_cursor = False if not refresh_input: sys.stdout.write("\033[s") # save cursor position saved_cursor = True self.update_terminal_size() self.hide_cursor() if full_clear: self.clear_screen() status_end = self.draw_status_area() content_start = status_end + 1 with self.view_lock: mode = self.view_mode if mode == "logs": self.draw_log_area(content_start) else: self.draw_files_area(content_start) if refresh_input: self.draw_input_area(restore_input=True) self.show_cursor() if saved_cursor: sys.stdout.write("\033[u") # restore cursor position sys.stdout.flush() def start_refresh_timer(self): """Start the background thread that periodically refreshes the display.""" def refresh_loop(): while self.enabled: time.sleep(1) self.refresh_display(refresh_input=False) self.refresh_timer = threading.Thread(target=refresh_loop, daemon=True) self.refresh_timer.start() def stop(self): """Stop the TUI and restore original logging.""" self.enabled = False self.show_cursor() self.clear_screen() global original_rns_log if original_rns_log: RNS.log = original_rns_log tui = None class TUILogHandler: """File-like object handler for redirecting stdout to TUI.""" def __init__(self, tui_instance): """Initialize the log handler. Args: tui_instance: SimpleTUI instance to write logs to. """ self.tui = tui_instance def write(self, data): """Write data to the TUI log. Args: data: Data string to write. """ if self.tui and self.tui.enabled and data.strip(): self.tui.add_log(data.strip(), "INFO") def flush(self): """Flush operation (no-op for this handler).""" original_rns_log = None def rns_log_hook(message, level, _override_destination=False): """Intercept RNS log messages and display in TUI. Args: message: Log message text. level: Log level constant. _override_destination: Unused parameter for compatibility. """ if tui and tui.enabled: tui.add_log(message, level) if original_rns_log: original_rns_log(message, level, _override_destination) def get_identity_path(identity_name): """Get the filesystem path for an RNS identity. Args: identity_name: Name of the identity. Returns: Full path to the identity file. """ config_path = os.path.expanduser("~/.reticulum") identity_path = os.path.join(config_path, "identities", f"{identity_name}") return identity_path def load_permissions(permissions_file): """Load peer permissions from a file. Args: permissions_file: Path to the permissions file. """ global peer_permissions, whitelist_enabled if not os.path.exists(permissions_file): RNS.log(f"Permissions file not found: {permissions_file}", RNS.LOG_WARNING) return try: with open(permissions_file) as f: for line_num, line in enumerate(f, 1): line = line.strip() if not line or line.startswith("#"): continue parts = line.split() if len(parts) < 2: RNS.log( f"Invalid permissions line {line_num}: {line}", RNS.LOG_WARNING, ) continue identity_hash = parts[0] perms = parts[1].split(",") valid_perms = [] for perm in perms: perm = perm.strip().lower() if perm in ["read", "write", "delete"]: valid_perms.append(perm) else: RNS.log( f"Invalid permission '{perm}' on line {line_num}", RNS.LOG_WARNING, ) with permissions_lock: peer_permissions[identity_hash] = valid_perms if identity_hash == "*": RNS.log(f"Wildcard permissions set: {valid_perms}", RNS.LOG_INFO) else: RNS.log( f"Loaded permissions for {identity_hash}: {valid_perms}", RNS.LOG_VERBOSE, ) with permissions_lock: if peer_permissions: whitelist_enabled = True RNS.log( f"Permissions loaded for {len(peer_permissions)} identities", RNS.LOG_INFO, ) except Exception as e: RNS.log(f"Error loading permissions file: {e}", RNS.LOG_ERROR) def add_permission_from_args(identity_hash, perms_str): """Add permissions for an identity from command-line arguments. Args: identity_hash: Identity hash string. perms_str: Comma-separated permissions string (e.g., "read,write,delete"). """ global peer_permissions, whitelist_enabled perms = [p.strip().lower() for p in perms_str.split(",")] valid_perms = [] for perm in perms: if perm in ["read", "write", "delete"]: valid_perms.append(perm) else: RNS.log(f"Invalid permission: {perm}", RNS.LOG_WARNING) if valid_perms: with permissions_lock: peer_permissions[identity_hash] = valid_perms whitelist_enabled = True if identity_hash == "*": RNS.log(f"Wildcard permissions set: {valid_perms}", RNS.LOG_INFO) else: RNS.log(f"Permissions for {identity_hash}: {valid_perms}", RNS.LOG_INFO) def check_permission(identity_hash, permission): """Check if an identity has a specific permission. Args: identity_hash: Identity hash bytes or string. permission: Permission to check ("read", "write", or "delete"). Returns: True if permission is granted, False otherwise. """ if not whitelist_enabled: return True with permissions_lock: hash_str = RNS.hexrep(identity_hash, delimit=False) if hash_str in peer_permissions: return permission in peer_permissions[hash_str] if "*" in peer_permissions: return permission in peer_permissions["*"] return False def can_connect(identity_hash): """Check if an identity is allowed to connect. Args: identity_hash: Identity hash bytes or string. Returns: True if connection is allowed, False otherwise. """ if not whitelist_enabled: return True with permissions_lock: hash_str = RNS.hexrep(identity_hash, delimit=False) return hash_str in peer_permissions or "*" in peer_permissions def get_peer_permissions(identity_hash): """Get all permissions for an identity. Args: identity_hash: Identity hash bytes or string. Returns: List of permission strings, or empty list if none found. """ with permissions_lock: hash_str = RNS.hexrep(identity_hash, delimit=False) if hash_str in peer_permissions: return peer_permissions[hash_str] if "*" in peer_permissions: return peer_permissions["*"] return [] def load_or_create_identity(identity_name): """Load an existing RNS identity or create a new one. Args: identity_name: Name of the identity. Returns: RNS.Identity instance. """ identity_path = get_identity_path(identity_name) identity_dir = os.path.dirname(identity_path) if not os.path.exists(identity_dir): os.makedirs(identity_dir) if os.path.exists(identity_path): identity = RNS.Identity.from_file(identity_path) RNS.log(f"Loaded identity {identity_name} from {identity_path}", RNS.LOG_INFO) else: identity = RNS.Identity() identity.to_file(identity_path) RNS.log( f"Created new identity {identity_name} at {identity_path}", RNS.LOG_INFO, ) return identity def hash_file(filepath): """Calculate SHA256 hash of a file. Args: filepath: Path to the file. Returns: Hex digest of the file hash, or None on error. """ hasher = hashlib.sha256() try: with open(filepath, "rb") as f: while chunk := f.read(CHUNK_SIZE): hasher.update(chunk) return hasher.hexdigest() except Exception as e: RNS.log(f"Error hashing file {filepath}: {e}", RNS.LOG_ERROR) return None def hash_blocks(filepath): """Calculate SHA256 hash for each block of a file. Args: filepath: Path to the file. Returns: List of dictionaries with block number, hash, and size. """ blocks = [] try: with open(filepath, "rb") as f: block_num = 0 while block := f.read(BLOCK_SIZE): block_hash = hashlib.sha256(block).hexdigest() blocks.append( { "num": block_num, "hash": block_hash, "size": len(block), }, ) block_num += 1 return blocks except Exception as e: RNS.log(f"Error hashing blocks for {filepath}: {e}", RNS.LOG_ERROR) return [] def scan_directory(directory): """Scan directory and return file information with hashes. Args: directory: Directory path to scan. Returns: Dictionary mapping relative file paths to file info (hash, size, mtime). """ file_info = {} try: for root, dirs, files in os.walk(directory): for filename in files: if filename == ".rns-filesync.db": continue filepath = os.path.join(root, filename) relative_path = os.path.relpath(filepath, directory) try: stat = os.stat(filepath) file_hash = hash_file(filepath) if file_hash: file_info[relative_path] = { "hash": file_hash, "size": stat.st_size, "mtime": stat.st_mtime, } except Exception as e: RNS.log(f"Error scanning {relative_path}: {e}", RNS.LOG_DEBUG) except Exception as e: RNS.log(f"Error scanning directory {directory}: {e}", RNS.LOG_ERROR) return file_info def save_hash_db(directory): """Save file hash database to disk. Args: directory: Directory where the database file should be saved. """ db_path = os.path.join(directory, ".rns-filesync.db") try: with file_hashes_lock, open(db_path, "w") as f: json.dump(file_hashes, f, indent=2) except Exception as e: RNS.log(f"Error saving hash database: {e}", RNS.LOG_ERROR) def load_hash_db(directory): """Load file hash database from disk. Args: directory: Directory containing the database file. """ global file_hashes db_path = os.path.join(directory, ".rns-filesync.db") if os.path.exists(db_path): try: with open(db_path) as f: file_hashes = json.load(f) RNS.log( f"Loaded hash database with {len(file_hashes)} entries", RNS.LOG_INFO, ) except Exception as e: RNS.log(f"Error loading hash database: {e}", RNS.LOG_WARNING) file_hashes = {} else: file_hashes = {} def peer_connected(link): """Handle peer connection event. Args: link: RNS Link object for the connected peer. """ try: remote_identity = link.get_remote_identity() identity_hash = None if remote_identity: identity_hash = remote_identity.hash RNS.log( f"Peer attempting to connect: {RNS.prettyhexrep(identity_hash)}", RNS.LOG_VERBOSE, ) else: RNS.log( f"Peer attempting to connect (destination): {RNS.prettyhexrep(link.destination.hash)}", RNS.LOG_VERBOSE, ) except Exception as e: RNS.log( f"Peer connection attempt (could not get identity: {e})", RNS.LOG_WARNING, ) identity_hash = None if whitelist_enabled and identity_hash: if not can_connect(identity_hash): RNS.log( f"Connection rejected: {RNS.prettyhexrep(identity_hash)} not in whitelist", RNS.LOG_WARNING, ) link.teardown() return perms = get_peer_permissions(identity_hash) RNS.log( f"Peer connected: {RNS.prettyhexrep(identity_hash)} with permissions: {perms}", RNS.LOG_INFO, ) elif identity_hash: RNS.log(f"Peer connected: {RNS.prettyhexrep(identity_hash)}", RNS.LOG_INFO) else: RNS.log("Peer connected", RNS.LOG_INFO) with connected_peers_lock: if link not in connected_peers: connected_peers.append(link) if tui: tui.update_status(peers=len(connected_peers)) link.set_link_closed_callback(peer_disconnected) link.set_packet_callback(packet_received) link.set_resource_strategy(RNS.Link.ACCEPT_APP) link.set_resource_callback(resource_callback) link.set_resource_started_callback(resource_started) link.set_resource_concluded_callback(resource_concluded) link.download_buffers = {} link.upload_buffers = {} if ( identity_hash and check_permission(identity_hash, "read") ) or not whitelist_enabled: send_file_list_to_peer(link) time.sleep(0.1) request_file_list_from_peer(link, browser_mode=False) else: RNS.log("Peer does not have read permission, skipping file list", RNS.LOG_INFO) def peer_disconnected(link): """Handle peer disconnection event. Args: link: RNS Link object for the disconnected peer. """ with connected_peers_lock: if link in connected_peers: connected_peers.remove(link) if tui: tui.update_status(peers=len(connected_peers)) try: remote_identity = link.get_remote_identity() if remote_identity: RNS.log( f"Peer disconnected: {RNS.prettyhexrep(remote_identity.hash)}", RNS.LOG_INFO, ) else: RNS.log( f"Peer disconnected: {RNS.prettyhexrep(link.destination.hash)}", RNS.LOG_INFO, ) except Exception: RNS.log("Peer disconnected", RNS.LOG_INFO) def send_file_list_to_peer(link, browser_mode=False): """Send file list to a connected peer. Args: link: RNS Link object for the peer. browser_mode: Whether this is for browser viewing (default: False). """ try: if sync_directory: current_files = scan_directory(sync_directory) with file_hashes_lock: file_list = {} for filepath, file_info in current_files.items(): file_list[filepath] = { "hash": file_info["hash"], "size": file_info["size"], "mtime": file_info["mtime"], } else: with file_hashes_lock: file_list = {path: info for path, info in file_hashes.items()} data = umsgpack.packb( { "type": "file_list", "files": file_list, "browser": browser_mode, }, ) packet = RNS.Packet(link, data) packet.send() RNS.log(f"Sent file list ({len(file_list)} files, browser={browser_mode}) to peer", RNS.LOG_INFO) except Exception as e: RNS.log(f"Error sending file list: {e}", RNS.LOG_ERROR) def request_file_list_from_peer(link, browser_mode=False): """Request file list from a connected peer. Args: link: RNS Link object for the peer. browser_mode: Whether the request is for browsing UI. """ try: data = umsgpack.packb( { "type": "file_list_request", "browser": browser_mode, }, ) packet = RNS.Packet(link, data) packet.send() if browser_mode: RNS.log("Requested file list from peer for browsing", RNS.LOG_VERBOSE) else: RNS.log("Requested file list from peer", RNS.LOG_VERBOSE) except Exception as e: RNS.log(f"Error requesting file list: {e}", RNS.LOG_ERROR) def download_file_from_peer(link, filepath): """Initiate file download from a peer. Args: link: RNS Link object for the peer. filepath: Relative path of the file to download. """ RNS.log(f"Downloading file: {filepath}", RNS.LOG_INFO) request_file(link, filepath) def packet_received(message, packet): """Handle incoming RNS packet and route to appropriate handler. Args: message: Packet message data. packet: RNS Packet object. """ try: data = umsgpack.unpackb(message) msg_type = data.get("type") if msg_type == "file_list": handle_peer_file_list(data, packet.link) elif msg_type == "file_list_request": is_browser = data.get("browser", False) send_file_list_to_peer(packet.link, browser_mode=is_browser) elif msg_type == "file_request": handle_file_request(data, packet.link) elif msg_type == "block_hashes": handle_block_hashes_response(data, packet.link) elif msg_type == "delta_request": handle_delta_request(data, packet.link) elif msg_type == "file_chunk": handle_file_chunk(data, packet.link) elif msg_type == "file_complete": handle_file_complete(data, packet.link) elif msg_type == "empty_file": handle_empty_file(data, packet.link) elif msg_type == "file_update": handle_file_update_notification(data, packet.link) elif msg_type == "file_deletion": handle_file_deletion(data, packet.link) except Exception as e: RNS.log(f"Error processing packet: {e}", RNS.LOG_ERROR) def handle_peer_file_list(data, link): """Handle file list received from a peer. Args: data: Packet data dictionary. link: RNS Link object for the peer. """ peer_files = data.get("files", {}) is_browser_response = data.get("browser", False) RNS.log(f"Received file list from peer ({len(peer_files)} files, browser={is_browser_response})", RNS.LOG_INFO) if is_browser_response and tui: remote_files = [] for filepath, peer_info in peer_files.items(): remote_files.append( { "path": filepath, "size": peer_info.get("size", 0), "type": tui.get_file_type(filepath), }, ) with tui.remote_files_lock: tui.remote_files = remote_files return if not sync_directory: RNS.log("Sync directory not set, cannot process file list", RNS.LOG_WARNING) return current_files = scan_directory(sync_directory) RNS.log(f"Comparing {len(peer_files)} peer files with {len(current_files)} local files", RNS.LOG_INFO) if len(peer_files) == 0: RNS.log("Peer has no files to sync", RNS.LOG_INFO) return files_to_request = [] files_to_sync = [] with file_hashes_lock: for filepath, peer_info in peer_files.items(): local_info = current_files.get(filepath) if not local_info: files_to_request.append(filepath) RNS.log(f"Requesting new file: {filepath}", RNS.LOG_INFO) request_file(link, filepath) elif local_info["hash"] != peer_info["hash"]: files_to_sync.append(filepath) RNS.log(f"File differs, requesting blocks: {filepath}", RNS.LOG_INFO) request_file_blocks(link, filepath) if files_to_request or files_to_sync: RNS.log(f"Sync initiated: {len(files_to_request)} new files, {len(files_to_sync)} modified files", RNS.LOG_INFO) else: RNS.log("No files need syncing (all files match)", RNS.LOG_INFO) def request_file(link, filepath): """Request a file from a peer. Args: link: RNS Link object for the peer. filepath: Relative path of the file to request. """ try: RNS.log(f"Requesting file from peer: {filepath}", RNS.LOG_INFO) data = umsgpack.packb( { "type": "file_request", "path": filepath, }, ) packet = RNS.Packet(link, data) packet.send() except Exception as e: RNS.log(f"Error requesting file: {e}", RNS.LOG_ERROR) def request_file_blocks(link, filepath): """Request delta blocks for a file from a peer. Args: link: RNS Link object for the peer. filepath: Relative path of the file. """ try: full_path = os.path.join(sync_directory, filepath) local_blocks = hash_blocks(full_path) if os.path.exists(full_path) else [] data = umsgpack.packb( { "type": "delta_request", "path": filepath, "local_blocks": [b["hash"] for b in local_blocks], }, ) packet = RNS.Packet(link, data) packet.send() except Exception as e: RNS.log(f"Error requesting blocks: {e}", RNS.LOG_ERROR) def handle_file_request(data, link): """Handle file request from a peer and send the file. Args: data: Packet data dictionary containing file path. link: RNS Link object for the peer. """ filepath = data.get("path") if not filepath: return transfer_key = (id(link), filepath) with active_outgoing_transfers_lock: if transfer_key in active_outgoing_transfers: RNS.log(f"Transfer already in progress for {filepath}, ignoring duplicate request", RNS.LOG_DEBUG) return active_outgoing_transfers[transfer_key] = time.time() try: remote_identity = link.get_remote_identity() if remote_identity and whitelist_enabled: if not check_permission(remote_identity.hash, "read"): RNS.log( f"Peer {RNS.prettyhexrep(remote_identity.hash)} does not have read permission for {filepath}", RNS.LOG_WARNING, ) with active_outgoing_transfers_lock: active_outgoing_transfers.pop(transfer_key, None) return except Exception as e: RNS.log(f"Error checking permissions: {e}", RNS.LOG_DEBUG) full_path = os.path.join(sync_directory, filepath) if not os.path.exists(full_path): RNS.log(f"Peer requested non-existent file: {filepath}", RNS.LOG_WARNING) with active_outgoing_transfers_lock: active_outgoing_transfers.pop(transfer_key, None) return try: file_size = os.path.getsize(full_path) RNS.log(f"Sending file {filepath} ({file_size} bytes) to peer", RNS.LOG_INFO) with file_hashes_lock: file_hash = file_hashes.get(filepath, {}).get("hash") metadata = { "filepath": filepath.encode("utf-8"), "hash": file_hash.encode("utf-8") if file_hash else b"", } start_time = time.time() with transfer_stats_lock: transfer_stats["last_transfer_start"] = start_time transfer_stats["last_transfer_bytes"] = 0 try: def send_concluded_callback(resource): with active_outgoing_transfers_lock: active_outgoing_transfers.pop(transfer_key, None) with active_resources_lock: active_resources.pop(transfer_key, None) if resource.status == RNS.Resource.COMPLETE: RNS.log(f"File {filepath} sent successfully", RNS.LOG_INFO) elif resource.status == RNS.Resource.FAILED: RNS.log(f"File {filepath} send failed", RNS.LOG_ERROR) else: RNS.log(f"File {filepath} send concluded with status {resource.status}", RNS.LOG_WARNING) if file_size == 0: RNS.log(f"Sending empty file notification for {filepath} via packet", RNS.LOG_INFO) with file_hashes_lock: file_hash = file_hashes.get(filepath, {}).get("hash") empty_file_data = umsgpack.packb({ "type": "empty_file", "path": filepath, "hash": file_hash, }) packet = RNS.Packet(link, empty_file_data) packet.send() with active_outgoing_transfers_lock: active_outgoing_transfers.pop(transfer_key, None) RNS.log(f"Empty file {filepath} notification sent", RNS.LOG_DEBUG) return else: file_handle = open(full_path, "rb") resource = RNS.Resource( file_handle, link, metadata=metadata, auto_compress=True, callback=send_concluded_callback, ) with active_resources_lock: active_resources[transfer_key] = resource if file_size > 0: def progress_callback(resource): with transfer_stats_lock: progress = resource.get_progress() transfer_stats["last_transfer_bytes"] = int(progress * file_size) if tui: elapsed = time.time() - start_time if elapsed > 0: transfer_stats["current_speed"] = transfer_stats["last_transfer_bytes"] / elapsed tui.update_status(speed=transfer_stats["current_speed"]) resource.progress_callback(progress_callback) RNS.log(f"File {filepath} resource created and advertised", RNS.LOG_INFO) except Exception as e: RNS.log(f"Error creating resource for {filepath}: {e}", RNS.LOG_ERROR) import traceback RNS.log(traceback.format_exc(), RNS.LOG_ERROR) with active_outgoing_transfers_lock: active_outgoing_transfers.pop(transfer_key, None) with active_resources_lock: active_resources.pop(transfer_key, None) except Exception as e: RNS.log(f"Error sending file {filepath}: {e}", RNS.LOG_ERROR) import traceback RNS.log(traceback.format_exc(), RNS.LOG_ERROR) with active_outgoing_transfers_lock: active_outgoing_transfers.pop(transfer_key, None) with active_resources_lock: active_resources.pop(transfer_key, None) def handle_delta_request(data, link): """Handle delta sync request and send only changed blocks. Args: data: Packet data dictionary containing file path and local block hashes. link: RNS Link object for the peer. """ filepath = data.get("path") peer_block_hashes = data.get("local_blocks", []) if not filepath: return transfer_key = (id(link), filepath, "delta") with active_outgoing_transfers_lock: if transfer_key in active_outgoing_transfers: RNS.log(f"Delta transfer already in progress for {filepath}, ignoring duplicate request", RNS.LOG_DEBUG) return active_outgoing_transfers[transfer_key] = time.time() try: remote_identity = link.get_remote_identity() if remote_identity and whitelist_enabled: if not check_permission(remote_identity.hash, "read"): RNS.log( f"Peer {RNS.prettyhexrep(remote_identity.hash)} does not have read permission for {filepath}", RNS.LOG_WARNING, ) with active_outgoing_transfers_lock: active_outgoing_transfers.pop(transfer_key, None) return except Exception as e: RNS.log(f"Error checking permissions: {e}", RNS.LOG_DEBUG) full_path = os.path.join(sync_directory, filepath) if not os.path.exists(full_path): RNS.log( f"Peer requested delta for non-existent file: {filepath}", RNS.LOG_WARNING, ) with active_outgoing_transfers_lock: active_outgoing_transfers.pop(transfer_key, None) return try: local_blocks = hash_blocks(full_path) file_size = os.path.getsize(full_path) blocks_to_send = [ block_info["num"] for block_info in local_blocks if block_info["hash"] not in peer_block_hashes ] if len(blocks_to_send) == len(local_blocks): RNS.log(f"No common blocks, sending full file: {filepath}", RNS.LOG_VERBOSE) with active_outgoing_transfers_lock: active_outgoing_transfers.pop(transfer_key, None) handle_file_request({"path": filepath}, link) return RNS.log( f"Sending {len(blocks_to_send)}/{len(local_blocks)} blocks for {filepath} (delta sync)", RNS.LOG_INFO, ) start_time = time.time() bytes_sent = 0 with open(full_path, "rb") as f: for block_num in blocks_to_send: f.seek(block_num * BLOCK_SIZE) block_data = f.read(BLOCK_SIZE) bytes_sent += len(block_data) sub_chunk_count = (len(block_data) + MAX_PACKET_DATA_SIZE - 1) // MAX_PACKET_DATA_SIZE for sub_idx in range(sub_chunk_count): start_pos = sub_idx * MAX_PACKET_DATA_SIZE end_pos = min(start_pos + MAX_PACKET_DATA_SIZE, len(block_data)) sub_chunk_data = block_data[start_pos:end_pos] chunk_data = umsgpack.packb( { "type": "file_chunk", "path": filepath, "chunk_num": block_num, "sub_chunk_idx": sub_idx, "sub_chunk_total": sub_chunk_count, "data": sub_chunk_data, "size": file_size, "mode": "delta", "total_blocks": len(local_blocks), }, ) packet = RNS.Packet(link, chunk_data) packet.send() time.sleep(0.01) end_time = time.time() transfer_time = end_time - start_time with transfer_stats_lock: if transfer_time > 0: transfer_stats["current_speed"] = bytes_sent / transfer_time transfer_stats["last_transfer_time"] = transfer_time transfer_stats["last_transfer_bytes"] = bytes_sent if tui: tui.update_status(speed=transfer_stats["current_speed"]) with file_hashes_lock: file_hash = file_hashes.get(filepath, {}).get("hash") complete_data = umsgpack.packb( { "type": "file_complete", "path": filepath, "hash": file_hash, "mode": "delta", }, ) packet = RNS.Packet(link, complete_data) packet.send() RNS.log(f"Delta sent for {filepath}", RNS.LOG_VERBOSE) with active_outgoing_transfers_lock: active_outgoing_transfers.pop(transfer_key, None) except Exception as e: RNS.log(f"Error sending delta for {filepath}: {e}", RNS.LOG_ERROR) with active_outgoing_transfers_lock: active_outgoing_transfers.pop(transfer_key, None) def resource_callback(resource_advertisement): """Handle incoming resource advertisement and decide whether to accept it. Args: resource_advertisement: RNS ResourceAdvertisement object. Returns: True if resource should be accepted, False otherwise. """ try: transfer_size = resource_advertisement.get_transfer_size() data_size = resource_advertisement.get_data_size() has_meta = resource_advertisement.has_metadata() RNS.log( f"Resource advertisement received: transfer_size={transfer_size}, data_size={data_size}, has_metadata={has_meta}", RNS.LOG_VERBOSE, ) sender_identity = resource_advertisement.link.get_remote_identity() sender_hash = RNS.prettyhexrep(sender_identity.hash) if sender_identity else "unknown" if sender_identity: if whitelist_enabled: if not check_permission(sender_identity.hash, "write"): RNS.log( f"Rejecting resource from {sender_hash} - no write permission", RNS.LOG_WARNING, ) return False RNS.log(f"Accepting resource from {sender_hash} ({data_size} bytes)", RNS.LOG_INFO) return True if not whitelist_enabled: RNS.log(f"Accepting resource ({data_size} bytes, no whitelist)", RNS.LOG_INFO) return True RNS.log("Rejecting resource - no sender identity and whitelist enabled", RNS.LOG_WARNING) return False except Exception as e: RNS.log(f"Error in resource_callback: {e}", RNS.LOG_ERROR) import traceback RNS.log(traceback.format_exc(), RNS.LOG_ERROR) return False def resource_started(resource): """Handle resource transfer started. Args: resource: RNS Resource object. """ size = resource.size if hasattr(resource, "size") else 0 total_size = resource.total_size if hasattr(resource, "total_size") else 0 parts = len(resource.parts) if hasattr(resource, "parts") and resource.parts else 0 has_meta = resource.has_metadata if hasattr(resource, "has_metadata") else False RNS.log( f"Resource transfer started: size={size}, total_size={total_size}, parts={parts}, has_metadata={has_meta}", RNS.LOG_INFO, ) def resource_concluded(resource): """Handle resource transfer completed (receiver side). Args: resource: RNS Resource object. """ status_names = { 0x00: "NONE/REJECTED", 0x01: "QUEUED", 0x02: "ADVERTISED", 0x03: "TRANSFERRING", 0x04: "AWAITING_PROOF", 0x05: "ASSEMBLING", 0x06: "COMPLETE", 0x07: "FAILED", 0x08: "CORRUPT", } try: status_name = status_names.get(resource.status, f"UNKNOWN({resource.status})") if resource.status != RNS.Resource.COMPLETE: size = resource.size if hasattr(resource, "size") else 0 parts = len(resource.parts) if hasattr(resource, "parts") and resource.parts else 0 filepath = None if resource.metadata: try: if isinstance(resource.metadata, dict) and "filepath" in resource.metadata: filepath = resource.metadata["filepath"].decode("utf-8") if isinstance(resource.metadata["filepath"], bytes) else resource.metadata["filepath"] except Exception as e: RNS.log(f"Could not decode filepath from metadata: {e}", RNS.LOG_DEBUG) if filepath: RNS.log( f"Resource transfer failed for {filepath}: status={status_name}, size={size}, parts={parts}", RNS.LOG_ERROR, ) else: RNS.log( f"Resource transfer failed: status={status_name}, size={size}, parts={parts}", RNS.LOG_ERROR, ) return if not resource.metadata or "filepath" not in resource.metadata: RNS.log(f"Resource missing filepath metadata, type: {type(resource.metadata)}", RNS.LOG_WARNING) return filepath = resource.metadata["filepath"].decode("utf-8") if isinstance(resource.metadata["filepath"], bytes) else resource.metadata["filepath"] expected_hash_raw = resource.metadata.get("hash", b"") expected_hash = expected_hash_raw.decode("utf-8") if isinstance(expected_hash_raw, bytes) else expected_hash_raw RNS.log(f"Processing received resource for {filepath}", RNS.LOG_VERBOSE) except Exception as e: RNS.log(f"Error in resource_concluded metadata extraction: {e}", RNS.LOG_ERROR) import traceback RNS.log(traceback.format_exc(), RNS.LOG_ERROR) return try: remote_identity = resource.link.get_remote_identity() if remote_identity and whitelist_enabled: if not check_permission(remote_identity.hash, "write"): RNS.log( f"Peer {RNS.prettyhexrep(remote_identity.hash)} does not have write permission for {filepath}", RNS.LOG_WARNING, ) return except Exception as e: RNS.log(f"Error checking permissions: {e}", RNS.LOG_DEBUG) full_path = os.path.join(sync_directory, filepath) dir_path = os.path.dirname(full_path) if dir_path: os.makedirs(dir_path, exist_ok=True) try: shutil.move(resource.data.name, full_path) file_size = os.path.getsize(full_path) RNS.log(f"Received file {filepath} ({file_size} bytes)", RNS.LOG_INFO) actual_hash = hash_file(full_path) if expected_hash and actual_hash != expected_hash: RNS.log( f"Hash mismatch for {filepath}! Expected {expected_hash}, got {actual_hash}", RNS.LOG_ERROR, ) os.remove(full_path) return if actual_hash: RNS.log(f"File {filepath} verified successfully", RNS.LOG_VERBOSE) with file_hashes_lock: file_hashes[filepath] = { "hash": actual_hash, "size": file_size, "mtime": time.time(), } save_hash_db(sync_directory) broadcast_file_update(filepath, exclude_link=resource.link) except Exception as e: RNS.log(f"Error saving received file {filepath}: {e}", RNS.LOG_ERROR) def handle_empty_file(data, link): """Handle empty file notification from a peer. Args: data: Packet data dictionary containing file path and hash. link: RNS Link object for the peer. """ filepath = data.get("path") expected_hash = data.get("hash") if not filepath: return try: remote_identity = link.get_remote_identity() if remote_identity and whitelist_enabled: if not check_permission(remote_identity.hash, "write"): RNS.log( f"Peer {RNS.prettyhexrep(remote_identity.hash)} does not have write permission for {filepath}", RNS.LOG_WARNING, ) return except Exception as e: RNS.log(f"Error checking permissions: {e}", RNS.LOG_DEBUG) full_path = os.path.join(sync_directory, filepath) dir_path = os.path.dirname(full_path) if dir_path: os.makedirs(dir_path, exist_ok=True) try: with open(full_path, "wb") as f: pass RNS.log(f"Created empty file {filepath}", RNS.LOG_INFO) actual_hash = hash_file(full_path) if expected_hash and actual_hash != expected_hash: RNS.log( f"Hash mismatch for empty file {filepath}! Expected {expected_hash}, got {actual_hash}", RNS.LOG_WARNING, ) with file_hashes_lock: file_hashes[filepath] = { "hash": actual_hash, "size": 0, "mtime": time.time(), } save_hash_db(sync_directory) broadcast_file_update(filepath, exclude_link=link) except Exception as e: RNS.log(f"Error creating empty file {filepath}: {e}", RNS.LOG_ERROR) def handle_block_hashes_response(data, link): """Handle block hashes response (currently unused). Args: data: Packet data dictionary. link: RNS Link object for the peer. """ def handle_file_chunk(data, link): """Handle file chunk received from a peer. Args: data: Packet data dictionary containing chunk information. link: RNS Link object for the peer. """ filepath = data.get("path") chunk_num = data.get("chunk_num") chunk_data = data.get("data") mode = data.get("mode", "full") sub_chunk_idx = data.get("sub_chunk_idx", 0) sub_chunk_total = data.get("sub_chunk_total", 1) if filepath not in link.download_buffers: link.download_buffers[filepath] = { "chunks": {}, "mode": mode, "size": data.get("size", 0), "total_blocks": data.get("total_blocks", 0), } if chunk_num not in link.download_buffers[filepath]["chunks"]: link.download_buffers[filepath]["chunks"][chunk_num] = { "sub_chunks": {}, "total_sub_chunks": sub_chunk_total, } link.download_buffers[filepath]["chunks"][chunk_num]["sub_chunks"][sub_chunk_idx] = chunk_data link.download_buffers[filepath]["chunks"][chunk_num]["total_sub_chunks"] = sub_chunk_total if mode == "delta": RNS.log( f"Received delta block {chunk_num} sub-chunk {sub_chunk_idx + 1}/{sub_chunk_total} for {filepath}", RNS.LOG_DEBUG, ) else: RNS.log( f"Received chunk {chunk_num} sub-chunk {sub_chunk_idx + 1}/{sub_chunk_total} for {filepath}", RNS.LOG_DEBUG, ) def handle_file_complete(data, link): """Handle file transfer completion notification. Args: data: Packet data dictionary containing file path and hash. link: RNS Link object for the peer. """ filepath = data.get("path") expected_hash = data.get("hash") mode = data.get("mode", "full") if filepath not in link.download_buffers: RNS.log(f"No download buffer for {filepath}, waiting for chunks...", RNS.LOG_DEBUG) time.sleep(0.5) if filepath not in link.download_buffers: RNS.log(f"No download buffer for {filepath} after wait", RNS.LOG_WARNING) return try: remote_identity = link.get_remote_identity() if remote_identity and whitelist_enabled: if not check_permission(remote_identity.hash, "write"): RNS.log( f"Peer {RNS.prettyhexrep(remote_identity.hash)} does not have write permission for {filepath}", RNS.LOG_WARNING, ) if filepath in link.download_buffers: del link.download_buffers[filepath] return except Exception as e: RNS.log(f"Error checking permissions: {e}", RNS.LOG_DEBUG) if filepath not in link.download_buffers: RNS.log(f"No download buffer for {filepath}", RNS.LOG_WARNING) return try: buffer_info = link.download_buffers[filepath] reassembled_chunks = [] for chunk_num in sorted(buffer_info["chunks"].keys()): chunk_info = buffer_info["chunks"][chunk_num] sub_chunks = chunk_info["sub_chunks"] total_sub_chunks = chunk_info["total_sub_chunks"] if len(sub_chunks) < total_sub_chunks: RNS.log( f"Incomplete sub-chunks for block {chunk_num}: {len(sub_chunks)}/{total_sub_chunks}", RNS.LOG_WARNING, ) continue complete_chunk = b"".join([sub_chunks[i] for i in sorted(sub_chunks.keys())]) reassembled_chunks.append((chunk_num, complete_chunk)) chunks = reassembled_chunks expected_size = buffer_info.get("size", 0) if mode != "delta" and expected_size > 0: total_received = sum(len(chunk[1]) for chunk in chunks) expected_chunks = (expected_size + BLOCK_SIZE - 1) // BLOCK_SIZE if len(chunks) < expected_chunks or total_received < expected_size: RNS.log( f"File incomplete: {filepath} - received {len(chunks)}/{expected_chunks} chunks, " f"{total_received}/{expected_size} bytes. Waiting for remaining chunks...", RNS.LOG_WARNING, ) time.sleep(1.0) reassembled_chunks = [] for chunk_num in sorted(link.download_buffers[filepath]["chunks"].keys()): chunk_info = link.download_buffers[filepath]["chunks"][chunk_num] sub_chunks = chunk_info["sub_chunks"] total_sub_chunks = chunk_info["total_sub_chunks"] if len(sub_chunks) == total_sub_chunks: complete_chunk = b"".join([sub_chunks[i] for i in sorted(sub_chunks.keys())]) reassembled_chunks.append((chunk_num, complete_chunk)) chunks = reassembled_chunks total_received = sum(len(chunk[1]) for chunk in chunks) if len(chunks) < expected_chunks or total_received < expected_size: RNS.log( f"File still incomplete after wait: {filepath} - received {len(chunks)}/{expected_chunks} chunks, " f"{total_received}/{expected_size} bytes. Requesting file again...", RNS.LOG_ERROR, ) del link.download_buffers[filepath] request_file(link, filepath) return full_path = os.path.join(sync_directory, filepath) dir_path = os.path.dirname(full_path) if dir_path: os.makedirs(dir_path, exist_ok=True) if mode == "delta": if not os.path.exists(full_path): RNS.log( f"Delta received but base file missing: {filepath}", RNS.LOG_ERROR, ) del link.download_buffers[filepath] return with open(full_path, "r+b") as f: for chunk_num, chunk_data in chunks: f.seek(chunk_num * BLOCK_SIZE) f.write(chunk_data) RNS.log(f"Applied {len(chunks)} delta blocks to {filepath}", RNS.LOG_INFO) else: file_data = b"".join([chunk[1] for chunk in chunks]) if expected_size > 0 and len(file_data) != expected_size: RNS.log( f"File size mismatch: {filepath} - expected {expected_size} bytes, got {len(file_data)} bytes", RNS.LOG_ERROR, ) del link.download_buffers[filepath] return with open(full_path, "wb") as f: f.write(file_data) RNS.log( f"Received full file {filepath} ({len(file_data)} bytes)", RNS.LOG_INFO, ) actual_hash = hash_file(full_path) if actual_hash == expected_hash: RNS.log(f"File {filepath} verified successfully", RNS.LOG_VERBOSE) with file_hashes_lock: file_hashes[filepath] = { "hash": actual_hash, "size": os.path.getsize(full_path), "mtime": time.time(), } save_hash_db(sync_directory) broadcast_file_update(filepath, exclude_link=link) else: RNS.log( f"Hash mismatch for {filepath}! Expected {expected_hash}, got {actual_hash}", RNS.LOG_ERROR, ) if mode != "delta": os.remove(full_path) del link.download_buffers[filepath] except Exception as e: RNS.log(f"Error completing file {filepath}: {e}", RNS.LOG_ERROR) def handle_file_update_notification(data, link): """Handle file update notification from a peer. Args: data: Packet data dictionary containing file path and info. link: RNS Link object for the peer. """ filepath = data.get("path") peer_info = data.get("info") if not peer_info: RNS.log(f"Received file update without peer_info for {filepath}", RNS.LOG_WARNING) return with file_hashes_lock: local_info = file_hashes.get(filepath) if not local_info: RNS.log(f"New file available from peer: {filepath}", RNS.LOG_INFO) request_file(link, filepath) elif local_info["hash"] != peer_info["hash"]: RNS.log(f"File update available (hash differs): {filepath}", RNS.LOG_INFO) full_path = os.path.join(sync_directory, filepath) if os.path.exists(full_path): request_file_blocks(link, filepath) else: request_file(link, filepath) else: RNS.log(f"File {filepath} is already up to date", RNS.LOG_DEBUG) def broadcast_file_update(filepath, exclude_link=None): """Broadcast file update notification to all connected peers. Args: filepath: Relative path of the updated file. exclude_link: Optional link to exclude from broadcast. """ with connected_peers_lock: peers = list(connected_peers) if not peers: RNS.log(f"No peers to broadcast update for {filepath}", RNS.LOG_DEBUG) return for link in peers: if link != exclude_link and link.status == RNS.Link.ACTIVE: try: with file_hashes_lock: file_info = file_hashes.get(filepath) if file_info: data = umsgpack.packb( { "type": "file_update", "path": filepath, "info": file_info, }, ) packet = RNS.Packet(link, data) packet.send() RNS.log(f"Sent file update notification for {filepath} to peer", RNS.LOG_VERBOSE) else: RNS.log(f"No file info found for {filepath} in broadcast", RNS.LOG_WARNING) except Exception as e: RNS.log(f"Error broadcasting update: {e}", RNS.LOG_DEBUG) def broadcast_file_deletion(filepath, exclude_link=None): """Broadcast file deletion notification to all connected peers. Args: filepath: Relative path of the deleted file. exclude_link: Optional link to exclude from broadcast. """ with connected_peers_lock: peers = list(connected_peers) for link in peers: if link != exclude_link and link.status == RNS.Link.ACTIVE: try: data = umsgpack.packb( { "type": "file_deletion", "path": filepath, }, ) packet = RNS.Packet(link, data) packet.send() RNS.log(f"Notified peer of deletion: {filepath}", RNS.LOG_VERBOSE) except Exception as e: RNS.log(f"Error broadcasting deletion: {e}", RNS.LOG_DEBUG) def handle_file_deletion(data, link): """Handle file deletion notification from a peer. Args: data: Packet data dictionary containing file path. link: RNS Link object for the peer. """ filepath = data.get("path") if not filepath: return try: remote_identity = link.get_remote_identity() if remote_identity and whitelist_enabled: if not check_permission(remote_identity.hash, "delete"): RNS.log( f"Peer {RNS.prettyhexrep(remote_identity.hash)} does not have delete permission for {filepath}", RNS.LOG_WARNING, ) return except Exception as e: RNS.log(f"Error checking permissions: {e}", RNS.LOG_DEBUG) full_path = os.path.join(sync_directory, filepath) if os.path.exists(full_path): try: os.remove(full_path) RNS.log(f"Deleted file from peer deletion: {filepath}", RNS.LOG_INFO) with file_hashes_lock: file_hashes.pop(filepath, None) save_hash_db(sync_directory) broadcast_file_deletion(filepath, exclude_link=link) except Exception as e: RNS.log(f"Error deleting file {filepath}: {e}", RNS.LOG_ERROR) else: with file_hashes_lock: file_hashes.pop(filepath, None) save_hash_db(sync_directory) def file_monitor(): """Monitor directory for file changes and sync with peers.""" global file_monitor_active while file_monitor_active: try: current_files = scan_directory(sync_directory) with file_hashes_lock: old_files = set(file_hashes.keys()) new_files = set(current_files.keys()) added = new_files - old_files removed = old_files - new_files potentially_modified = old_files & new_files for filepath in added: RNS.log(f"New file detected: {filepath}", RNS.LOG_INFO) file_hashes[filepath] = current_files[filepath] for filepath in removed: RNS.log(f"File removed: {filepath}", RNS.LOG_INFO) del file_hashes[filepath] modified = [] for filepath in potentially_modified: if file_hashes[filepath]["hash"] != current_files[filepath]["hash"]: RNS.log(f"File modified: {filepath}", RNS.LOG_INFO) file_hashes[filepath] = current_files[filepath] modified.append(filepath) if added or removed or modified: save_hash_db(sync_directory) if tui: tui.update_status(files=len(file_hashes)) tui.update_file_list(sync_directory) for filepath in added: broadcast_file_update(filepath) for filepath in removed: broadcast_file_deletion(filepath) for filepath in modified: broadcast_file_update(filepath) time.sleep(SCAN_INTERVAL) except Exception as e: RNS.log(f"File monitor error: {e}", RNS.LOG_DEBUG) time.sleep(1) def connect_to_peer(peer_hash_hex): """Connect to a peer by identity hash. Args: peer_hash_hex: Hex string representation of peer identity hash. Returns: RNS Link object if successful, None otherwise. """ try: peer_hash = bytes.fromhex(peer_hash_hex) except Exception as e: RNS.log(f"Invalid peer hash: {e}", RNS.LOG_ERROR) return None if not RNS.Transport.has_path(peer_hash): RNS.log(f"Finding path to peer {RNS.prettyhexrep(peer_hash)}...", RNS.LOG_INFO) RNS.Transport.request_path(peer_hash) timeout = 30 start_time = time.time() while not RNS.Transport.has_path(peer_hash): time.sleep(0.5) if time.time() - start_time > timeout: RNS.log("Path request timed out", RNS.LOG_ERROR) return None RNS.log("Path found", RNS.LOG_INFO) peer_identity = RNS.Identity.recall(peer_hash) if not peer_identity: RNS.log( f"Could not recall identity for {RNS.prettyhexrep(peer_hash)}", RNS.LOG_ERROR, ) return None destination = RNS.Destination( peer_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "filesync", ) link = RNS.Link( destination, established_callback=peer_connected, closed_callback=peer_disconnected, ) link.download_buffers = {} link.upload_buffers = {} timeout = 10 start_time = time.time() while link.status not in (RNS.Link.ACTIVE, RNS.Link.CLOSED): time.sleep(0.1) if time.time() - start_time > timeout: RNS.log("Link establishment timed out", RNS.LOG_ERROR) return None if link.status != RNS.Link.ACTIVE: RNS.log("Failed to establish link", RNS.LOG_ERROR) return None link.set_packet_callback(packet_received) link.set_resource_strategy(RNS.Link.ACCEPT_APP) link.set_resource_callback(resource_callback) link.set_resource_started_callback(resource_started) link.set_resource_concluded_callback(resource_concluded) with connected_peers_lock: if link not in connected_peers: connected_peers.append(link) time.sleep(0.2) try: remote_identity = link.get_remote_identity() identity_hash = remote_identity.hash if remote_identity else None if ( (identity_hash and check_permission(identity_hash, "read")) or not whitelist_enabled ): send_file_list_to_peer(link) time.sleep(0.1) request_file_list_from_peer(link, browser_mode=False) else: RNS.log("Peer does not have read permission, skipping file list", RNS.LOG_INFO) except Exception as e: RNS.log(f"Error setting up file sync on connect: {e}", RNS.LOG_DEBUG) send_file_list_to_peer(link) time.sleep(0.1) request_file_list_from_peer(link, browser_mode=False) RNS.log(f"Connected to peer {RNS.prettyhexrep(peer_hash)}", RNS.LOG_INFO) return link def announce_loop(destination, interval): """Continuously announce destination at specified intervals. Args: destination: RNS Destination to announce. interval: Announcement interval in seconds. """ last_announce = 0 destination.announce() RNS.log(f"Announced: {RNS.prettyhexrep(destination.hash)}", RNS.LOG_INFO) while True: time.sleep(10) if time.time() - last_announce > interval: destination.announce() last_announce = time.time() RNS.log( f"Auto-announced: {RNS.prettyhexrep(destination.hash)}", RNS.LOG_DEBUG, ) def start_peer( configpath, directory, identity_name="rns_filesync", peers=None, monitor=True, announce_interval=300, use_tui=True, ): """Start the RNS FileSync peer. Args: configpath: Path to Reticulum config directory. directory: Directory to synchronize. identity_name: Name of the RNS identity to use. peers: List of peer identity hashes to connect to. monitor: Whether to enable file monitoring. announce_interval: Announcement interval in seconds. use_tui: Whether to enable terminal user interface. """ global \ peer_identity, \ peer_destination, \ sync_directory, \ file_monitor_active, \ tui, \ original_rns_log directory = os.path.abspath(os.path.expanduser(directory)) if not os.path.exists(directory): os.makedirs(directory) if use_tui: print(f"Created sync directory: {directory}") else: RNS.log(f"Created sync directory: {directory}", RNS.LOG_INFO) sync_directory = directory if use_tui: tui = SimpleTUI() RNS.loglevel = RNS.LOG_VERBOSE original_rns_log = RNS.log RNS.log = rns_log_hook tui.add_log("Initializing RNS FileSync...", RNS.LOG_NOTICE) RNS.Reticulum(configpath) peer_identity = load_or_create_identity(identity_name) peer_destination = RNS.Destination( peer_identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "filesync", ) peer_destination.set_link_established_callback(peer_connected) RNS.log("RNS FileSync peer started", RNS.LOG_NOTICE) RNS.log(f"Sync directory: {directory}", RNS.LOG_NOTICE) RNS.log(f"Identity: {RNS.prettyhexrep(peer_identity.hash)}", RNS.LOG_NOTICE) RNS.log(f"Destination: {RNS.prettyhexrep(peer_destination.hash)}", RNS.LOG_NOTICE) if tui: tui.update_status( directory=directory, identity=RNS.prettyhexrep(peer_identity.hash), destination=RNS.prettyhexrep(peer_destination.hash), permissions=whitelist_enabled, ) load_hash_db(directory) RNS.log("Performing initial directory scan...", RNS.LOG_INFO) current_files = scan_directory(directory) with file_hashes_lock: file_hashes.update(current_files) save_hash_db(directory) RNS.log(f"Tracking {len(file_hashes)} files", RNS.LOG_INFO) if tui: tui.update_status(files=len(file_hashes)) tui.update_file_list(directory) if monitor: file_monitor_active = True monitor_thread = threading.Thread(target=file_monitor, daemon=True) monitor_thread.start() RNS.log("File monitoring enabled", RNS.LOG_INFO) announce_thread = threading.Thread( target=announce_loop, args=(peer_destination, announce_interval), daemon=True, ) announce_thread.start() if peers: for peer_hash in peers: RNS.log(f"Connecting to peer: {peer_hash}", RNS.LOG_INFO) connect_thread = threading.Thread( target=connect_to_peer, args=(peer_hash,), daemon=True, ) connect_thread.start() RNS.log( "Commands: 'peers' - show peers, 'status' - show stats, 'connect ' - connect to peer, 'quit' - exit", RNS.LOG_INFO, ) if tui: tui.start_refresh_timer() time.sleep(0.5) tui.refresh_display(full_clear=True) while True: try: if tui: tui.draw_input_area(restore_input=False) entered = input("").strip() else: entered = input().strip() if tui: tui.clear_current_input() if entered.lower() in ["quit", "exit", "q"]: RNS.log("Shutting down...", RNS.LOG_INFO) save_hash_db(sync_directory) if tui: tui.stop() sys.exit(0) elif entered.lower() == "status": with file_hashes_lock: RNS.log(f"Tracking {len(file_hashes)} files", RNS.LOG_INFO) with connected_peers_lock: RNS.log(f"Connected peers: {len(connected_peers)}", RNS.LOG_INFO) for link in connected_peers: if link.status == RNS.Link.ACTIVE: try: remote_id = link.get_remote_identity() if remote_id: RNS.log( f" - {RNS.prettyhexrep(remote_id.hash)}", RNS.LOG_INFO, ) else: RNS.log( f" - {RNS.prettyhexrep(link.destination.hash)}", RNS.LOG_INFO, ) except Exception: RNS.log( f" - {RNS.prettyhexrep(link.destination.hash)}", RNS.LOG_INFO, ) elif entered.lower() == "peers": with connected_peers_lock: if not connected_peers: RNS.log("No connected peers", RNS.LOG_INFO) else: RNS.log( f"Connected to {len(connected_peers)} peer(s):", RNS.LOG_INFO, ) for link in connected_peers: if link.status == RNS.Link.ACTIVE: try: remote_id = link.get_remote_identity() if remote_id: RNS.log( f" {RNS.prettyhexrep(remote_id.hash)}", RNS.LOG_INFO, ) else: RNS.log( f" {RNS.prettyhexrep(link.destination.hash)}", RNS.LOG_INFO, ) except Exception: RNS.log( f" {RNS.prettyhexrep(link.destination.hash)}", RNS.LOG_INFO, ) elif entered.lower().startswith("connect "): peer_hash = entered[8:].strip() RNS.log(f"Attempting to connect to {peer_hash}...", RNS.LOG_INFO) connect_thread = threading.Thread( target=connect_to_peer, args=(peer_hash,), daemon=True, ) connect_thread.start() elif entered.lower() == "announce": peer_destination.announce() RNS.log( f"Announced: {RNS.prettyhexrep(peer_destination.hash)}", RNS.LOG_INFO, ) elif entered.lower() == "logs": if tui: tui.set_view_mode("logs") RNS.log( "Switched to logs view (type 'files' to return)", RNS.LOG_INFO, ) else: RNS.log("TUI is not enabled", RNS.LOG_INFO) elif entered.lower() == "files": if tui: tui.set_view_mode("files") tui.browser_peer = None RNS.log("Switched to files view", RNS.LOG_INFO) else: RNS.log("TUI is not enabled", RNS.LOG_INFO) elif entered.lower().startswith("browse "): peer_idx_str = entered[7:].strip() try: with connected_peers_lock: if not connected_peers: RNS.log("No connected peers to browse", RNS.LOG_WARNING) else: try: peer_idx = int(peer_idx_str) if 0 <= peer_idx < len(connected_peers): link = connected_peers[peer_idx] if link.status == RNS.Link.ACTIVE: remote_id = link.get_remote_identity() peer_hash = ( remote_id.hash if remote_id else link.destination.hash ) if tui: tui.set_view_mode("browser") tui.browser_peer = peer_hash with tui.remote_files_lock: tui.remote_files = [] request_file_list_from_peer(link, browser_mode=True) RNS.log( f"Browsing peer {peer_idx}: {RNS.prettyhexrep(peer_hash)}", RNS.LOG_INFO, ) else: RNS.log( f"Peer {peer_idx} is not active", RNS.LOG_WARNING, ) else: RNS.log( f"Invalid peer index. Use 'peers' to see available peers (0-{len(connected_peers) - 1})", RNS.LOG_WARNING, ) except ValueError: peer_hash = bytes.fromhex(peer_idx_str) for link in connected_peers: remote_id = link.get_remote_identity() link_hash = ( remote_id.hash if remote_id else link.destination.hash ) if link_hash == peer_hash: if tui: tui.set_view_mode("browser") tui.browser_peer = peer_hash with tui.remote_files_lock: tui.remote_files = [] request_file_list_from_peer(link, browser_mode=True) RNS.log( f"Browsing peer: {RNS.prettyhexrep(peer_hash)}", RNS.LOG_INFO, ) break else: RNS.log( "Peer not found in connected peers", RNS.LOG_WARNING, ) except Exception as e: RNS.log(f"Error browsing peer: {e}", RNS.LOG_ERROR) elif entered.lower().startswith("download "): filepath = entered[9:].strip() if tui and tui.browser_peer: with connected_peers_lock: for link in connected_peers: remote_id = link.get_remote_identity() link_hash = ( remote_id.hash if remote_id else link.destination.hash ) if link_hash == tui.browser_peer: download_file_from_peer(link, filepath) break else: RNS.log( "Browsed peer is no longer connected", RNS.LOG_WARNING, ) else: RNS.log( "Not in browser mode. Use 'browse ' first", RNS.LOG_INFO, ) elif entered.lower() == "download_all": if tui and tui.browser_peer: with tui.remote_files_lock: files_to_download = [f["path"] for f in tui.remote_files] if files_to_download: with connected_peers_lock: for link in connected_peers: remote_id = link.get_remote_identity() link_hash = ( remote_id.hash if remote_id else link.destination.hash ) if link_hash == tui.browser_peer: RNS.log( f"Downloading {len(files_to_download)} files...", RNS.LOG_INFO, ) for filepath in files_to_download: download_file_from_peer(link, filepath) break else: RNS.log( "Browsed peer is no longer connected", RNS.LOG_WARNING, ) else: RNS.log("No files to download", RNS.LOG_INFO) else: RNS.log( "Not in browser mode. Use 'browse ' first", RNS.LOG_INFO, ) elif entered: RNS.log( "Unknown command. Available: peers, status, connect , browse , download , download_all, logs, files, announce, quit", RNS.LOG_INFO, ) except KeyboardInterrupt: RNS.log("\nShutting down...", RNS.LOG_INFO) save_hash_db(sync_directory) if tui: tui.stop() sys.exit(0) except Exception as e: RNS.log(f"Input error: {e}", RNS.LOG_ERROR) def main(): """Run the RNS FileSync application.""" parser = argparse.ArgumentParser( description="RNS FileSync - Peer-to-Peer File Synchronization over Reticulum", ) parser.add_argument( "--config", action="store", default=None, help="path to Reticulum config directory", type=str, ) parser.add_argument( "-d", "--directory", action="store", required=True, help="directory to synchronize", type=str, ) parser.add_argument( "-i", "--identity", action="store", default=None, help="identity name to use (default: rns_filesync)", type=str, ) parser.add_argument( "-p", "--peer", action="append", dest="peers", help="peer identity hash to connect to (can be specified multiple times)", type=str, ) parser.add_argument( "-n", "--no-monitor", action="store_true", help="disable file monitoring (only sync on connect)", default=False, ) parser.add_argument( "-a", "--announce-interval", action="store", default=300, help="announce interval in seconds (default: 300)", type=int, ) parser.add_argument( "--permissions-file", action="store", default=None, help="path to permissions file (format: identity_hash permission1,permission2)", type=str, ) parser.add_argument( "--allow", action="append", dest="allowed_peers", help="allow specific identity hash (use with --perms)", type=str, ) parser.add_argument( "--perms", action="store", default="read,write,delete", help="permissions for --allow (comma-separated: read,write,delete)", type=str, ) parser.add_argument( "--no-tui", action="store_true", help="disable TUI mode (use plain logging)", default=False, ) parser.add_argument( "-v", "--verbose", action="count", default=0, ) args = parser.parse_args() if args.verbose == 1: loglevel = RNS.LOG_INFO elif args.verbose == 2: loglevel = RNS.LOG_VERBOSE elif args.verbose >= 3: loglevel = RNS.LOG_DEBUG else: loglevel = RNS.LOG_NOTICE RNS.loglevel = loglevel identity_name = args.identity if args.identity else "rns_filesync" if args.permissions_file: load_permissions(args.permissions_file) if args.allowed_peers: for peer_hash in args.allowed_peers: add_permission_from_args(peer_hash, args.perms) start_peer( configpath=args.config, directory=args.directory, identity_name=identity_name, peers=args.peers, monitor=not args.no_monitor, announce_interval=args.announce_interval, use_tui=not args.no_tui, ) if __name__ == "__main__": main()