#!/usr/bin/env python3 """ Terminal-Based Interactive LXMF Messaging Client """ import RNS import LXMF import time import os import json import threading from datetime import datetime import shutil import traceback import itertools import subprocess import sys import platform from prompt_toolkit import PromptSession from prompt_toolkit.patch_stdout import patch_stdout from prompt_toolkit.formatted_text import HTML try: from colorama import init, Fore, Style, just_fix_windows_console # type: ignore # Different approach for Windows import platform if platform.system() == 'Windows': just_fix_windows_console() else: init(autoreset=True) COLOR_ENABLED = True except ImportError: COLOR_ENABLED = False class Fore: RED = GREEN = YELLOW = CYAN = MAGENTA = BLUE = WHITE = "" class Style: BRIGHT = RESET_ALL = "" class LXMFClient: def __init__(self, identity_path="./lxmf_client_identity", storage_path="./lxmf_client_storage"): self.identity_path = identity_path self.storage_path = storage_path self.messages_path = os.path.join(storage_path, "messages") self.contacts = {} self.contacts_file = os.path.join(storage_path, "contacts.json") self.config_file = os.path.join(storage_path, "config.json") self.messages = [] self.messages_lock = threading.Lock() self.running = False self.last_sender_hash = None self.last_sender_name = None self.display_name = None self.announce_interval = 300 self.auto_announce_enabled = True self.stop_event = threading.Event() self.show_announces = True self.start_time = time.time() # Notification settings self.notify_sound = True # Platform-specific sounds (beeps/melody) self.notify_bell = True # Terminal bell self.notify_visual = True # Visual flash effect # Track pending messages self.pending_messages = {} # Cache for display names from announces self.display_name_cache = {} self.cache_file = os.path.join(storage_path, "display_names.json") self.cache_dirty = False self.last_cache_save = time.time() # Debug: Track suppressed errors self.suppressed_errors = 0 # Track announced LXMF peers with fixed index numbers self.announced_peers = {} self.peers_lock = threading.Lock() self.next_peer_index = 1 # Track contacts with fixed index numbers self.next_contact_index = 1 # Track conversations with fixed index numbers self.conversation_indices = {} self.next_conversation_index = 1 self.conversations_file = os.path.join(storage_path, "conversations.json") # Blacklist system self.blacklist = set() # Set of blocked destination hashes self.blacklist_file = os.path.join(storage_path, "blacklist.json") # Plugin system self.plugins = {} self.plugins_dir = os.path.join(storage_path, "plugins") self.plugins_enabled = {} self.plugins_config_file = os.path.join(storage_path, "plugins_config.json") # Stamp cost settings self.stamp_cost = 0 # 0 = disabled self.stamp_cost_enabled = False self.ignore_invalid_stamps = True # Reject messages with invalid stamps # Command aliases self.command_aliases = { 'h': 'help', 's': 'send', 're': 'reply', 'm': 'messages', 'c': 'contacts', 'a': 'add', 'e': 'edit', 'rm': 'remove', 'p': 'peers', 'sp': 'sendpeer', 'ap': 'addpeer', 'st': 'stats', 'addr': 'address', 'n': 'name', 'i': 'interval', 'cls': 'clear', 'r': 'restart', 'q': 'quit', 'set': 'settings', 'bl': 'blacklist', 'ann': 'announce', 'save': 'savecontact', } self.Fore = Fore self.Style = Style os.makedirs(storage_path, exist_ok=True) os.makedirs(self.messages_path, exist_ok=True) # === LOAD CONFIGURATION FIRST (before Reticulum) === self.load_config() # === NOW INITIALIZE RETICULUM === self._print_color("🌐 Initializing Reticulum...", Fore.CYAN) self.reticulum = RNS.Reticulum() # Load or create identity if os.path.exists(identity_path): self.identity = RNS.Identity.from_file(identity_path) self._print_success("Loaded identity") else: self.identity = RNS.Identity() self.identity.to_file(identity_path) self._print_success("Created new identity") # Load display name cache self.load_display_name_cache() # Create LXMF router lxmf_storage = os.path.join(storage_path, "lxmf_router") os.makedirs(lxmf_storage, exist_ok=True) self.router = LXMF.LXMRouter( identity=self.identity, storagepath=lxmf_storage ) self._print_success(f"LXMF storage: {lxmf_storage}") # Register destination with display name self.destination = self.router.register_delivery_identity( self.identity, display_name=self.display_name ) # Configure stamp cost on the destination if self.stamp_cost_enabled and self.stamp_cost > 0: try: # Set the stamp cost directly on the destination if hasattr(self.destination, 'stamp_cost'): setattr(self.destination, 'stamp_cost', self.stamp_cost) # type: ignore self._print_success(f"Stamp cost configured: {self.stamp_cost} bits") # Force an announce so the stamp cost is advertised if hasattr(self.destination, 'announce'): self.destination.announce() # type: ignore self._print_success("šŸ“” Announced with stamp cost") except Exception as e: self._print_warning(f"Could not set stamp cost: {e}") # Register callbacks self.router.register_delivery_callback(self.on_message_received) # Register announce handler to capture display names self.register_announce_handler() # Load contacts and messages self.load_contacts() self.load_messages() self.load_conversation_indices() self.load_blacklist() # Load plugins self.load_plugins() # Setup thread exception handler threading.excepthook = self.thread_exception_handler # Show info import shutil try: width = shutil.get_terminal_size().columns except: width = 60 sep_width = min(width, 60) print(f"\n{'─'*sep_width}") self._print_color(f"Display Name: {self.display_name}", Fore.GREEN + Style.BRIGHT) if hasattr(self.destination, 'hash'): self._print_color(f"LXMF Address: {RNS.prettyhexrep(self.destination.hash)}", Fore.CYAN) # type: ignore self._print_color(f"Auto-announce: Every {self.announce_interval} seconds", Fore.YELLOW) # Show stamp cost status if self.stamp_cost_enabled and self.stamp_cost > 0: self._print_color(f"Stamp Cost: ENABLED ({self.stamp_cost} bits)", Fore.RED + Style.BRIGHT) else: self._print_color(f"Stamp Cost: DISABLED", Fore.WHITE) print(f"{'─'*sep_width}\n") # Initial announce (this will now include stamp cost) self._print_color("šŸ“” Announcing to network...", Fore.CYAN) if hasattr(self.destination, 'announce'): self.destination.announce() # type: ignore self._print_success("Initial announce complete") # Start background threads self.announce_thread = threading.Thread(target=self.announce_loop, daemon=True) self.announce_thread.start() self.router_thread = threading.Thread(target=self.router_job_loop, daemon=True) self.router_thread.start() def resolve_contact_or_hash(self, target): """ Resolve a contact name, number, or hash to a destination hash. Returns normalized hash string or None if not found. """ if not target: return None # First, check if it's a direct hash (32 hex chars, possibly with colons/brackets) clean_target = target.replace(":", "").replace(" ", "").replace("<", "").replace(">", "").lower() if len(clean_target) == 32: # Valid hash length (16 bytes = 32 hex chars) # Validate it's actually hex try: bytes.fromhex(clean_target) return clean_target except ValueError: # Not valid hex, continue to other resolution methods pass # Try to parse as contact index number try: contact_idx = int(target) # Search contacts by index for name, data in self.contacts.items(): if data.get('index') == contact_idx: return data['hash'].replace(":", "").replace(" ", "").lower() # Search conversation indices for hash_str, conv_idx in self.conversation_indices.items(): if conv_idx == contact_idx: return hash_str.replace(":", "").replace(" ", "").lower() # Search peers by index with self.peers_lock: for hash_str, peer_data in self.announced_peers.items(): if peer_data.get('index') == contact_idx: return hash_str.replace(":", "").replace(" ", "").lower() return None except ValueError: # Not a number, treat as contact name pass # Search by contact name target_lower = target.lower() for name, data in self.contacts.items(): if name.lower() == target_lower: return data['hash'].replace(":", "").replace(" ", "").lower() # Search by display name in peers with self.peers_lock: for hash_str, peer_data in self.announced_peers.items(): display_name = peer_data.get('display_name', '') if display_name.lower() == target_lower: return hash_str.replace(":", "").replace(" ", "").lower() return None def load_plugins(self): """Load all enabled plugins from plugins directory""" import importlib.util import sys if not os.path.exists(self.plugins_dir): return # Load plugin configuration if os.path.exists(self.plugins_config_file): try: with open(self.plugins_config_file, 'r', encoding='utf-8') as f: config = json.load(f) self.plugins_enabled = config.get('enabled', {}) except Exception as e: self._print_warning(f"Error loading plugin config: {e}") # Scan plugins directory for filename in os.listdir(self.plugins_dir): if filename.endswith('.py') and not filename.startswith('_'): plugin_name = filename[:-3] # Check if plugin is enabled (default to enabled) if not self.plugins_enabled.get(plugin_name, True): continue try: # Load the plugin module plugin_path = os.path.join(self.plugins_dir, filename) spec = importlib.util.spec_from_file_location(plugin_name, plugin_path) if spec and spec.loader: module = importlib.util.module_from_spec(spec) sys.modules[plugin_name] = module spec.loader.exec_module(module) else: continue # Get plugin class if hasattr(module, 'Plugin'): plugin_instance = module.Plugin(self) self.plugins[plugin_name] = plugin_instance self._print_success(f"Loaded plugin: {plugin_name}") else: self._print_warning(f"Plugin {plugin_name} has no Plugin class") except Exception as e: self._print_warning(f"Failed to load plugin {plugin_name}: {e}") def save_plugins_config(self): """Save plugin configuration""" try: config = {'enabled': self.plugins_enabled} with open(self.plugins_config_file, 'w', encoding='utf-8') as f: json.dump(config, f, indent=2) except Exception as e: self._print_warning(f"Error saving plugin config: {e}") def handle_plugin_command(self, cmd, parts): """Check if command should be handled by a plugin""" for plugin_name, plugin in self.plugins.items(): if hasattr(plugin, 'commands') and cmd in plugin.commands: try: plugin.handle_command(cmd, parts) return True except Exception as e: self._print_error(f"Plugin {plugin_name} error: {e}") return True return False def handle_plugin_message(self, message, msg_data): """Let plugins process incoming messages""" for plugin_name, plugin in self.plugins.items(): try: if hasattr(plugin, 'on_message'): # Plugin can return True to indicate it handled the message if plugin.on_message(message, msg_data): return True except Exception as e: self._print_warning(f"Plugin {plugin_name} message handler error: {e}") return False def list_plugins(self): """List all available plugins""" import shutil import os try: width = min(shutil.get_terminal_size().columns, 80) except: width = 80 print(f"\n{'─'*width}") self._print_color("PLUGINS", Fore.CYAN + Style.BRIGHT) print(f"{'─'*width}") # Scan plugins directory for all .py files available_plugins = {} if os.path.exists(self.plugins_dir): for filename in os.listdir(self.plugins_dir): if filename.endswith('.py') and not filename.startswith('_'): plugin_name = filename[:-3] available_plugins[plugin_name] = { 'loaded': plugin_name in self.plugins, 'enabled': self.plugins_enabled.get(plugin_name, True), 'instance': self.plugins.get(plugin_name) } if not available_plugins: print("\nNo plugins found") print(f"Place plugin files in: {self.plugins_dir}\n") return print(f"\n{'Plugin':<20} {'Status':<15} {'Description'}") print(f"{'─'*20} {'─'*15} {'─'*30}") for plugin_name, info in sorted(available_plugins.items()): # Determine status if info['loaded'] and info['enabled']: status = f"{Fore.GREEN}Loaded{Style.RESET_ALL}" elif info['enabled'] and not info['loaded']: status = f"{Fore.YELLOW}Enabled (reload){Style.RESET_ALL}" else: status = f"{Fore.RED}Disabled{Style.RESET_ALL}" # Get description if info['instance']: description = getattr(info['instance'], 'description', 'No description') else: description = "Not loaded" # Truncate description if too long if len(description) > 30: description = description[:27] + "..." print(f"{plugin_name:<20} {status:<25} {description}") print(f"{'─'*width}") self._print_color("\nšŸ’” Commands:", Fore.YELLOW) print(" plugin enable - Enable a plugin") print(" plugin disable - Disable a plugin") print(" plugin reload - Reload all plugins") print() def load_blacklist(self): """Load blacklist from file""" if os.path.exists(self.blacklist_file): try: with open(self.blacklist_file, 'r', encoding='utf-8') as f: blacklist_data = json.load(f) # Convert list to set and normalize hashes self.blacklist = set(hash_str.replace(":", "").replace(" ", "").lower() for hash_str in blacklist_data) if self.blacklist: self._print_success(f"Loaded {len(self.blacklist)} blocked addresses") except Exception as e: self._print_warning(f"Error loading blacklist: {e}") def save_blacklist(self): """Save blacklist to file""" try: # Convert set to sorted list for JSON blacklist_list = sorted(list(self.blacklist)) with open(self.blacklist_file, 'w', encoding='utf-8') as f: json.dump(blacklist_list, f, indent=2) except Exception as e: self._print_warning(f"Error saving blacklist: {e}") def is_blacklisted(self, destination_hash): """Check if a destination hash is blacklisted""" if not destination_hash: return False # Normalize the hash for comparison normalized = destination_hash.replace(":", "").replace(" ", "").replace("<", "").replace(">", "").lower() return normalized in self.blacklist def add_to_blacklist(self, destination_hash): """Add a destination hash to the blacklist""" if not destination_hash: self._print_error("Invalid destination hash") return False # Normalize the hash normalized = destination_hash.replace(":", "").replace(" ", "").replace("<", "").replace(">", "").lower() if normalized in self.blacklist: self._print_warning("Already blacklisted") return False self.blacklist.add(normalized) self.save_blacklist() return True def remove_from_blacklist(self, destination_hash): """Remove a destination hash from the blacklist""" if not destination_hash: self._print_error("Invalid destination hash") return False # Normalize the hash normalized = destination_hash.replace(":", "").replace(" ", "").replace("<", "").replace(">", "").lower() if normalized not in self.blacklist: self._print_warning("Not in blacklist") return False self.blacklist.remove(normalized) self.save_blacklist() return True def list_blacklist(self): """List all blacklisted addresses""" import shutil try: width = min(shutil.get_terminal_size().columns, 80) except: width = 80 if not self.blacklist: print("\nNo blacklisted addresses\n") return print(f"\n{'─'*width}") self._print_color("BLACKLIST", Fore.RED + Style.BRIGHT) print(f"{'─'*width}") # Sort for consistent display sorted_blacklist = sorted(self.blacklist) print(f"\n{'#':<5} {'Hash':<32} {'Display Name'}") print(f"{'─'*5} {'─'*32} {'─'*30}") for idx, hash_str in enumerate(sorted_blacklist, 1): # Try to get display name for this hash display_name = self.get_lxmf_display_name(hash_str) contact_name = self.get_contact_name_by_hash(hash_str) if contact_name: name_display = f"{contact_name} ({display_name})" if display_name else contact_name elif display_name: name_display = display_name else: name_display = "" # Truncate if too long if len(name_display) > 30: name_display = name_display[:27] + "..." print(f"{idx:<5} {hash_str[:32]:<32} {name_display}") print(f"{'─'*width}") self._print_color(f"\nšŸ’” Total blocked: {len(self.blacklist)}", Fore.YELLOW) print() def get_terminal_width(self, default=70, max_width=90): """Get terminal width with safe defaults for mobile""" try: import shutil width = shutil.get_terminal_size().columns # On very narrow screens (mobile), use smaller width if width < 60: return min(width - 2, 50) # Leave margin, cap at 50 return min(width, max_width) except: return default def load_conversation_indices(self): """Load conversation indices from file""" if os.path.exists(self.conversations_file): try: with open(self.conversations_file, 'r', encoding='utf-8') as f: data = json.load(f) self.conversation_indices = data.get('indices', {}) self.next_conversation_index = data.get('next_index', 1) if self.conversation_indices: self._print_success(f"Loaded {len(self.conversation_indices)} conversation indices") except Exception as e: self._print_warning(f"Error loading conversation indices: {e}") def save_conversation_indices(self): """Save conversation indices to file""" try: data = { 'indices': self.conversation_indices, 'next_index': self.next_conversation_index } with open(self.conversations_file, 'w', encoding='utf-8') as f: json.dump(data, f, indent=2) except Exception as e: self._print_warning(f"Error saving conversation indices: {e}") def assign_conversation_index(self, hash_str): """Assign a fixed index to a conversation if it doesn't have one""" clean_hash = hash_str.replace(":", "").replace(" ", "").replace("<", "").replace(">", "").lower() if clean_hash not in self.conversation_indices: self.conversation_indices[clean_hash] = self.next_conversation_index self.next_conversation_index += 1 self.save_conversation_indices() return self.conversation_indices[clean_hash] def register_announce_handler(self): """Register handler to capture display names from announces""" class LXMFPeerAnnounceHandler: def __init__(self, client): self.client = client self.aspect_filter = "lxmf.delivery" def received_announce(self, destination_hash, announced_identity, app_data): """Called when an LXMF delivery announce is received""" try: if app_data: display_name = LXMF.display_name_from_app_data(app_data) if display_name and isinstance(display_name, str): hash_str = RNS.prettyhexrep(destination_hash) clean_hash = hash_str.replace(":", "").replace(" ", "").lower() with self.client.peers_lock: is_new_peer = clean_hash not in self.client.announced_peers if is_new_peer: peer_index = self.client.next_peer_index self.client.next_peer_index += 1 self.client.announced_peers[clean_hash] = { 'display_name': display_name, 'last_seen': time.time(), 'index': peer_index } else: self.client.announced_peers[clean_hash]['display_name'] = display_name self.client.announced_peers[clean_hash]['last_seen'] = time.time() self.client.cache_display_name(hash_str, display_name) # Show discovery if enabled AND it's truly new if is_new_peer and self.client.show_announces: # Check if already a contact is_contact = self.client.get_contact_name_by_hash(clean_hash) != clean_hash # Use plain text - no ANSI codes in background threads print(f"\nšŸ“” New Announce: {display_name}") print(f"šŸ”— {hash_str}") if not is_contact: # Get peer index for this new peer peer_idx = self.client.announced_peers[clean_hash]['index'] print(f"šŸ’” Quick save: 'ap {peer_idx}' | Send: 'sp {peer_idx} '") if is_contact: # Get peer index for this new peer peer_idx = self.client.announced_peers[clean_hash]['index'] print(f"šŸ’” Quick Send: 'sp {peer_idx} '") except Exception: pass self.peer_announce_handler = LXMFPeerAnnounceHandler(self) RNS.Transport.register_announce_handler(self.peer_announce_handler) self._print_success("LXMF peer announce handler registered") def _print_color(self, text, color=""): """Print with color if available""" if COLOR_ENABLED: print(f"{color}{text}{Style.RESET_ALL}") else: print(text) def _print_success(self, text): """Print success message""" self._print_color(f"āœ“ {text}", Fore.GREEN) def _print_error(self, text): """Print error message""" self._print_color(f"āŒ {text}", Fore.RED) def _print_warning(self, text): """Print warning message""" self._print_color(f"⚠ {text}", Fore.YELLOW) def thread_exception_handler(self, args): """Log thread exceptions""" # Count and skip file-related errors if isinstance(args.exc_value, (PermissionError, FileNotFoundError, OSError)): self.suppressed_errors += 1 return # Log other unexpected errors self._print_warning(f"Thread error: {args.exc_type.__name__}: {args.exc_value}") print("> ", end="", flush=True) def load_display_name_cache(self): """Load cached display names""" if os.path.exists(self.cache_file): try: with open(self.cache_file, 'r', encoding='utf-8') as f: self.display_name_cache = json.load(f) if self.display_name_cache: self._print_success(f"Loaded {len(self.display_name_cache)} cached display names") except Exception as e: self._print_warning(f"Error loading display name cache: {e}") self.display_name_cache = {} else: self.display_name_cache = {} def cache_display_name(self, hash_str, display_name): """Cache a display name for a hash""" if display_name and isinstance(display_name, str) and display_name.strip(): # Normalize hash format (remove colons, spaces, brackets) clean_hash = hash_str.replace(":", "").replace(" ", "").replace("<", "").replace(">", "").lower() self.display_name_cache[clean_hash] = display_name.strip() self.cache_dirty = True def save_display_name_cache(self): """Save display name cache (only if dirty and enough time passed)""" # Only save if cache has changed and at least 5 seconds since last save if not self.cache_dirty: return current_time = time.time() if current_time - self.last_cache_save < 5: return # Too soon, skip save try: with open(self.cache_file, 'w', encoding='utf-8') as f: json.dump(self.display_name_cache, f, indent=2, ensure_ascii=False) self.cache_dirty = False self.last_cache_save = current_time except Exception as e: # Will retry on next call pass def router_job_loop(self): """Continuously process router jobs""" last_periodic_save = time.time() while not self.stop_event.is_set(): try: if hasattr(self.router, 'jobs'): self.router.jobs() if hasattr(self.router, 'process_outbound'): if not getattr(self.router, 'processing_outbound', False): self.router.process_outbound() # Periodic saves (every 10 seconds) current_time = time.time() if current_time - last_periodic_save > 10: self.save_display_name_cache() last_periodic_save = current_time time.sleep(0.1) except Exception: pass def announce_loop(self): """Periodically announce destination""" while not self.stop_event.is_set(): if self.stop_event.wait(self.announce_interval): break # CHECK IF AUTO-ANNOUNCE IS ENABLED if not self.stop_event.is_set() and self.auto_announce_enabled: if hasattr(self.destination, 'announce'): try: self.destination.announce() # type: ignore timestamp = datetime.now().strftime('%H:%M:%S') print(f"\n[Auto-announced at {timestamp}]") except Exception as e: print(f"\n[Auto-announce failed: {e}]") def load_config(self): """Load configuration from file""" if os.path.exists(self.config_file): try: with open(self.config_file, 'r', encoding='utf-8') as f: config = json.load(f) self.display_name = config.get('display_name', 'Anonymous') self.announce_interval = config.get('announce_interval', 300) self.auto_announce_enabled = config.get('auto_announce_enabled', True) # ADD THIS self.show_announces = config.get('show_announces', True) # Load notification settings self.notify_sound = config.get('notify_sound', True) self.notify_bell = config.get('notify_bell', True) self.notify_visual = config.get('notify_visual', True) # Load stamp cost settings self.stamp_cost_enabled = config.get('stamp_cost_enabled', False) self.stamp_cost = config.get('stamp_cost', 0) self.ignore_invalid_stamps = config.get('ignore_invalid_stamps', False) return except Exception as e: self._print_warning(f"Error loading config: {e}") # === FIRST TIME SETUP === try: width = shutil.get_terminal_size().columns except: width = 60 sep_width = min(width, 60) print(f"\n{'─'*sep_width}") self._print_color("FIRST TIME SETUP", Fore.CYAN + Style.BRIGHT) print(f"{'─'*sep_width}\n") self._print_color("Welcome to LXMF Client!", Fore.GREEN) print("Let's get you set up with a display name.\n") # Ask for display name while True: try: name = input(f"{Fore.YELLOW}Enter your display name: {Style.RESET_ALL}" if COLOR_ENABLED else "Enter your display name: ").strip() if name: self.display_name = name break else: self._print_warning("Display name cannot be empty. Please try again.") except KeyboardInterrupt: print("\n") self._print_warning("Using default name: Anonymous") self.display_name = 'Anonymous' break except: self.display_name = 'Anonymous' break print() self._print_success(f"Display name set to: {self.display_name}") # Ask for announce interval (optional) print(f"\n{Fore.CYAN}Auto-announce interval:{Style.RESET_ALL}" if COLOR_ENABLED else "\nAuto-announce interval:") print("This determines how often your presence is announced to the network.") try: interval_str = input(f"{Fore.YELLOW}Interval in seconds [300]: {Style.RESET_ALL}" if COLOR_ENABLED else "Interval in seconds [300]: ").strip() if interval_str and interval_str.isdigit(): self.announce_interval = max(30, int(interval_str)) else: self.announce_interval = 300 except: self.announce_interval = 300 self._print_success(f"Announce interval set to: {self.announce_interval}s") print(f"\n{'─'*sep_width}") self._print_color("Setup complete! Initializing...", Fore.GREEN) print(f"{'─'*sep_width}\n") # Save configuration self.save_config() def save_config(self): """Save configuration to file""" try: config = { 'display_name': self.display_name, 'announce_interval': self.announce_interval, 'auto_announce_enabled': self.auto_announce_enabled, # ADD THIS 'show_announces': self.show_announces, 'notify_sound': self.notify_sound, 'notify_bell': self.notify_bell, 'notify_visual': self.notify_visual, 'stamp_cost_enabled': self.stamp_cost_enabled, 'stamp_cost': self.stamp_cost, 'ignore_invalid_stamps': self.ignore_invalid_stamps } with open(self.config_file, 'w', encoding='utf-8') as f: json.dump(config, f, indent=2) except Exception as e: self._print_warning(f"Error saving config: {e}") def load_messages(self): """Load all messages from the messages folder""" try: all_messages = [] for filename in os.listdir(self.messages_path): if filename.endswith('.json'): filepath = os.path.join(self.messages_path, filename) try: with open(filepath, 'r', encoding='utf-8') as f: message = json.load(f) all_messages.append(message) except Exception as e: self._print_warning(f"Error loading message {filename}: {e}") all_messages.sort(key=lambda x: x.get('timestamp', 0)) with self.messages_lock: self.messages = all_messages if all_messages: self._print_success(f"Loaded {len(all_messages)} messages") except Exception as e: self._print_warning(f"Error loading messages: {e}") def save_message(self, msg_data): """Save a single message to its own file""" try: timestamp = msg_data['timestamp'] direction = msg_data['direction'] filename = f"{int(timestamp)}_{direction}.json" filepath = os.path.join(self.messages_path, filename) with open(filepath, 'w', encoding='utf-8') as f: json.dump(msg_data, f, indent=2, ensure_ascii=False) except Exception as e: self._print_warning(f"Error saving message: {e}") def get_lxmf_display_name(self, hash_str): """Get LXMF display name from cache or by querying announce data""" # Normalize hash format - remove all separators and angle brackets clean_hash = hash_str.replace(":", "").replace(" ", "").replace("<", "").replace(">", "").lower() # Check cache first - try both with and without formatting if clean_hash in self.display_name_cache: return self.display_name_cache[clean_hash] # Also check if cache has it with colons (legacy format) for cached_hash, cached_name in self.display_name_cache.items(): if cached_hash.replace(":", "").replace(" ", "").lower() == clean_hash: # Found it with different formatting, normalize the cache self.display_name_cache[clean_hash] = cached_name return cached_name # Try to get from Reticulum's stored announce data try: hash_bytes = bytes.fromhex(clean_hash) app_data = RNS.Identity.recall_app_data(hash_bytes) if app_data: # Use LXMF's helper function to extract display name display_name = LXMF.display_name_from_app_data(app_data) if display_name: self.cache_display_name(clean_hash, display_name) return display_name except Exception as e: pass return None def on_message_received(self, message): """Callback when message is received""" try: source_hash_str = RNS.prettyhexrep(message.source_hash) if self.is_blacklisted(source_hash_str): print(f"\n[BLOCKED] Message from blacklisted address: {source_hash_str}") sender_display = self.get_lxmf_display_name(source_hash_str) if sender_display: print(f" Display name: {sender_display}") return # Validate stamp cost if enabled if self.stamp_cost_enabled and self.stamp_cost > 0: pass content = message.content if isinstance(content, bytes): content = content.decode('utf-8', errors='replace') title = message.title if isinstance(title, bytes): title = title.decode('utf-8', errors='replace') sender_display_name = self.get_lxmf_display_name(source_hash_str) msg_data = { 'timestamp': message.timestamp, 'source_hash': source_hash_str, 'title': title, 'content': content, 'direction': 'inbound', 'display_name': sender_display_name } if self.handle_plugin_message(message, msg_data): return with self.messages_lock: self.messages.append(msg_data) self.last_sender_hash = msg_data['source_hash'] self.last_sender_name = self.get_contact_name_by_hash(msg_data['source_hash']) self.save_message(msg_data) sender_display = self.format_contact_display(msg_data['source_hash'], show_hash=True) # Check if sender is in contacts is_saved_contact = self.get_contact_name_by_hash(source_hash_str) != source_hash_str # Trigger notification self.notify_new_message() try: width = min(shutil.get_terminal_size().columns, 60) except: width = 60 # SIMPLE PRINTS - NO COLOR CODES IN STRINGS print(f"\n{'─'*width}") timestamp = datetime.fromtimestamp(message.timestamp).strftime('%H:%M:%S') print(f"šŸ“Ø [{timestamp}] NEW MESSAGE from: {self.format_contact_display_short(msg_data['source_hash'])}") print(f"{'─'*width}") if title: print(f"Title: {title}") if content: print(f"\n{content}") print(f"{'─'*width}") # SHOW APPROPRIATE TIP BASED ON CONTACT STATUS if is_saved_contact: print(f"šŸ’” Type 'reply ' or 're ' to respond") else: # Not in contacts - show save command print(f"šŸ’” Reply: 're ' | Save contact: 'save' or 'savecontact'") print(f"{'─'*width}\n") except Exception as e: print(f"āŒ Error processing message: {e}") def load_contacts(self): """Load contacts from file""" if os.path.exists(self.contacts_file): try: with open(self.contacts_file, 'r', encoding='utf-8') as f: self.contacts = json.load(f) # Assign indices to contacts that don't have them needs_save = False for name, data in self.contacts.items(): if 'index' not in data: data['index'] = self.next_contact_index self.next_contact_index += 1 needs_save = True else: # Update next_contact_index to be higher than any existing index if data['index'] >= self.next_contact_index: self.next_contact_index = data['index'] + 1 if needs_save: self.save_contacts() if self.contacts: self._print_success(f"Loaded {len(self.contacts)} contacts") except Exception as e: self._print_warning(f"Error loading contacts: {e}") def load_announced_peers_from_cache(self): """Load already announced LXMF peers from Reticulum's identity cache""" try: # Try to access Reticulum's stored announces if hasattr(RNS.Transport, 'announces'): print(f"[DEBUG] Checking Transport.announces...") announces_dict = getattr(RNS.Transport, 'announces', {}) for destination_hash, announce_data in announces_dict.items(): try: hash_str = RNS.prettyhexrep(destination_hash) # Try to get app_data app_data = RNS.Identity.recall_app_data(destination_hash) if app_data: display_name = LXMF.display_name_from_app_data(app_data) if display_name and isinstance(display_name, str): clean_hash = hash_str.replace(":", "").replace(" ", "").lower() with self.peers_lock: self.announced_peers[clean_hash] = { 'display_name': display_name, 'last_seen': time.time() } print(f"[DEBUG] Loaded cached peer: {display_name} <{hash_str}>") except Exception as e: pass if self.announced_peers: self._print_success(f"Loaded {len(self.announced_peers)} announced peers from cache") except Exception as e: print(f"[DEBUG] Error loading cached peers: {e}") def save_contacts(self): """Save contacts to file""" try: with open(self.contacts_file, 'w', encoding='utf-8') as f: json.dump(self.contacts, f, indent=2) self._print_success("Contacts saved") except Exception as e: self._print_error(f"Error saving contacts: {e}") def get_contact_name_by_hash(self, hash_str: str) -> str: """Get contact name from hash string, return hash if not found""" clean_hash = hash_str.replace(":", "").replace("<", "").replace(">", "").strip().lower() for name, data in self.contacts.items(): stored_hash = data['hash'].replace(":", "").strip().lower() if stored_hash == clean_hash: return name return hash_str def format_contact_display(self, hash_str, show_hash=True): """ Format contact for display with priority: 1. LXMF display name (from network announces) 2. Contact nickname (from local contacts list) 3. Hash address (fallback) If both display name and nickname exist, show: Nickname (Display Name) """ nickname = self.get_contact_name_by_hash(hash_str) display_name = self.get_lxmf_display_name(hash_str) # Check if we have a saved contact (nickname != hash means we found a contact) has_contact = (nickname != hash_str) if has_contact and display_name: # Both nickname and display name exist if display_name != nickname: # They're different, show both if show_hash: return f"{nickname} ({display_name}) <{hash_str}>" else: return f"{nickname} ({display_name})" else: # They're the same, just show one if show_hash: return f"{nickname} <{hash_str}>" else: return nickname elif has_contact: # Only nickname exists (no display name from network) if show_hash: return f"{nickname} <{hash_str}>" else: return nickname elif display_name: # Only display name exists (no saved contact) if show_hash: return f"{display_name} <{hash_str}>" else: return display_name else: # Nothing exists, just show hash return hash_str def format_contact_display_short(self, hash_str): """ Short format for display - prioritizes readability over showing hash. Priority: Display Name > Nickname > Hash """ display_name = self.get_lxmf_display_name(hash_str) if display_name: return display_name nickname = self.get_contact_name_by_hash(hash_str) if nickname != hash_str: return nickname return hash_str def add_contact(self, name, hash_str): """Add a contact""" clean_hash = hash_str.replace(":", "").replace(" ", "").replace("<", "").replace(">", "") self.contacts[name] = { 'hash': clean_hash, 'index': self.next_contact_index } self.next_contact_index += 1 self.save_contacts() self._print_success(f"Added contact: {name}") display_name = self.get_lxmf_display_name(clean_hash) if display_name: print(f" Display name: {display_name}") def edit_contact(self, identifier): """Edit an existing contact's name or hash""" # Find contact by name or index target_contact = None contact_name = None # Try as index first try: idx = int(identifier) for name, data in self.contacts.items(): if data.get('index') == idx: target_contact = data contact_name = name break except ValueError: # Not a number, try as name if identifier in self.contacts: target_contact = self.contacts[identifier] contact_name = identifier if not target_contact: self._print_error(f"Contact not found: {identifier}") print("Use 'contacts' to see the list") return current_hash = target_contact['hash'] current_index = target_contact.get('index', '?') display_name = self.get_lxmf_display_name(current_hash) print(f"\n{'─'*60}") self._print_color(f"EDITING CONTACT: {contact_name}", Fore.CYAN + Style.BRIGHT) print(f"{'─'*60}") print(f"Current name: {contact_name}") print(f"Current hash: {current_hash}") if display_name: print(f"LXMF display name: {display_name}") print(f"Index: #{current_index}") print(f"{'─'*60}\n") print("What would you like to edit?") print(" [1] Change nickname") print(" [2] Change LXMF address (hash)") print(" [3] Both") print(" [c] Cancel") choice = input("\nSelect option: ").strip().lower() new_name = contact_name new_hash = current_hash if choice in ['1', '3']: # Edit name name_input = input(f"\nEnter new nickname [{contact_name}]: ").strip() if name_input: # Check if name already exists if name_input in self.contacts and name_input != contact_name: self._print_error(f"Contact '{name_input}' already exists!") return new_name = name_input if choice in ['2', '3']: # Edit hash hash_input = input(f"\nEnter new LXMF address [{current_hash}]: ").strip() if hash_input: # Validate hash clean_hash = hash_input.replace(":", "").replace(" ", "").replace("<", "").replace(">", "") if len(clean_hash) == 32: try: bytes.fromhex(clean_hash) new_hash = clean_hash except ValueError: self._print_error("Invalid hash format!") return else: self._print_error("Hash must be 32 hex characters!") return if choice == 'c': print("Cancelled") return if choice not in ['1', '2', '3']: self._print_error("Invalid option") return # Confirm changes print(f"\n{'─'*60}") print("CONFIRM CHANGES:") if new_name != contact_name: print(f" Name: {contact_name} → {new_name}") if new_hash != current_hash: print(f" Hash: {current_hash[:16]}... → {new_hash[:16]}...") print(f"{'─'*60}") confirm = input("\nSave changes? [y/N]: ").strip().lower() if confirm == 'y': # Remove old contact del self.contacts[contact_name] # Add updated contact (keep same index) self.contacts[new_name] = { 'hash': new_hash, 'index': current_index } self.save_contacts() self._print_success(f"Contact updated: {new_name}") # Show new display name if hash changed if new_hash != current_hash: new_display = self.get_lxmf_display_name(new_hash) if new_display: print(f" LXMF display name: {new_display}") else: print("Cancelled") def save_contact_from_hash(self, hash_str, suggested_name=None): """Quick save a contact from hash with optional suggested name""" clean_hash = hash_str.replace(":", "").replace(" ", "").replace("<", "").replace(">", "").lower() # Check if already in contacts for name, data in self.contacts.items(): if data['hash'].lower() == clean_hash: self._print_warning(f"Already in contacts as: {name}") return # Get display name display_name = self.get_lxmf_display_name(clean_hash) # Suggest name if suggested_name: default_name = suggested_name elif display_name: default_name = display_name else: default_name = clean_hash[:8] print(f"\n{'─'*60}") self._print_color("SAVE NEW CONTACT", Fore.GREEN + Style.BRIGHT) print(f"{'─'*60}") print(f"LXMF Address: {clean_hash}") if display_name: print(f"Display Name: {display_name}") print(f"{'─'*60}\n") name = input(f"Enter nickname [{default_name}]: ").strip() if not name: name = default_name # Check if name exists if name in self.contacts: self._print_error(f"Contact '{name}' already exists!") return self.add_contact(name, clean_hash) def list_contacts(self): """List all contacts""" if not self.contacts: print("\nNo contacts saved\n") return import shutil try: width = shutil.get_terminal_size().columns except: width = 80 sorted_contacts = sorted(self.contacts.items(), key=lambda x: x[1].get('index', 999999)) sep_width = min(width, 90) print(f"\n{'─'*sep_width}") self._print_color("CONTACTS", Fore.CYAN + Style.BRIGHT) print(f"{'─'*sep_width}") if width < 70: # Mobile: Vertical layout for name, data in sorted_contacts: idx = data.get('index', '?') hash_str = data['hash'] display_name = self.get_lxmf_display_name(hash_str) print(f"\n[{idx}] {Fore.CYAN}{name} - {display_name}{Style.RESET_ALL}") #if display_name: # print(f" {display_name}") print(f" {hash_str}") else: # Desktop: Clean table with separators print(f"\n{'#':<5} {'Name':<20} {'Display Name':<30} {'Hash'}") print(f"{'─'*5} {'─'*20} {'─'*30} {'─'*32}") for name, data in sorted_contacts: idx = data.get('index', '?') hash_str = data['hash'] display_name = self.get_lxmf_display_name(hash_str) name_shown = name[:18] + ".." if len(name) > 20 else name if display_name: display_shown = display_name[:28] + ".." if len(display_name) > 30 else display_name print(f"{idx:<5} {name_shown:<20} {display_shown:<30} {hash_str}") else: print(f"{idx:<5} {name_shown:<20} {'':<30} {hash_str}") print(f"{'─'*sep_width}") self._print_color("\nšŸ’” Send: 's <#> '", Fore.YELLOW) print() def list_peers(self): """List all announced LXMF peers""" with self.peers_lock: peers_copy = dict(self.announced_peers) if not peers_copy: print("\nNo peers announced yet\n") return import shutil try: width = shutil.get_terminal_size().columns except: width = 80 sorted_peers = sorted(peers_copy.items(), key=lambda x: x[1]['index']) sep_width = min(width, 90) print(f"\n{'─'*sep_width}") self._print_color("ANNOUNCED PEERS", Fore.CYAN + Style.BRIGHT) print(f"{'─'*sep_width}") if width < 70: # Mobile: Vertical layout for hash_str, peer_data in sorted_peers: peer_index = peer_data['index'] display_name = peer_data['display_name'] last_seen = peer_data['last_seen'] time_diff = time.time() - last_seen if time_diff < 60: time_str = "now" elif time_diff < 3600: time_str = f"{int(time_diff/60)}m ago" elif time_diff < 86400: time_str = f"{int(time_diff/3600)}h ago" else: time_str = f"{int(time_diff/86400)}d ago" is_contact = any(data['hash'].lower() == hash_str for data in self.contacts.values()) marker = "ā˜… " if is_contact else "" print(f"\n{marker}[{peer_index}] {Fore.CYAN}{display_name}{Style.RESET_ALL} • {time_str}") #print(f" {time_str}") else: # Desktop: Clean table with separators print(f"\n{'#':<5} {'Display Name':<35} {'Hash':<32} {'Last Seen'}") print(f"{'─'*5} {'─'*35} {'─'*32} {'─'*15}") for hash_str, peer_data in sorted_peers: peer_index = peer_data['index'] display_name = peer_data['display_name'] last_seen = peer_data['last_seen'] time_diff = time.time() - last_seen if time_diff < 60: time_str = "just now" elif time_diff < 3600: time_str = f"{int(time_diff/60)}m ago" elif time_diff < 86400: time_str = f"{int(time_diff/3600)}h ago" else: time_str = f"{int(time_diff/86400)}d ago" is_contact = any(data['hash'].lower() == hash_str for data in self.contacts.values()) marker = "ā˜…" if is_contact else " " display_shown = display_name[:33] + ".." if len(display_name) > 35 else display_name print(f"{marker}{peer_index:<4} {display_shown:<35} {hash_str:<32} {time_str}") print(f"{'─'*sep_width}") self._print_color("\nšŸ’” sp <#> | ap <#> [name]", Fore.YELLOW) print() def send_message(self, recipient, content, title=None, fields=None): """Send a message with optimized processing Args: recipient: Contact name, index, or hash content: Message content title: Optional message title fields: Optional dict of LXMF fields (e.g., {LXMF.FIELD_TELEMETRY: data}) """ try: send_start_time = time.time() # Check if recipient is a number (contact index) if recipient.isdigit(): contact_idx = int(recipient) # Find contact by index found_contact = None for name, data in self.contacts.items(): if data.get('index') == contact_idx: found_contact = data['hash'] print(f"Resolved contact #{contact_idx} to: {name}") break if found_contact: dest_hash_str = found_contact else: self._print_error(f"No contact with index #{contact_idx}. Use 'contacts' to see the list") return False # Check if it's a contact name elif recipient in self.contacts: dest_hash_str = self.contacts[recipient]['hash'] # Otherwise treat as direct hash else: dest_hash_str = recipient # Normalize hash dest_hash_str = dest_hash_str.replace(":", "").replace(" ", "").replace("<", "").replace(">", "") dest_hash_bytes = bytes.fromhex(dest_hash_str) # Recall identity or request path dest_identity = RNS.Identity.recall(dest_hash_bytes) if dest_identity is None: self._print_warning("Destination identity unknown, requesting...") path_request_time = time.time() RNS.Transport.request_path(dest_hash_bytes) waited = 0 while waited < 3 and dest_identity is None: time.sleep(0.5) waited += 0.5 dest_identity = RNS.Identity.recall(dest_hash_bytes) if dest_identity is None: self._print_error("Could not get destination identity") print(" Ask recipient to announce their address") return False else: path_time = time.time() - path_request_time self._print_success(f"Got identity after {path_time:.1f}s") # Create destination dest = RNS.Destination( dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery" ) # Create LXMF message message = LXMF.LXMessage( destination=dest, source=self.destination, content=content, title=title or "", desired_method=LXMF.LXMessage.DIRECT ) # Add custom LXMF fields if provided if fields: if not hasattr(message, 'fields'): message.fields = {} message.fields.update(fields) # Add custom attribute for tracking setattr(message, 'send_timestamp', send_start_time) message.register_delivery_callback(self.on_delivery) message.register_failed_callback(self.on_failed) # Save to history msg_data = { 'timestamp': time.time(), 'destination_hash': RNS.prettyhexrep(dest_hash_bytes), 'title': title, 'content': content, 'direction': 'outbound' } with self.messages_lock: self.messages.append(msg_data) self.save_message(msg_data) # Build recipient display string - SHORT FORMAT (no hash) dest_hash = msg_data['destination_hash'] recipient_display = self.format_contact_display_short(dest_hash) self._print_color(f"šŸ“¤ Sending to: {recipient_display}...", Fore.CYAN) # DO NOT PRINT ANYTHING HERE - NO NEWLINE, NO PROMPT! # Track pending self.pending_messages[message.hash] = { 'message': message, 'start_time': send_start_time, 'recipient': recipient_display, 'last_progress': 0 } # Progress monitor def monitor_progress(): msg_hash = message.hash last_status = "" while msg_hash in self.pending_messages: try: elapsed = time.time() - self.pending_messages[msg_hash]['start_time'] progress = self.router.get_outbound_progress(msg_hash) if progress is not None: progress_pct = progress * 100 status = f"[{elapsed:.0f}s] Progress: {progress_pct:.1f}%" if progress < 0.02: phase = " - Waiting for path..." elif progress < 0.04: phase = " - Establishing link..." elif progress < 0.10: phase = " - Link established, preparing transfer..." elif progress < 0.95: phase = " - Transferring data..." else: phase = " - Waiting for confirmation..." status += phase if status != last_status: print(f"\r {status}", end="", flush=True) last_status = status time.sleep(0.5) except: break if last_status: print("\r" + " " * (len(last_status) + 4), end="\r", flush=True) # DO NOT PRINT PROMPT HERE EITHER! threading.Thread(target=monitor_progress, daemon=True).start() # Send message self.router.handle_outbound(message) # DO NOT PRINT ANYTHING AFTER THIS! return True except ValueError: self._print_error("Invalid destination hash or contact index") return False except Exception as e: self._print_error(f"Error sending message: {e}") traceback.print_exc() return False def on_delivery(self, message): """Callback for successful delivery""" dest_hash = RNS.prettyhexrep(message.destination_hash) recipient_str = self.format_contact_display_short(dest_hash) if message.hash in self.pending_messages: del self.pending_messages[message.hash] if hasattr(message, 'send_timestamp'): delivery_time = time.time() - message.send_timestamp if delivery_time < 60: time_str = f"{delivery_time:.1f}s" else: minutes = int(delivery_time // 60) seconds = int(delivery_time % 60) time_str = f"{minutes}m {seconds}s" else: time_str = "?" print() print(f"āœ… Delivered to {recipient_str} ({time_str})") def on_failed(self, message): """Callback for failed delivery""" dest_hash = RNS.prettyhexrep(message.destination_hash) recipient_str = self.format_contact_display_short(dest_hash) if message.hash in self.pending_messages: del self.pending_messages[message.hash] if hasattr(message, 'send_timestamp'): fail_time = time.time() - message.send_timestamp if fail_time < 60: time_str = f"{fail_time:.1f}s" else: minutes = int(fail_time // 60) seconds = int(fail_time % 60) time_str = f"{minutes}m {seconds}s" else: time_str = "?" print() print(f"āŒ Failed to {recipient_str} (after {time_str})") def show_stats(self): """Show messaging statistics""" with self.messages_lock: messages_copy = self.messages.copy() if not messages_copy: print("\nNo messages yet\n") return import shutil try: width = min(shutil.get_terminal_size().columns, 80) is_mobile = width < 70 except: width = 80 is_mobile = False # Calculate overall stats total_sent = sum(1 for msg in messages_copy if msg['direction'] == 'outbound') total_received = sum(1 for msg in messages_copy if msg['direction'] == 'inbound') total_messages = len(messages_copy) # Calculate per-user stats user_stats = {} for msg in messages_copy: if msg['direction'] == 'outbound': hash_key = msg.get('destination_hash', 'unknown') else: hash_key = msg.get('source_hash', 'unknown') if hash_key not in user_stats: user_stats[hash_key] = {'sent': 0, 'received': 0, 'total': 0} if msg['direction'] == 'outbound': user_stats[hash_key]['sent'] += 1 else: user_stats[hash_key]['received'] += 1 user_stats[hash_key]['total'] += 1 # Display overall stats print(f"\n{'─'*width}") self._print_color("MESSAGING STATISTICS", Fore.CYAN + Style.BRIGHT) print(f"{'─'*width}") print(f"\n{Fore.GREEN}Overall Stats:{Style.RESET_ALL}") print(f" Total Messages: {total_messages}") print(f" Sent: {total_sent}") print(f" Received: {total_received}") print(f" Unique Contacts: {len(user_stats)}") # Display per-user stats print(f"\n{Fore.CYAN}Per-User Statistics:{Style.RESET_ALL}") print(f"{'─'*width}") # Sort by total messages descending sorted_users = sorted(user_stats.items(), key=lambda x: x[1]['total'], reverse=True) if is_mobile: # Mobile layout - vertical for hash_str, stats in sorted_users: contact_display = self.format_contact_display_short(hash_str) print(f"\n{contact_display}") print(f" ↑{stats['sent']} ↓{stats['received']} (Total: {stats['total']})") else: # Desktop layout - table print(f"{'Contact':<35} {'Sent':<8} {'Received':<10} {'Total':<10}") print(f"{'─'*35} {'─'*8} {'─'*10} {'─'*10}") for hash_str, stats in sorted_users: contact_display = self.format_contact_display_short(hash_str) # Truncate if too long if len(contact_display) > 33: contact_display = contact_display[:30] + "..." print(f"{contact_display:<35} {stats['sent']:<8} {stats['received']:<10} {stats['total']:<10}") print(f"{'─'*width}\n") def show_status(self): """Show current status and connection info""" try: width = min(shutil.get_terminal_size().columns, 80) except: width = 80 print(f"\n{'─'*width}") self._print_color("SYSTEM STATUS", Fore.CYAN + Style.BRIGHT) print(f"{'─'*width}") # Identity info print(f"\n{Fore.GREEN}Identity:{Style.RESET_ALL}") print(f" Display Name: {self.display_name}") if hasattr(self.destination, 'hash'): print(f" LXMF Address: {RNS.prettyhexrep(self.destination.hash)}") # type: ignore # Network info print(f"\n{Fore.CYAN}Network:{Style.RESET_ALL}") if self.auto_announce_enabled: print(f" Auto-announce: ENABLED (every {self.announce_interval}s)") else: print(f" Auto-announce: DISABLED") print(f" Discovery alerts: {'ON' if self.show_announces else 'OFF'}") # Add thread status if hasattr(self, 'announce_thread'): thread_alive = self.announce_thread.is_alive() print(f" Announce thread: {'RUNNING' if thread_alive else 'STOPPED'}") # Security settings print(f"\n{Fore.RED}Security:{Style.RESET_ALL}") if self.stamp_cost_enabled and self.stamp_cost > 0: print(f" Stamp Cost: {Fore.GREEN}ENABLED{Style.RESET_ALL}") print(f" Required Proof: {Fore.YELLOW}{self.stamp_cost} bits{Style.RESET_ALL}") print(f" Ignore Invalid: {Fore.GREEN}{'YES' if self.ignore_invalid_stamps else 'NO'}{Style.RESET_ALL}") else: print(f" Stamp Cost: {Fore.RED}DISABLED{Style.RESET_ALL}") if self.blacklist: print(f" Blacklist: {Fore.YELLOW}{len(self.blacklist)} blocked{Style.RESET_ALL}") else: print(f" Blacklist: {Fore.GREEN}Empty{Style.RESET_ALL}") # Notification settings print(f"\n{Fore.MAGENTA}Notifications:{Style.RESET_ALL}") print(f" Sound: {'ON' if self.notify_sound else 'OFF'}") print(f" Terminal Bell: {'ON' if self.notify_bell else 'OFF'}") print(f" Visual Flash: {'ON' if self.notify_visual else 'OFF'}") # Statistics with self.messages_lock: total_messages = len(self.messages) sent = sum(1 for m in self.messages if m['direction'] == 'outbound') received = sum(1 for m in self.messages if m['direction'] == 'inbound') with self.peers_lock: peer_count = len(self.announced_peers) contact_count = len(self.contacts) print(f"\n{Fore.YELLOW}Statistics:{Style.RESET_ALL}") print(f" Contacts: {contact_count}") print(f" Announced peers: {peer_count}") print(f" Total messages: {total_messages} (↑{sent} ↓{received})") # Plugins if self.plugins: plugin_count = len(self.plugins) enabled_count = sum(1 for name in self.plugins.keys() if self.plugins_enabled.get(name, True)) print(f" Plugins: {enabled_count}/{plugin_count} enabled") # System info print(f"\n{Fore.WHITE}System:{Style.RESET_ALL}") uptime = time.time() - self.start_time hours = int(uptime // 3600) minutes = int((uptime % 3600) // 60) print(f" Uptime: {hours}h {minutes}m") if self.suppressed_errors > 0: print(f" Suppressed errors: {self.suppressed_errors}") print(f"{'─'*width}\n") def show_messages(self, limit=10, filter_hash=None): """Show recent messages, optionally filtered by user hash""" with self.messages_lock: messages_copy = self.messages.copy() if not messages_copy: print("\nNo messages yet\n") return # Get responsive width try: width = min(shutil.get_terminal_size().columns, 80) except: width = 80 # Filter by hash if provided if filter_hash: clean_filter = filter_hash.replace(":", "").replace(" ", "").replace("<", "").replace(">", "").lower() filtered_messages = [] for msg in messages_copy: msg_hash = "" if msg['direction'] == 'outbound': msg_hash = msg.get('destination_hash', '') else: msg_hash = msg.get('source_hash', '') clean_msg_hash = msg_hash.replace(":", "").replace(" ", "").replace("<", "").replace(">", "").lower() if clean_msg_hash == clean_filter: filtered_messages.append(msg) messages_copy = filtered_messages if not messages_copy: contact_display = self.format_contact_display_short(filter_hash) print(f"\nNo messages with {contact_display}\n") return # Show messages if filter_hash: contact_display = self.format_contact_display_short(filter_hash) print(f"\n{'─'*width}") # Truncate contact name if too long for header if len(contact_display) > width - 10: contact_display = contact_display[:width-13] + "..." self._print_color(f"CHAT: {contact_display.upper()}", Fore.CYAN + Style.BRIGHT) print(f"{'─'*width}") else: print(f"\n{'─'*width}") self._print_color(f"RECENT MESSAGES ({min(limit, len(messages_copy))})", Fore.CYAN + Style.BRIGHT) print(f"{'─'*width}") # Display messages display_messages = messages_copy[-limit:] if not filter_hash else messages_copy for idx, msg in enumerate(display_messages, 1): try: ts = datetime.fromtimestamp(msg['timestamp']).strftime('%Y-%m-%d %H:%M:%S') direction = "→" if msg['direction'] == 'outbound' else "←" if msg['direction'] == 'outbound': contact = self.format_contact_display_short(msg['destination_hash']) else: contact = self.format_contact_display_short(msg['source_hash']) if filter_hash: # In conversation view, show full messages print(f"\n[{idx}] {ts} {direction}") if msg.get('title'): print(f"Title: {msg['title']}") print(f"\n{msg.get('content', '')}\n") print(f"{'─'*width}") else: # In list view, show preview print(f"\n[{idx}] {ts} {direction} {contact}") if msg.get('title'): print(f" Title: {msg['title']}") content = str(msg.get('content', '')) if len(content) > 100: print(f" {content[:100]}...") else: print(f" {content}") except Exception as e: print(f"\n[{idx}] [Error displaying message: {e}]") print(f"\n{'─'*width}") if filter_hash: self.last_sender_hash = filter_hash self.last_sender_name = self.get_contact_name_by_hash(filter_hash) self._print_color(f"\nšŸ’” Reply: 're '", Fore.GREEN) else: self._print_color("šŸ’” Tip: 'm list' or 'm user <#>'", Fore.YELLOW) print() def show_message_list_with_users(self): """Show a list of users you've messaged with, indexed for selection""" with self.messages_lock: messages_copy = self.messages.copy() if not messages_copy: print("\nNo messages yet\n") return import shutil try: width = min(shutil.get_terminal_size().columns, 90) is_mobile = width < 70 except: width = 90 is_mobile = False # Build user list with message counts user_data = {} for msg in messages_copy: if msg['direction'] == 'outbound': hash_key = msg.get('destination_hash', 'unknown') else: hash_key = msg.get('source_hash', 'unknown') if hash_key == 'unknown': continue # Normalize hash clean_hash = hash_key.replace(":", "").replace(" ", "").lower() if clean_hash not in user_data: # Assign fixed index if new conversation conv_index = self.assign_conversation_index(clean_hash) user_data[clean_hash] = { 'sent': 0, 'received': 0, 'last_message_time': 0, 'index': conv_index, 'display_hash': hash_key } if msg['direction'] == 'outbound': user_data[clean_hash]['sent'] += 1 else: user_data[clean_hash]['received'] += 1 if msg['timestamp'] > user_data[clean_hash]['last_message_time']: user_data[clean_hash]['last_message_time'] = msg['timestamp'] # Sort by fixed index sorted_users = sorted(user_data.items(), key=lambda x: x[1]['index']) print(f"\n{'─'*width}") self._print_color("MESSAGE CONVERSATIONS", Fore.CYAN + Style.BRIGHT) print(f"{'─'*width}") if is_mobile: # Mobile layout - vertical list for clean_hash, data in sorted_users: conv_index = data['index'] hash_str = data['display_hash'] contact_display = self.format_contact_display_short(hash_str) time_diff = time.time() - data['last_message_time'] if time_diff < 60: time_str = "now" elif time_diff < 3600: time_str = f"{int(time_diff/60)}m ago" elif time_diff < 86400: time_str = f"{int(time_diff/3600)}h ago" else: time_str = f"{int(time_diff/86400)}d ago" print(f"\n[{conv_index}] {Fore.CYAN}{contact_display}{Style.RESET_ALL}") print(f" ↑{data['sent']} ↓{data['received']} • {time_str}") else: # Desktop layout - table print(f"\n{'#':<5} {'Contact':<35} {'Sent':<6} {'Recv':<6} {'Last':<12}") print(f"{'─'*5} {'─'*35} {'─'*6} {'─'*6} {'─'*12}") for clean_hash, data in sorted_users: conv_index = data['index'] hash_str = data['display_hash'] contact_display = self.format_contact_display_short(hash_str) # Truncate contact name if too long if len(contact_display) > 33: contact_display = contact_display[:30] + "..." time_diff = time.time() - data['last_message_time'] if time_diff < 60: time_str = "just now" elif time_diff < 3600: time_str = f"{int(time_diff/60)}m ago" elif time_diff < 86400: time_str = f"{int(time_diff/3600)}h ago" else: time_str = f"{int(time_diff/86400)}d ago" print(f"{conv_index:<5} {contact_display:<35} {data['sent']:<6} {data['received']:<6} {time_str:<12}") print(f"{'─'*width}") self._print_color("\nšŸ’” Commands:", Fore.YELLOW) print(f" m user <#> - View conversation") print(f" m [count] - Recent messages") print() return sorted_users def show_help(self, category=None): """Show help with optional category filtering""" if category == 'messaging': self._show_messaging_help() elif category == 'contacts': self._show_contacts_help() elif category == 'settings': self._show_settings_help() elif category == 'system': self._show_system_help() else: self._show_main_help() def _show_messaging_help(self): """Show messaging help""" self._print_color("Messaging commands: send, reply, messages", Fore.CYAN) def _show_contacts_help(self): """Show contacts help""" self._print_color("Contact commands: contacts, add, remove, peers", Fore.CYAN) def _show_settings_help(self): """Show settings help""" self._print_color("Settings commands: settings, name, interval", Fore.CYAN) def _show_system_help(self): """Show system help""" self._print_color("System commands: status, restart, clear, help, quit", Fore.CYAN) def _show_main_help(self): """Show main help menu with categories""" try: width = shutil.get_terminal_size().columns is_mobile = width < 70 except: width = 80 is_mobile = False if COLOR_ENABLED: if is_mobile: # === MOBILE LAYOUT === print(f"\n{Fore.WHITE}{'─'*width}") print(f"LXMF CLIENT COMMANDS".center(width)) print(f"{'─'*width}{Style.RESET_ALL}\n") # Messaging self._print_color("šŸ“Ø MESSAGING", Fore.CYAN + Style.BRIGHT) print(f"{'─'*width}") commands = [ ("send <#> ", "s"), ("reply ", "re"), ("messages [n]", "m"), ("messages list", ""), ("messages user <#>", ""), ] for cmd, alias in commands: if alias: print(f"{Fore.CYAN}{cmd}{Style.RESET_ALL} {Fore.YELLOW}({alias}){Style.RESET_ALL}") else: print(f"{Fore.CYAN}{cmd}{Style.RESET_ALL}") # Contacts & Peers print(f"\n{Fore.GREEN}šŸ‘„ CONTACTS & PEERS{Style.RESET_ALL}") print(f"{'─'*width}") commands = [ ("contacts", "c"), ("add ", "a"), ("edit ", "e"), ("remove ", "rm"), ("savecontact [hash]", "save"), ("peers", "p"), ("sendpeer <#> ", "sp"), ("addpeer <#> [name]", "ap"), ] for cmd, alias in commands: if alias: print(f"{Fore.CYAN}{cmd}{Style.RESET_ALL} {Fore.YELLOW}({alias}){Style.RESET_ALL}") else: print(f"{Fore.CYAN}{cmd}{Style.RESET_ALL}") # Info & Stats print(f"\n{Fore.MAGENTA}šŸ“Š INFO & STATS{Style.RESET_ALL}") print(f"{'─'*width}") commands = [ ("stats", "st"), ("status", ""), ] for cmd, alias in commands: if alias: print(f"{Fore.CYAN}{cmd}{Style.RESET_ALL} {Fore.YELLOW}({alias}){Style.RESET_ALL}") else: print(f"{Fore.CYAN}{cmd}{Style.RESET_ALL}") # Network print(f"\n{Fore.BLUE}🌐 NETWORK{Style.RESET_ALL}") print(f"{'─'*width}") commands = [ ("address", "addr"), ("announce", "ann"), ] for cmd, alias in commands: if alias: print(f"{Fore.CYAN}{cmd}{Style.RESET_ALL} {Fore.YELLOW}({alias}){Style.RESET_ALL}") else: print(f"{Fore.CYAN}{cmd}{Style.RESET_ALL}") # Settings print(f"\n{Fore.YELLOW}āš™ļø SETTINGS{Style.RESET_ALL}") print(f"{'─'*width}") commands = [ ("settings", "set"), ("name ", "n"), ("interval ", "i"), ] for cmd, alias in commands: if alias: print(f"{Fore.CYAN}{cmd}{Style.RESET_ALL} {Fore.YELLOW}({alias}){Style.RESET_ALL}") else: print(f"{Fore.CYAN}{cmd}{Style.RESET_ALL}") # Mobile security print(f"\n{Fore.RED}šŸ›”ļø SECURITY{Style.RESET_ALL}") print(f"{'─'*width}") commands = [ ("blacklist [list]", "bl"), ("block <#/name>", ""), ("unblock <#/name>", ""), ] for cmd, alias in commands: if alias: print(f"{Fore.CYAN}{cmd}{Style.RESET_ALL} {Fore.YELLOW}({alias}){Style.RESET_ALL}") else: print(f"{Fore.CYAN}{cmd}{Style.RESET_ALL}") # System print(f"\n{Fore.RED}šŸ–„ļø SYSTEM{Style.RESET_ALL}") print(f"{'─'*width}") commands = [ ("plugin [list]", ""), ("clear", "cls"), ("restart", "r"), ("help", "h"), ("quit", "q"), ] for cmd, alias in commands: if alias: print(f"{Fore.CYAN}{cmd}{Style.RESET_ALL} {Fore.YELLOW}({alias}){Style.RESET_ALL}") else: print(f"{Fore.CYAN}{cmd}{Style.RESET_ALL}") else: # === DESKTOP LAYOUT (clean separator lines) === print(f"\n{Fore.WHITE}{'─'*70}") print(f"LXMF CLIENT COMMANDS".center(70)) print(f"{'─'*70}{Style.RESET_ALL}\n") # Messaging commands self._print_color("šŸ“Ø MESSAGING", Fore.CYAN + Style.BRIGHT) print(f"{'─'*70}") commands = [ ("send <#> ", "s", "Send message"), ("reply ", "re", "Reply to last"), ("messages [n]", "m", "Recent messages"), ("messages list", "", "All conversations"), ("messages user <#>", "", "View conversation"), ] for long_cmd, short_cmd, description in commands: if short_cmd: print(f"{Fore.CYAN}{long_cmd:<20}{Style.RESET_ALL} {Fore.YELLOW}({short_cmd:<4}){Style.RESET_ALL} {description}") else: print(f"{Fore.CYAN}{long_cmd:<20}{Style.RESET_ALL} {description}") # Contacts & Peers print(f"\n{Fore.GREEN}šŸ‘„ CONTACTS & PEERS{Style.RESET_ALL}") print(f"{'─'*70}") commands = [ ("contacts", "c", "List contacts"), ("add ", "a", "Add contact"), ("edit ", "e", "Edit contact"), ("remove ", "rm", "Remove contact"), ("savecontact [hash]", "save", "Quick save contact"), ("peers", "p", "List peers"), ("sendpeer <#> ", "sp", "Send to peer"), ("addpeer <#> [name]", "ap", "Add to contacts"), ] for long_cmd, short_cmd, description in commands: if short_cmd: print(f"{Fore.CYAN}{long_cmd:<20}{Style.RESET_ALL} {Fore.YELLOW}({short_cmd:<4}){Style.RESET_ALL} {description}") else: print(f"{Fore.CYAN}{long_cmd:<20}{Style.RESET_ALL} {description}") # Info & Stats print(f"\n{Fore.MAGENTA}šŸ“Š INFO & STATS{Style.RESET_ALL}") print(f"{'─'*70}") commands = [ ("stats", "st", "Messaging stats"), ("status", "", "System status"), ] for long_cmd, short_cmd, description in commands: if short_cmd: print(f"{Fore.CYAN}{long_cmd:<20}{Style.RESET_ALL} {Fore.YELLOW}({short_cmd:<4}){Style.RESET_ALL} {description}") else: print(f"{Fore.CYAN}{long_cmd:<20}{Style.RESET_ALL} {description}") # Network print(f"\n{Fore.BLUE}🌐 NETWORK{Style.RESET_ALL}") print(f"{'─'*70}") commands = [ ("address", "addr", "Your LXMF address info"), ("announce", "ann", "Announce manually now!"), ] for long_cmd, short_cmd, description in commands: if short_cmd: print(f"{Fore.CYAN}{long_cmd:<20}{Style.RESET_ALL} {Fore.YELLOW}({short_cmd:<4}){Style.RESET_ALL} {description}") else: print(f"{Fore.CYAN}{long_cmd:<20}{Style.RESET_ALL} {description}") # Settings print(f"\n{Fore.YELLOW}āš™ļø SETTINGS{Style.RESET_ALL}") print(f"{'─'*70}") commands = [ ("settings", "set", "Settings menu"), ("name ", "n", "Change name"), ("interval ", "i", "Announce interval"), ] for long_cmd, short_cmd, description in commands: if short_cmd: print(f"{Fore.CYAN}{long_cmd:<20}{Style.RESET_ALL} {Fore.YELLOW}({short_cmd:<4}){Style.RESET_ALL} {description}") else: print(f"{Fore.CYAN}{long_cmd:<20}{Style.RESET_ALL} {description}") # Security section in help menu (desktop version) print(f"\n{Fore.RED}šŸ›”ļø SECURITY{Style.RESET_ALL}") print(f"{'─'*70}") commands = [ ("blacklist [list]", "bl", "Manage blacklist"), ("block <#/name>", "", "Block contact"), ("unblock <#/name>", "", "Unblock contact"), ] for long_cmd, short_cmd, description in commands: if short_cmd: print(f"{Fore.CYAN}{long_cmd:<20}{Style.RESET_ALL} {Fore.YELLOW}({short_cmd:<4}){Style.RESET_ALL} {description}") else: print(f"{Fore.CYAN}{long_cmd:<20}{Style.RESET_ALL} {description}") # System print(f"\n{Fore.RED}šŸ–„ļø SYSTEM{Style.RESET_ALL}") print(f"{'─'*70}") commands = [ ("plugin [list]", "", "Manage plugins"), ("clear", "cls", "Clear screen"), ("restart", "r", "Restart client"), ("help", "h", "Show help"), ("quit", "q", "Exit"), ] for long_cmd, short_cmd, description in commands: if short_cmd: print(f"{Fore.CYAN}{long_cmd:<20}{Style.RESET_ALL} {Fore.YELLOW}({short_cmd:<4}){Style.RESET_ALL} {description}") else: print(f"{Fore.CYAN}{long_cmd:<20}{Style.RESET_ALL} {description}") print(f"\n{Fore.YELLOW}šŸ’” Type 'settings' for options{Style.RESET_ALL}") print(f"{'─'*70}\n") else: # No color fallback print("\nLXMF CLI - Commands available") print("Type 'help' for full list\n") def show_settings_menu(self): """Show interactive settings menu""" while True: # Get responsive width try: width = min(shutil.get_terminal_size().columns, 70) except: width = 70 print(f"\n{'─'*width}") self._print_color("SETTINGS MENU", Fore.YELLOW + Style.BRIGHT) print(f"{'─'*width}") print(f"\n{Fore.CYAN}General Settings:{Style.RESET_ALL}") print(f" [1] Display Name: {Fore.GREEN}{self.display_name}{Style.RESET_ALL}") print(f" [2] Auto-Announce: {Fore.GREEN}{'ON' if self.auto_announce_enabled else 'OFF'}{Style.RESET_ALL}") # ADD THIS print(f" [3] Announce Interval: {Fore.GREEN}{self.announce_interval}s{Style.RESET_ALL}") print(f" [4] Discovery Alerts: {Fore.GREEN}{'ON' if self.show_announces else 'OFF'}{Style.RESET_ALL}") print(f"\n{Fore.MAGENTA}Notification Settings:{Style.RESET_ALL}") print(f" [5] Sound (beeps/melody): {Fore.GREEN}{'ON' if self.notify_sound else 'OFF'}{Style.RESET_ALL}") print(f" [6] Terminal Bell: {Fore.GREEN}{'ON' if self.notify_bell else 'OFF'}{Style.RESET_ALL}") print(f" [7] Visual Flash: {Fore.GREEN}{'ON' if self.notify_visual else 'OFF'}{Style.RESET_ALL}") print(f"\n{Fore.RED}Security Settings:{Style.RESET_ALL}") print(f" [8] Stamp Cost: {Fore.GREEN}{'ON' if self.stamp_cost_enabled else 'OFF'}{Style.RESET_ALL}") if self.stamp_cost_enabled: print(f" Amount: {Fore.YELLOW}{self.stamp_cost} bits{Style.RESET_ALL}") print(f" [9] Ignore Invalid Stamps: {Fore.GREEN}{'ON' if self.ignore_invalid_stamps else 'OFF'}{Style.RESET_ALL}") print(f"\n{Fore.YELLOW}Options:{Style.RESET_ALL}") print(" [1-9] - Change setting") print(" [t] - Test notification") print(" [b] - Back to main menu") print(" [s] - Save and exit") print(f"{'─'*width}") choice = input("\nSelect option: ").strip().lower() if choice == '1': new_name = input(f"\nEnter new display name [{self.display_name}]: ").strip() if new_name: self.display_name = new_name if hasattr(self.destination, 'display_name'): setattr(self.destination, 'display_name', self.display_name) # type: ignore self.save_config() if hasattr(self.destination, 'announce'): self.destination.announce() # type: ignore self._print_success(f"Display name changed to: {self.display_name}") self._print_success("Announced to network") else: print("Cancelled") elif choice == '2': # Toggle auto-announce self.auto_announce_enabled = not self.auto_announce_enabled self.save_config() # Restart announce thread to apply changes if self.auto_announce_enabled: self.stop_event.set() time.sleep(0.2) self.stop_event.clear() # Restart the thread if needed if not self.announce_thread.is_alive(): self.announce_thread = threading.Thread(target=self.announce_loop, daemon=True) self.announce_thread.start() self._print_success("Auto-announce enabled and thread restarted") else: self._print_success("Auto-announce enabled") else: self._print_success("Auto-announce disabled") print(" You can still announce manually using 'announce' command") elif choice == '3': current = self.announce_interval interval_str = input(f"\nEnter announce interval in seconds [{current}]: ").strip() if interval_str: try: new_interval = int(interval_str) if new_interval < 30: self._print_warning("Minimum interval is 30 seconds, setting to 30") new_interval = 30 self.announce_interval = new_interval self.save_config() # Restart announce thread properly self.stop_event.set() # Signal thread to stop if hasattr(self, 'announce_thread'): self.announce_thread.join(timeout=2) # Wait for it to stop self.stop_event.clear() # Clear the stop signal # Start new thread with new interval self.announce_thread = threading.Thread(target=self.announce_loop, daemon=True) self.announce_thread.start() self._print_success(f"Announce interval changed to: {self.announce_interval}s") self._print_success("Announce thread restarted") except ValueError: self._print_error("Invalid number") else: print("Cancelled") elif choice == '4': # Toggle discovery alerts self.show_announces = not self.show_announces self.save_config() status = "enabled" if self.show_announces else "disabled" self._print_success(f"Discovery alerts {status}") elif choice == '5': # Toggle sound notifications self.notify_sound = not self.notify_sound self.save_config() status = "enabled" if self.notify_sound else "disabled" self._print_success(f"Sound notifications {status}") elif choice == '6': # Toggle terminal bell self.notify_bell = not self.notify_bell self.save_config() status = "enabled" if self.notify_bell else "disabled" self._print_success(f"Terminal bell {status}") elif choice == '7': # Toggle visual flash self.notify_visual = not self.notify_visual self.save_config() status = "enabled" if self.notify_visual else "disabled" self._print_success(f"Visual flash {status}") elif choice == '8': # Toggle stamp cost (existing code stays the same) if not self.stamp_cost_enabled: # ... existing stamp cost enable code ... pass else: # ... existing stamp cost disable code ... pass elif choice == '9': # Toggle ignore invalid stamps self.ignore_invalid_stamps = not self.ignore_invalid_stamps self.save_config() status = "enabled" if self.ignore_invalid_stamps else "disabled" self._print_success(f"Ignore invalid stamps {status}") if self.ignore_invalid_stamps: print(" Messages with insufficient/invalid stamps will be rejected") else: print(" Messages with insufficient/invalid stamps will be accepted") elif choice == 't': # Test notification print("\nTesting notification...") self.notify_new_message() time.sleep(1) self._print_success("Test complete!") elif choice in ['b', 'back']: break elif choice in ['s', 'save']: self.save_config() self._print_success("Settings saved") break else: self._print_error("Invalid option") def resolve_command(self, cmd): """Resolve command aliases to full commands""" return self.command_aliases.get(cmd, cmd) def send_to_peer(self, peer_index, content, title=None): """Send message to a peer by index number""" with self.peers_lock: peers_copy = dict(self.announced_peers) try: idx = int(peer_index) # Find peer by index target_peer = None for hash_str, peer_data in peers_copy.items(): if peer_data['index'] == idx: target_peer = (hash_str, peer_data) break if not target_peer: self._print_error(f"Invalid peer number #{idx}. Use 'peers' to see the list") return False hash_str, peer_data = target_peer display_name = peer_data['display_name'] print(f"Sending to peer #{idx}: {display_name}") return self.send_message(hash_str, content, title) except ValueError: self._print_error("Peer number must be a valid number") return False def add_peer_to_contacts(self, peer_index, custom_name=None): """Add an announced peer to contacts""" with self.peers_lock: peers_copy = dict(self.announced_peers) try: idx = int(peer_index) # Find peer by index target_peer = None for hash_str, peer_data in peers_copy.items(): if peer_data['index'] == idx: target_peer = (hash_str, peer_data) break if not target_peer: self._print_error(f"Invalid peer number #{idx}. Use 'peers' to see the list") return hash_str, peer_data = target_peer display_name = peer_data['display_name'] # Check if already in contacts for contact_name, contact_data in self.contacts.items(): if contact_data['hash'].lower() == hash_str: self._print_warning(f"Already in contacts as: {contact_name}") return # Use custom name if provided, otherwise use display name contact_name = custom_name if custom_name else display_name self.add_contact(contact_name, hash_str) except ValueError: self._print_error("Peer number must be a valid number") def notify_new_message(self): """Visual and audio notification for new message - respects user settings""" # === SOUND NOTIFICATION === if self.notify_sound or self.notify_bell: system = platform.system() is_termux = os.path.exists('/data/data/com.termux') # Check for custom sound file sound_file = None sound_dir = os.path.join(self.storage_path, "sounds") if os.path.exists(sound_dir): # Look for notification sound files (in order of preference) for filename in ['notification.wav', 'notification.mp3', 'notification.ogg', 'message.wav', 'beep.wav']: filepath = os.path.join(sound_dir, filename) if os.path.exists(filepath): sound_file = filepath break try: if is_termux: # === TERMUX/ANDROID === if self.notify_sound: try: # Try to play custom sound first if sound_file: os.system(f'termux-media-player play "{sound_file}" >/dev/null 2>&1 &') time.sleep(0.5) else: # Vibration pattern fallback os.system('termux-vibrate -d 80 2>/dev/null &') time.sleep(0.09) os.system('termux-vibrate -d 80 2>/dev/null &') time.sleep(0.09) os.system('termux-vibrate -d 80 2>/dev/null &') time.sleep(0.09) os.system('termux-vibrate -d 150 2>/dev/null &') time.sleep(0.16) os.system('termux-vibrate -d 100 2>/dev/null &') # System notification os.system('termux-notification --title "šŸ“Ø LXMF Message" --content "New message received" --sound 2>/dev/null &') except: pass # Terminal bells if self.notify_bell: for _ in range(3): print("\a", end="", flush=True) time.sleep(0.1) elif system == 'Windows': # === WINDOWS === if self.notify_sound: sound_played = False # Try custom sound file first if sound_file: try: import winsound # type: ignore winsound.PlaySound(sound_file, winsound.SND_FILENAME | winsound.SND_ASYNC) # type: ignore sound_played = True except Exception as e: pass # Fallback to beep melody if not sound_played: try: import winsound # type: ignore melody = [ (523, 80), # C5 (659, 80), # E5 (784, 80), # G5 (1047, 150), # C6 (784, 100), # G5 ] for freq, duration in melody: winsound.Beep(freq, duration) # type: ignore time.sleep(0.01) except Exception: if self.notify_bell: for _ in range(3): print("\a", end="", flush=True) time.sleep(0.1) elif self.notify_bell: for _ in range(3): print("\a", end="", flush=True) time.sleep(0.1) elif system == 'Darwin': # === MACOS === sound_played = False if self.notify_sound: # Try custom sound file first if sound_file: try: subprocess.Popen(["afplay", sound_file], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) sound_played = True except Exception: pass # Fallback to system sounds if not sound_played: try: sound_candidates = [ "/System/Library/Sounds/Ping.aiff", "/System/Library/Sounds/Glass.aiff", "/System/Library/Sounds/Submarine.aiff" ] sound_path = next((p for p in sound_candidates if os.path.exists(p)), None) if sound_path: subprocess.Popen(["afplay", sound_path], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) else: subprocess.run(["osascript", "-e", "beep"], check=False) except Exception: if self.notify_bell: print("\a", end="", flush=True) if self.notify_bell: for _ in range(2): print("\a", end="", flush=True) time.sleep(0.12) elif system == 'Linux': # === LINUX === sound_played = False if self.notify_sound: # Try custom sound file first if sound_file: # Try multiple players in order of preference players = [ f'paplay "{sound_file}"', f'aplay "{sound_file}"', f'mpg123 -q "{sound_file}"', f'ffplay -nodisp -autoexit -hide_banner -loglevel quiet "{sound_file}"' ] for player_cmd in players: try: result = os.system(f'{player_cmd} 2>/dev/null &') if result == 0: sound_played = True break except: continue # Fallback to system sounds if not sound_played: try: result = os.system('paplay /usr/share/sounds/freedesktop/stereo/message-new-instant.oga 2>/dev/null &') if result != 0: os.system('beep -f 523 -l 80 -n -f 659 -l 80 -n -f 784 -l 80 -n -f 1047 -l 150 -n -f 784 -l 100 2>/dev/null &') except: pass if self.notify_bell: for _ in range(2): print("\a", end="", flush=True) time.sleep(0.15) else: # === OTHER/UNKNOWN SYSTEMS === if self.notify_bell: for _ in range(3): print("\a", end="", flush=True) time.sleep(0.1) except Exception as e: # Ultimate fallback to bell if available if self.notify_bell: try: for _ in range(3): print("\a", end="", flush=True) time.sleep(0.1) except: pass # === VISUAL NOTIFICATION === if self.notify_visual: try: terminal_width = shutil.get_terminal_size().columns except: terminal_width = 80 is_termux = os.path.exists('/data/data/com.termux') # Determine message if is_termux: msg = " šŸ“± NEW MESSAGE! " else: msg = " šŸ“¬ NEW MESSAGE! " # Calculate centered position msg_width = min(60, terminal_width) padding = (terminal_width - msg_width) // 2 # Quick flash sequence (3 flashes) - PLAIN TEXT for _ in range(3): # Flash on line = " " * padding + "─" * msg_width print(f"\r{line}", end="", flush=True) time.sleep(0.08) # Flash off (clear) print(f"\r{' ' * terminal_width}", end="\r", flush=True) time.sleep(0.08) # Final message display (brief) - PLAIN TEXT centered_msg = msg.center(msg_width, '═') line = " " * padding + centered_msg print(f"\r{line}", end="", flush=True) time.sleep(0.2) # Clear completely print(f"\r{' ' * terminal_width}", end="\r", flush=True) def shutdown(self): """Clean shutdown""" print("\nShutting down...") self.stop_event.set() # Force save any pending cache updates try: if self.cache_dirty: with open(self.cache_file, 'w', encoding='utf-8') as f: json.dump(self.display_name_cache, f, indent=2, ensure_ascii=False) except: pass if hasattr(self, 'announce_thread'): self.announce_thread.join(timeout=2) if hasattr(self, 'router_thread'): self.router_thread.join(timeout=2) time.sleep(0.5) def clear_screen(self): """Clear the terminal screen""" import os # Windows if os.name == 'nt': os.system('cls') # Unix/Linux/Mac else: os.system('clear') # Get responsive width try: width = shutil.get_terminal_size().columns except: width = 60 # Banner with proper centering banner_lines = [ "ā–ˆā–ˆā•— ā–ˆā–ˆā•— ā–ˆā–ˆā•—ā–ˆā–ˆā–ˆā•— ā–ˆā–ˆā–ˆā•—ā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā•—", "ā–ˆā–ˆā•‘ ā•šā–ˆā–ˆā•—ā–ˆā–ˆā•”ā•ā–ˆā–ˆā–ˆā–ˆā•— ā–ˆā–ˆā–ˆā–ˆā•‘ā–ˆā–ˆā•”ā•ā•ā•ā•ā•", "ā–ˆā–ˆā•‘ ā•šā–ˆā–ˆā–ˆā•”ā• ā–ˆā–ˆā•”ā–ˆā–ˆā–ˆā–ˆā•”ā–ˆā–ˆā•‘ā–ˆā–ˆā–ˆā–ˆā–ˆā•— ", "ā–ˆā–ˆā•‘ ā–ˆā–ˆā•”ā–ˆā–ˆā•— ā–ˆā–ˆā•‘ā•šā–ˆā–ˆā•”ā•ā–ˆā–ˆā•‘ā–ˆā–ˆā•”ā•ā•ā• ", "ā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā•—ā–ˆā–ˆā•”ā• ā–ˆā–ˆā•—ā–ˆā–ˆā•‘ ā•šā•ā• ā–ˆā–ˆā•‘ā–ˆā–ˆā•‘ ", "ā•šā•ā•ā•ā•ā•ā•ā•ā•šā•ā• ā•šā•ā•ā•šā•ā• ā•šā•ā•ā•šā•ā• ", "", "Interactive LXMF Client" ] sep_width = min(width, 60) print("\n" + "─" * sep_width) if COLOR_ENABLED: for line in banner_lines: # Center each line centered = line.center(sep_width) print(f"{Fore.WHITE}{Style.BRIGHT}{centered}{Style.RESET_ALL}") else: for line in banner_lines: centered = line.center(sep_width) print(centered) print("─" * sep_width + "\n") def restart_client(self): """Restart the client""" print("\n" + "─" * 60) self._print_color("Restarting LXMF Client...", Fore.YELLOW + Style.BRIGHT) print("─" * 60 + "\n") # Shutdown current instance self.shutdown() # Restart the Python script import sys import os python = sys.executable os.execl(python, python, *sys.argv) def show_progress_spinner(self, message, duration=2): """Show a spinner for background operations""" spinner = itertools.cycle(['ā ‹', 'ā ™', 'ā ¹', 'ā ø', 'ā ¼', 'ā “', 'ā ¦', 'ā §', 'ā ‡', 'ā ']) end_time = time.time() + duration while time.time() < end_time: if COLOR_ENABLED: print(f"\r{Fore.CYAN}{next(spinner)}{Style.RESET_ALL} {message}...", end="", flush=True) else: print(f"\r{next(spinner)} {message}...", end="", flush=True) time.sleep(0.1) print("\r" + " " * (len(message) + 10), end="\r") def _handle_address_command(self, parts): """Handle address command""" print(f"\nDisplay Name: {self.display_name}") if hasattr(self.destination, 'hash'): print(f"LXMF Address: {RNS.prettyhexrep(self.destination.hash)}") # type: ignore print(f"Auto-announce: Every {self.announce_interval}s\n") def _handle_name_command(self, parts): """Handle name change command""" if len(parts) < 2: print("šŸ’” Usage: name ") else: self.display_name = ' '.join(parts[1:]) if hasattr(self.destination, 'display_name'): setattr(self.destination, 'display_name', self.display_name) # type: ignore self.save_config() if hasattr(self.destination, 'announce'): self.destination.announce() # type: ignore self._print_success(f"Display name: {self.display_name}") self._print_success("Announced to network") def _handle_interval_command(self, parts): """Handle announce interval command""" if len(parts) < 2: print(f"Current interval: {self.announce_interval}s") print("šŸ’” Usage: interval ") print("Minimum: 30 seconds") else: try: new_interval = int(parts[1]) if new_interval < 30: self._print_warning("Minimum interval is 30 seconds, setting to 30") new_interval = 30 self.announce_interval = new_interval self.save_config() self.stop_event.set() time.sleep(0.1) self.stop_event.clear() self._print_success(f"Announce interval changed to: {self.announce_interval}s") self._print_success("New interval will apply from next cycle") except ValueError: self._print_error("Invalid number") def _handle_announce_command(self, parts): """Handle manual announce command""" if hasattr(self.destination, 'announce'): self.destination.announce() # type: ignore self._print_success("Announced manually") def _handle_add_command(self, parts): """Handle add contact command""" if len(parts) < 3: print("šŸ’” Usage: add ") else: self.add_contact(parts[1], parts[2]) def _handle_edit_command(self, parts): """Handle edit contact command""" if len(parts) < 2: print("šŸ’” Usage: edit ") print("Example: edit Alice") print("Example: edit 3") else: self.edit_contact(parts[1]) def _handle_savecontact_command(self, parts): """Handle quick save contact command""" # If called without args, use last sender if len(parts) < 2: if self.last_sender_hash: self.save_contact_from_hash(self.last_sender_hash, self.last_sender_name) else: print("šŸ’” Usage: savecontact [hash]") print("Or receive a message first, then just type 'save'") else: # Save specific hash target_hash = parts[1] self.save_contact_from_hash(target_hash) def _handle_remove_command(self, parts): """Handle remove contact command""" if len(parts) < 2: print("šŸ’” Usage: remove ") else: target = parts[1] # Try to remove by index first if target.isdigit(): index = int(target) found = None for name, data in self.contacts.items(): if data.get('index') == index: found = name break if found: del self.contacts[found] self.save_contacts() self._print_success(f"Removed: {found} (#{index})") else: self._print_error(f"Contact #{index} not found") # Otherwise try to remove by name elif target in self.contacts: del self.contacts[target] self.save_contacts() self._print_success(f"Removed: {target}") else: self._print_error(f"Not found: {target}") def _handle_reply_command(self, parts): """Handle reply command""" if len(parts) < 2: print("šŸ’” Usage: reply ") if self.last_sender_hash: sender_display = self.format_contact_display(self.last_sender_hash, show_hash=False) print(f"Will reply to: {sender_display}") else: self._print_warning("No recent message to reply to") else: if self.last_sender_hash is None: self._print_error("No recent message to reply to") print(" Receive a message first, then use 'reply'") else: message_text = ' '.join(parts[1:]) self.send_message(self.last_sender_hash, message_text) def _handle_replyto_command(self, parts): """Handle replyto command""" if self.last_sender_hash: sender_display = self.format_contact_display(self.last_sender_hash, show_hash=True) print(f"\nCurrent reply target: {sender_display}\n") else: print("\nNo reply target set") print("Receive a message first\n") def _handle_send_command(self, parts): """Handle send message command""" if len(parts) < 3: print("šŸ’” Usage: send ") else: message_text = ' '.join(parts[2:]) self.send_message(parts[1], message_text) def _handle_messages_command(self, parts): """Handle messages command""" if len(parts) >= 2 and parts[1].lower() == 'user': # View conversation with specific user by index if len(parts) >= 3: try: user_idx = int(parts[2]) # Find the conversation by fixed index target_hash = None for hash_str, conv_idx in self.conversation_indices.items(): if conv_idx == user_idx: target_hash = hash_str break if target_hash: self.show_messages(limit=9999, filter_hash=target_hash) else: self._print_error(f"No conversation with index #{user_idx}. Use 'messages list' to see available conversations") except ValueError: self._print_error("User number must be a valid number") else: print("šŸ’” Usage: messages user <#>") print("Use 'messages list' to see numbered user list") elif len(parts) >= 2 and parts[1].lower() == 'list': # Show list of users with message counts self.show_message_list_with_users() else: # Show recent messages limit = 10 if len(parts) > 1: try: limit = int(parts[1]) except ValueError: self._print_warning("Invalid number, showing last 10 messages") self.show_messages(limit) def _handle_sendpeer_command(self, parts): """Handle sendpeer command""" if len(parts) < 3: print("šŸ’” Usage: sendpeer ") print("Use 'peers' to see the list first") else: message_text = ' '.join(parts[2:]) self.send_to_peer(parts[1], message_text) def _handle_addpeer_command(self, parts): """Handle addpeer command""" if len(parts) < 2: print("šŸ’” Usage: addpeer [custom_name]") print("Use 'peers' to see the list first") else: custom_name = ' '.join(parts[2:]) if len(parts) > 2 else None self.add_peer_to_contacts(parts[1], custom_name) def _handle_discoverannounce_command(self, parts): """Handle discoverannounce command""" if len(parts) < 2: status = "ON" if self.show_announces else "OFF" print(f"\nDiscovery announces: {status}") print("šŸ’” Usage: discoverannounce ") print(" Controls whether new peer discoveries are shown\n") else: setting = parts[1].lower() if setting in ['on', 'yes', 'true', '1']: self.show_announces = True self.save_config() self._print_success("Discovery announces enabled") elif setting in ['off', 'no', 'false', '0']: self.show_announces = False self.save_config() self._print_success("Discovery announces disabled") else: self._print_error("Use 'on' or 'off'") def _handle_blacklist_command(self, parts): """Handle blacklist command""" if len(parts) < 2: self.list_blacklist() else: subcmd = parts[1].lower() if subcmd == 'list': self.list_blacklist() elif subcmd == 'add' and len(parts) >= 3: target = ' '.join(parts[2:]) dest_hash = self.resolve_contact_or_hash(target) if dest_hash: if self.add_to_blacklist(dest_hash): contact_display = self.format_contact_display_short(dest_hash) self._print_success(f"Blacklisted: {contact_display}") else: self._print_error(f"Unknown contact or invalid hash: {target}") elif subcmd == 'remove' and len(parts) >= 3: target = ' '.join(parts[2:]) dest_hash = self.resolve_contact_or_hash(target) if dest_hash: if self.remove_from_blacklist(dest_hash): contact_display = self.format_contact_display_short(dest_hash) self._print_success(f"Unblocked: {contact_display}") else: self._print_error(f"Unknown contact or invalid hash: {target}") elif subcmd == 'clear': confirm = input("Clear entire blacklist? [y/N]: ").strip().lower() if confirm == 'y': count = len(self.blacklist) self.blacklist.clear() self.save_blacklist() self._print_success(f"Cleared {count} entries from blacklist") else: print("Cancelled") else: print("šŸ’” Usage:") print(" blacklist [list] - Show blacklist") print(" blacklist add <#/name> - Block contact/peer") print(" blacklist remove <#/name> - Unblock") print(" blacklist clear - Clear all") def _handle_block_command(self, parts): """Handle block command""" if len(parts) < 2: print("šŸ’” Usage: block ") else: target = ' '.join(parts[1:]) dest_hash = self.resolve_contact_or_hash(target) if dest_hash: if self.add_to_blacklist(dest_hash): contact_display = self.format_contact_display_short(dest_hash) self._print_success(f"Blocked: {contact_display}") else: self._print_error(f"Unknown contact: {target}") def _handle_unblock_command(self, parts): """Handle unblock command""" if len(parts) < 2: print("šŸ’” Usage: unblock ") else: target = ' '.join(parts[1:]) dest_hash = self.resolve_contact_or_hash(target) if dest_hash: if self.remove_from_blacklist(dest_hash): contact_display = self.format_contact_display_short(dest_hash) self._print_success(f"Unblocked: {contact_display}") else: self._print_error(f"Unknown contact: {target}") def _handle_plugin_command(self, parts): """Handle plugin command""" if len(parts) < 2: self.list_plugins() else: subcmd = parts[1].lower() if subcmd == 'list': self.list_plugins() elif subcmd == 'enable' and len(parts) >= 3: plugin_name = parts[2] self.plugins_enabled[plugin_name] = True self.save_plugins_config() self._print_success(f"Plugin {plugin_name} enabled") self._print_warning("Use 'plugin reload' to activate") elif subcmd == 'disable' and len(parts) >= 3: plugin_name = parts[2] self.plugins_enabled[plugin_name] = False self.save_plugins_config() self._print_success(f"Plugin {plugin_name} disabled") self._print_warning("Use 'plugin reload' to deactivate") elif subcmd == 'reload': self.plugins = {} self.load_plugins() self._print_success("Plugins reloaded") else: print("šŸ’” Usage: plugin [list|enable|disable|reload]") def _handle_debug_command(self, parts): """Handle debug command""" print(f"\n=== Debug Info ===") print(f"Suppressed file errors: {self.suppressed_errors}") print(f"Cache dirty: {self.cache_dirty}") print(f"Last cache save: {time.time() - self.last_cache_save:.1f}s ago") print(f"Announced peers: {len(self.announced_peers)}") print(f"Cached display names: {len(self.display_name_cache)}") print() def run(self): """Main command loop with proper async input handling""" self.running = True print(f"\n{Fore.CYAN}Welcome to LXMF Client!{Style.RESET_ALL}" if COLOR_ENABLED else "\nWelcome to LXMF Client!") print(f"{Fore.YELLOW}Type 'help' or 'h' to see available commands{Style.RESET_ALL}\n" if COLOR_ENABLED else "Type 'help' or 'h' to see available commands\n") # Create prompt session session = PromptSession() try: while self.running: try: # Build dynamic prompt with proper formatting with self.messages_lock: if self.messages and self.messages[-1]['direction'] == 'inbound': # Use HTML formatting for prompt_toolkit if COLOR_ENABLED: prompt_text = HTML(' > ') else: prompt_text = "ā— > " else: prompt_text = "> " # Use patch_stdout to allow background prints without corrupting input with patch_stdout(): cmd_line = session.prompt(prompt_text).strip() if not cmd_line: continue parts = cmd_line.split(maxsplit=2) cmd = self.resolve_command(parts[0].lower()) # Check for plugin commands first if self.handle_plugin_command(cmd, parts): continue # Command routing if cmd in ['quit', 'exit']: self.running = False print("Goodbye!") break elif cmd == 'help': self.show_help() elif cmd == 'status': self.show_status() elif cmd == 'settings': self.show_settings_menu() elif cmd == 'address': self._handle_address_command(parts) elif cmd == 'name': self._handle_name_command(parts) elif cmd == 'interval': self._handle_interval_command(parts) elif cmd == 'announce': self._handle_announce_command(parts) elif cmd == 'contacts': self.list_contacts() elif cmd == 'add': self._handle_add_command(parts) elif cmd == 'edit': self._handle_edit_command(parts) elif cmd == 'remove': self._handle_remove_command(parts) elif cmd == 'savecontact': self._handle_savecontact_command(parts) elif cmd == 'reply': self._handle_reply_command(parts) elif cmd == 'replyto': self._handle_replyto_command(parts) elif cmd == 'send': self._handle_send_command(parts) elif cmd == 'messages': self._handle_messages_command(parts) elif cmd == 'stats': self.show_stats() elif cmd == 'peers': self.list_peers() elif cmd == 'sendpeer': self._handle_sendpeer_command(parts) elif cmd == 'addpeer': self._handle_addpeer_command(parts) elif cmd == 'discoverannounce': self._handle_discoverannounce_command(parts) elif cmd == 'blacklist': self._handle_blacklist_command(parts) elif cmd == 'block': self._handle_block_command(parts) elif cmd == 'unblock': self._handle_unblock_command(parts) elif cmd == 'clear': self.clear_screen() elif cmd == 'restart': self.restart_client() break elif cmd == 'plugin': self._handle_plugin_command(parts) elif cmd == 'debug': self._handle_debug_command(parts) else: print(f"Unknown command: {cmd}") print("Type 'help' or 'h' for commands") except EOFError: break except KeyboardInterrupt: print("\nType 'quit' or 'q' to exit") continue except Exception as e: self._print_error(f"Error: {e}") finally: self.shutdown() def main(): # Get responsive width try: width = shutil.get_terminal_size().columns except: width = 60 sep_width = min(width, 60) banner_lines = [ "", " ā–ˆā–ˆā•— ā–ˆā–ˆā•— ā–ˆā–ˆā•—ā–ˆā–ˆā–ˆā•— ā–ˆā–ˆā–ˆā•—ā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā•—", " ā–ˆā–ˆā•‘ ā•šā–ˆā–ˆā•—ā–ˆā–ˆā•”ā•ā–ˆā–ˆā–ˆā–ˆā•— ā–ˆā–ˆā–ˆā–ˆā•‘ā–ˆā–ˆā•”ā•ā•ā•ā•ā•", " ā–ˆā–ˆā•‘ ā•šā–ˆā–ˆā–ˆā•”ā• ā–ˆā–ˆā•”ā–ˆā–ˆā–ˆā–ˆā•”ā–ˆā–ˆā•‘ā–ˆā–ˆā–ˆā–ˆā–ˆā•— ", " ā–ˆā–ˆā•‘ ā–ˆā–ˆā•”ā–ˆā–ˆā•— ā–ˆā–ˆā•‘ā•šā–ˆā–ˆā•”ā•ā–ˆā–ˆā•‘ā–ˆā–ˆā•”ā•ā•ā• ", " ā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā•—ā–ˆā–ˆā•”ā• ā–ˆā–ˆā•—ā–ˆā–ˆā•‘ ā•šā•ā• ā–ˆā–ˆā•‘ā–ˆā–ˆā•‘ ", " ā•šā•ā•ā•ā•ā•ā•ā•ā•šā•ā• ā•šā•ā•ā•šā•ā• ā•šā•ā•ā•šā•ā• ", "", "Interactive LXMF Client" ] print("\n" + "─"*sep_width) if COLOR_ENABLED: for line in banner_lines: centered = line.center(sep_width) print(f"{Fore.WHITE}{Style.BRIGHT}{centered}{Style.RESET_ALL}") else: for line in banner_lines: centered = line.center(sep_width) print(centered) print("─"*sep_width + "\n") client = LXMFClient() client.run() if __name__ == "__main__": main()