Files
F 3a0f5d6ea9 Fix 32 hex chars hash error
Fix 32 hex chars hash error bug in edit contact and clean target functions
2025-12-15 19:14:57 +01:00

3366 lines
142 KiB
Python

#!/usr/bin/env python3
"""
Terminal-Based Interactive LXMF Messaging Client
"""
import RNS
import LXMF
import time
import os
import json
import threading
from datetime import datetime
import shutil
import traceback
import itertools
import subprocess
import sys
import platform
from prompt_toolkit import PromptSession
from prompt_toolkit.patch_stdout import patch_stdout
from prompt_toolkit.formatted_text import HTML
# Optional colour support. When colorama can be imported it is initialised
# and COLOR_ENABLED is True; otherwise stand-in Fore/Style classes whose
# attributes are empty strings are defined so colour lookups still work.
try:
    from colorama import init, Fore, Style, just_fix_windows_console  # type: ignore
    import platform
except ImportError:
    class Fore:
        # Colour placeholders: concatenating them into output adds nothing.
        RED = ""
        GREEN = ""
        YELLOW = ""
        CYAN = ""
        MAGENTA = ""
        BLUE = ""
        WHITE = ""

    class Style:
        # Style placeholders, same idea as Fore above.
        BRIGHT = ""
        RESET_ALL = ""

    COLOR_ENABLED = False
else:
    # Windows gets the console fix-up; every other platform uses init()
    # with autoreset enabled.
    if platform.system() != 'Windows':
        init(autoreset=True)
    else:
        just_fix_windows_console()
    COLOR_ENABLED = True
class LXMFClient:
def __init__(self, identity_path="./lxmf_client_identity", storage_path="./lxmf_client_storage"):
    """Initialise client state, then bring up Reticulum, LXMF and the worker threads.

    The order matters: in-memory defaults are set first, the configuration
    is loaded (load_config may prompt on stdin when no config file exists),
    and only then are Reticulum, the identity and the LXMF router created.
    Two daemon threads (announce loop and router job loop) are started at
    the end.

    Args:
        identity_path: File the RNS identity is loaded from; when it does
            not exist a new identity is created and written there.
        storage_path: Directory that holds the JSON state files, the
            "messages" folder, the "plugins" folder and the LXMF router
            storage. Created if missing.
    """
    self.identity_path = identity_path
    self.storage_path = storage_path
    self.messages_path = os.path.join(storage_path, "messages")
    self.contacts = {}
    self.contacts_file = os.path.join(storage_path, "contacts.json")
    self.config_file = os.path.join(storage_path, "config.json")
    self.messages = []
    # Guards self.messages (see load_messages / on_message_received)
    self.messages_lock = threading.Lock()
    self.running = False
    self.last_sender_hash = None
    self.last_sender_name = None
    self.display_name = None
    self.announce_interval = 300
    self.auto_announce_enabled = True
    # Set to ask announce_loop / router_job_loop to exit
    self.stop_event = threading.Event()
    self.show_announces = True
    self.start_time = time.time()
    # Notification settings
    self.notify_sound = True # Platform-specific sounds (beeps/melody)
    self.notify_bell = True # Terminal bell
    self.notify_visual = True # Visual flash effect
    # Track pending messages
    self.pending_messages = {}
    # Cache for display names from announces
    self.display_name_cache = {}
    self.cache_file = os.path.join(storage_path, "display_names.json")
    # Dirty flag + timestamp used by save_display_name_cache to throttle writes
    self.cache_dirty = False
    self.last_cache_save = time.time()
    # Debug: Track suppressed errors
    self.suppressed_errors = 0
    # Track announced LXMF peers with fixed index numbers
    self.announced_peers = {}
    self.peers_lock = threading.Lock()
    self.next_peer_index = 1
    # Track contacts with fixed index numbers
    self.next_contact_index = 1
    # Track conversations with fixed index numbers
    self.conversation_indices = {}
    self.next_conversation_index = 1
    self.conversations_file = os.path.join(storage_path, "conversations.json")
    # Blacklist system
    self.blacklist = set() # Set of blocked destination hashes
    self.blacklist_file = os.path.join(storage_path, "blacklist.json")
    # Plugin system
    self.plugins = {}
    self.plugins_dir = os.path.join(storage_path, "plugins")
    self.plugins_enabled = {}
    self.plugins_config_file = os.path.join(storage_path, "plugins_config.json")
    # Stamp cost settings
    self.stamp_cost = 0 # 0 = disabled
    self.stamp_cost_enabled = False
    self.ignore_invalid_stamps = True # Reject messages with invalid stamps
    # Command aliases (short form -> full command name)
    self.command_aliases = {
        'h': 'help',
        's': 'send',
        're': 'reply',
        'm': 'messages',
        'c': 'contacts',
        'a': 'add',
        'e': 'edit',
        'rm': 'remove',
        'p': 'peers',
        'sp': 'sendpeer',
        'ap': 'addpeer',
        'st': 'stats',
        'addr': 'address',
        'n': 'name',
        'i': 'interval',
        'cls': 'clear',
        'r': 'restart',
        'q': 'quit',
        'set': 'settings',
        'bl': 'blacklist',
        'ann': 'announce',
        'save': 'savecontact',
    }
    # Expose the colour helpers on the instance
    # (presumably so plugins can reach them through the client - TODO confirm)
    self.Fore = Fore
    self.Style = Style
    os.makedirs(storage_path, exist_ok=True)
    os.makedirs(self.messages_path, exist_ok=True)
    # === LOAD CONFIGURATION FIRST (before Reticulum) ===
    # display_name must be known before the delivery identity is registered below
    self.load_config()
    # === NOW INITIALIZE RETICULUM ===
    self._print_color("🌐 Initializing Reticulum...", Fore.CYAN)
    self.reticulum = RNS.Reticulum()
    # Load or create identity
    if os.path.exists(identity_path):
        self.identity = RNS.Identity.from_file(identity_path)
        self._print_success("Loaded identity")
    else:
        self.identity = RNS.Identity()
        self.identity.to_file(identity_path)
        self._print_success("Created new identity")
    # Load display name cache
    self.load_display_name_cache()
    # Create LXMF router
    lxmf_storage = os.path.join(storage_path, "lxmf_router")
    os.makedirs(lxmf_storage, exist_ok=True)
    self.router = LXMF.LXMRouter(
        identity=self.identity,
        storagepath=lxmf_storage
    )
    self._print_success(f"LXMF storage: {lxmf_storage}")
    # Register destination with display name
    self.destination = self.router.register_delivery_identity(
        self.identity,
        display_name=self.display_name
    )
    # Configure stamp cost on the destination
    if self.stamp_cost_enabled and self.stamp_cost > 0:
        try:
            # Set the stamp cost directly on the destination
            # (only when the object already exposes a stamp_cost attribute)
            if hasattr(self.destination, 'stamp_cost'):
                setattr(self.destination, 'stamp_cost', self.stamp_cost) # type: ignore
            self._print_success(f"Stamp cost configured: {self.stamp_cost} bits")
            # Force an announce so the stamp cost is advertised
            if hasattr(self.destination, 'announce'):
                self.destination.announce() # type: ignore
                self._print_success("📡 Announced with stamp cost")
        except Exception as e:
            self._print_warning(f"Could not set stamp cost: {e}")
    # Register callbacks
    self.router.register_delivery_callback(self.on_message_received)
    # Register announce handler to capture display names
    self.register_announce_handler()
    # Load contacts and messages
    self.load_contacts()
    self.load_messages()
    self.load_conversation_indices()
    self.load_blacklist()
    # Load plugins
    self.load_plugins()
    # Setup thread exception handler
    # NOTE: threading.excepthook is module-wide, so this affects every thread
    # in the process, not only the ones started here.
    threading.excepthook = self.thread_exception_handler
    # Show info
    import shutil
    try:
        width = shutil.get_terminal_size().columns
    except:
        width = 60
    sep_width = min(width, 60)
    # NOTE(review): ''*sep_width is an empty string, so these separator
    # prints emit blank lines - confirm whether a rule character was intended.
    print(f"\n{''*sep_width}")
    self._print_color(f"Display Name: {self.display_name}", Fore.GREEN + Style.BRIGHT)
    if hasattr(self.destination, 'hash'):
        self._print_color(f"LXMF Address: {RNS.prettyhexrep(self.destination.hash)}", Fore.CYAN) # type: ignore
    self._print_color(f"Auto-announce: Every {self.announce_interval} seconds", Fore.YELLOW)
    # Show stamp cost status
    if self.stamp_cost_enabled and self.stamp_cost > 0:
        self._print_color(f"Stamp Cost: ENABLED ({self.stamp_cost} bits)", Fore.RED + Style.BRIGHT)
    else:
        self._print_color(f"Stamp Cost: DISABLED", Fore.WHITE)
    print(f"{''*sep_width}\n")
    # Initial announce (this will now include stamp cost)
    # NOTE: when stamp cost is enabled this is the second announce of this
    # constructor (the first one is in the stamp-cost block above).
    self._print_color("📡 Announcing to network...", Fore.CYAN)
    if hasattr(self.destination, 'announce'):
        self.destination.announce() # type: ignore
    self._print_success("Initial announce complete")
    # Start background threads (daemon=True: they do not keep the process alive)
    self.announce_thread = threading.Thread(target=self.announce_loop, daemon=True)
    self.announce_thread.start()
    self.router_thread = threading.Thread(target=self.router_job_loop, daemon=True)
    self.router_thread.start()
def resolve_contact_or_hash(self, target):
    """Resolve a hash, index number, contact name or peer display name.

    Resolution order:
      1. A literal destination hash (32 hex characters once colons, spaces
         and angle brackets are removed).
      2. An integer index, looked up in contacts, then conversation
         indices, then announced peers.
      3. A contact name (case-insensitive).
      4. An announced peer's display name (case-insensitive).

    Args:
        target: User-supplied string identifying the destination.

    Returns:
        The destination hash as lowercase hex without colons, spaces or
        angle brackets, or None when nothing matches.
    """
    def _clean(value):
        # One normaliser for every branch, so an index or name lookup can
        # never hand back a hash that still carries "<...>" or colons.
        return (value.replace(":", "").replace(" ", "")
                .replace("<", "").replace(">", "").lower())

    if not target:
        return None

    # 1. Direct hash (16 bytes = 32 hex chars)
    clean_target = _clean(target)
    if len(clean_target) == 32:
        try:
            bytes.fromhex(clean_target)
        except ValueError:
            # Right length but not hex: fall through to the other lookups
            pass
        else:
            return clean_target

    # 2. Index number
    try:
        wanted_index = int(target)
    except ValueError:
        wanted_index = None

    if wanted_index is not None:
        for data in self.contacts.values():
            # Entries without a stored hash cannot be resolved; skip them
            if data.get('index') == wanted_index and data.get('hash'):
                return _clean(data['hash'])
        for hash_str, conv_idx in self.conversation_indices.items():
            if conv_idx == wanted_index:
                return _clean(hash_str)
        with self.peers_lock:
            for hash_str, peer_data in self.announced_peers.items():
                if peer_data.get('index') == wanted_index:
                    return _clean(hash_str)
        # A number that matches no index is not retried as a name
        return None

    # 3. Contact name
    target_lower = target.lower()
    for name, data in self.contacts.items():
        if name.lower() == target_lower and data.get('hash'):
            return _clean(data['hash'])

    # 4. Peer display name (may be stored as None, hence the "or ''")
    with self.peers_lock:
        for hash_str, peer_data in self.announced_peers.items():
            if (peer_data.get('display_name') or '').lower() == target_lower:
                return _clean(hash_str)
    return None
def load_plugins(self):
    """Load every enabled plugin found in ``self.plugins_dir``.

    A plugin is a ``*.py`` file whose name does not start with an
    underscore and which defines a ``Plugin`` class; the class is
    instantiated with this client and stored in ``self.plugins`` under
    the file's base name. Plugins are enabled unless
    ``self.plugins_enabled`` maps their name to a false value.

    Failures are reported through ``_print_warning`` and never abort the
    remaining plugins. A plugin that fails while executing is removed from
    ``sys.modules`` again (or the module it displaced is put back), so a
    broken file cannot leave a half-initialised module behind or shadow a
    module that was already imported under the same name.
    """
    import importlib.util
    import sys

    if not os.path.exists(self.plugins_dir):
        return

    # Load plugin configuration
    if os.path.exists(self.plugins_config_file):
        try:
            with open(self.plugins_config_file, 'r', encoding='utf-8') as f:
                config = json.load(f)
            self.plugins_enabled = config.get('enabled', {})
        except Exception as e:
            self._print_warning(f"Error loading plugin config: {e}")

    # Scan plugins directory; sorted so load order does not depend on
    # the order in which the filesystem lists entries
    for filename in sorted(os.listdir(self.plugins_dir)):
        if not filename.endswith('.py') or filename.startswith('_'):
            continue
        plugin_name = filename[:-3]

        # Check if plugin is enabled (default to enabled)
        if not self.plugins_enabled.get(plugin_name, True):
            continue

        registered = False
        displaced = None
        try:
            plugin_path = os.path.join(self.plugins_dir, filename)
            spec = importlib.util.spec_from_file_location(plugin_name, plugin_path)
            if not (spec and spec.loader):
                continue
            module = importlib.util.module_from_spec(spec)

            # Register before executing so the module can be found by name
            # while it runs, but remember what was there to undo on failure
            displaced = sys.modules.get(plugin_name)
            sys.modules[plugin_name] = module
            registered = True
            spec.loader.exec_module(module)

            if hasattr(module, 'Plugin'):
                self.plugins[plugin_name] = module.Plugin(self)
                self._print_success(f"Loaded plugin: {plugin_name}")
            else:
                self._print_warning(f"Plugin {plugin_name} has no Plugin class")
        except Exception as e:
            if registered:
                if displaced is not None:
                    sys.modules[plugin_name] = displaced
                else:
                    sys.modules.pop(plugin_name, None)
            self._print_warning(f"Failed to load plugin {plugin_name}: {e}")
def save_plugins_config(self):
    """Persist the plugin enabled/disabled map as JSON.

    Writes ``{"enabled": self.plugins_enabled}`` to
    ``self.plugins_config_file``. Any failure is reported as a warning
    instead of being raised.
    """
    try:
        payload = dict(enabled=self.plugins_enabled)
        with open(self.plugins_config_file, 'w', encoding='utf-8') as handle:
            json.dump(payload, handle, indent=2)
    except Exception as err:
        self._print_warning(f"Error saving plugin config: {err}")
def handle_plugin_command(self, cmd, parts):
    """Dispatch ``cmd`` to the first plugin that lists it in ``commands``.

    Args:
        cmd: Command name typed by the user.
        parts: Argument list handed through to the plugin unchanged.

    Returns:
        True when a plugin claimed the command (even if its handler raised;
        the error is printed), False when no plugin declares it.
    """
    for owner, candidate in self.plugins.items():
        if not hasattr(candidate, 'commands') or cmd not in candidate.commands:
            continue
        try:
            candidate.handle_command(cmd, parts)
        except Exception as err:
            self._print_error(f"Plugin {owner} error: {err}")
        # The command is considered consumed whether or not the handler failed
        return True
    return False
def handle_plugin_message(self, message, msg_data):
    """Offer an incoming message to each plugin in turn.

    Args:
        message: The received message object, passed through as-is.
        msg_data: The dict built for that message, passed through as-is.

    Returns:
        True as soon as a plugin's ``on_message`` returns a truthy value,
        otherwise False. A plugin whose handler raises is reported with a
        warning and the remaining plugins are still consulted.
    """
    for owner, candidate in self.plugins.items():
        try:
            claimed = hasattr(candidate, 'on_message') and candidate.on_message(message, msg_data)
        except Exception as err:
            self._print_warning(f"Plugin {owner} message handler error: {err}")
            continue
        if claimed:
            return True
    return False
def list_plugins(self):
    """Print a table of the plugin files found in ``self.plugins_dir``.

    Every ``*.py`` file not starting with an underscore is listed with its
    status (loaded, enabled but not loaded, or disabled) and the
    ``description`` attribute of its loaded instance, followed by a short
    command reference. Prints a hint and returns early when no plugin file
    exists.
    """
    import shutil
    import os

    try:
        width = min(shutil.get_terminal_size().columns, 80)
    except:
        width = 80
    rule = '' * width

    print(f"\n{rule}")
    self._print_color("PLUGINS", Fore.CYAN + Style.BRIGHT)
    print(f"{rule}")

    # Collect candidate plugin names from the directory listing
    found = []
    if os.path.exists(self.plugins_dir):
        found = [
            entry[:-3]
            for entry in os.listdir(self.plugins_dir)
            if entry.endswith('.py') and not entry.startswith('_')
        ]

    if not found:
        print("\nNo plugins found")
        print(f"Place plugin files in: {self.plugins_dir}\n")
        return

    print(f"\n{'Plugin':<20} {'Status':<15} {'Description'}")
    print(f"{''*20} {''*15} {''*30}")

    for plugin_name in sorted(found):
        instance = self.plugins.get(plugin_name)
        is_loaded = plugin_name in self.plugins
        is_enabled = self.plugins_enabled.get(plugin_name, True)

        if not is_enabled:
            status = f"{Fore.RED}Disabled{Style.RESET_ALL}"
        elif is_loaded:
            status = f"{Fore.GREEN}Loaded{Style.RESET_ALL}"
        else:
            # Enabled in the config but not instantiated yet
            status = f"{Fore.YELLOW}Enabled (reload){Style.RESET_ALL}"

        description = getattr(instance, 'description', 'No description') if instance else "Not loaded"
        # Keep the description column to at most 30 characters
        if len(description) > 30:
            description = description[:27] + "..."

        # The status column is padded to 25 (the header uses 15); the
        # status string also contains the colour codes.
        print(f"{plugin_name:<20} {status:<25} {description}")

    print(f"{rule}")
    self._print_color("\n💡 Commands:", Fore.YELLOW)
    print(" plugin enable <name> - Enable a plugin")
    print(" plugin disable <name> - Disable a plugin")
    print(" plugin reload - Reload all plugins")
    print()
def load_blacklist(self):
    """Load the blocked destination hashes from ``self.blacklist_file``.

    Entries are normalised exactly like ``is_blacklisted`` normalises its
    argument (colons, spaces and angle brackets removed, lowercased), so a
    hash stored as ``<aa:bb...>`` still matches later lookups. Entries that
    are not strings, or that are empty after normalisation, are skipped
    rather than aborting the whole load.

    Does nothing when the file does not exist. Read or parse errors are
    reported as a warning and leave the current blacklist untouched.
    """
    if not os.path.exists(self.blacklist_file):
        return
    try:
        with open(self.blacklist_file, 'r', encoding='utf-8') as f:
            blacklist_data = json.load(f)

        normalized = set()
        for entry in blacklist_data:
            if not isinstance(entry, str):
                continue
            cleaned = (entry.replace(":", "").replace(" ", "")
                       .replace("<", "").replace(">", "").lower())
            if cleaned:
                normalized.add(cleaned)
        self.blacklist = normalized

        if self.blacklist:
            self._print_success(f"Loaded {len(self.blacklist)} blocked addresses")
    except Exception as e:
        self._print_warning(f"Error loading blacklist: {e}")
def save_blacklist(self):
    """Write the blacklist to ``self.blacklist_file`` as a sorted JSON list.

    Failures are reported as a warning instead of being raised.
    """
    try:
        # A set is not JSON-serialisable; sorting also keeps the file stable
        ordered = sorted(self.blacklist)
        with open(self.blacklist_file, 'w', encoding='utf-8') as handle:
            json.dump(ordered, handle, indent=2)
    except Exception as err:
        self._print_warning(f"Error saving blacklist: {err}")
def is_blacklisted(self, destination_hash):
    """Return True when ``destination_hash`` is in the blacklist.

    The hash is compared after removing colons, spaces and angle brackets
    and lowercasing. A falsy argument is never blacklisted.
    """
    if destination_hash:
        key = destination_hash
        for noise in (":", " ", "<", ">"):
            key = key.replace(noise, "")
        return key.lower() in self.blacklist
    return False
def add_to_blacklist(self, destination_hash):
    """Block a destination hash and persist the blacklist.

    Args:
        destination_hash: Hash in any of the accepted spellings (colons,
            spaces and angle brackets are ignored, case is folded).

    Returns:
        True when the hash was added and saved; False when the argument is
        falsy or the hash is already blocked (a message is printed).
    """
    if not destination_hash:
        self._print_error("Invalid destination hash")
        return False

    key = destination_hash
    for noise in (":", " ", "<", ">"):
        key = key.replace(noise, "")
    key = key.lower()

    already_blocked = key in self.blacklist
    if already_blocked:
        self._print_warning("Already blacklisted")
        return False

    self.blacklist.add(key)
    self.save_blacklist()
    return True
def remove_from_blacklist(self, destination_hash):
    """Unblock a destination hash and persist the blacklist.

    Args:
        destination_hash: Hash in any of the accepted spellings (colons,
            spaces and angle brackets are ignored, case is folded).

    Returns:
        True when the hash was removed and saved; False when the argument
        is falsy or the hash was not blocked (a message is printed).
    """
    if not destination_hash:
        self._print_error("Invalid destination hash")
        return False

    key = destination_hash
    for noise in (":", " ", "<", ">"):
        key = key.replace(noise, "")
    key = key.lower()

    if key in self.blacklist:
        self.blacklist.remove(key)
        self.save_blacklist()
        return True

    self._print_warning("Not in blacklist")
    return False
def list_blacklist(self):
    """Print a numbered table of all blacklisted hashes with known names.

    For each hash the name column shows, in order of preference:
    ``contact (display name)``, the contact name alone, the LXMF display
    name alone, or ``<unknown>``.

    ``get_contact_name_by_hash`` returns the hash itself when no contact
    matches, so that value is treated as "no contact" here; otherwise the
    display-name-only and ``<unknown>`` cases could never be reached and
    the raw hash would be shown as if it were a name.
    """
    import shutil
    try:
        width = min(shutil.get_terminal_size().columns, 80)
    except Exception:
        width = 80

    if not self.blacklist:
        print("\nNo blacklisted addresses\n")
        return

    print(f"\n{''*width}")
    self._print_color("BLACKLIST", Fore.RED + Style.BRIGHT)
    print(f"{''*width}")

    print(f"\n{'#':<5} {'Hash':<32} {'Display Name'}")
    print(f"{''*5} {''*32} {''*30}")

    # Sorted so the numbering is stable between calls
    for idx, hash_str in enumerate(sorted(self.blacklist), 1):
        display_name = self.get_lxmf_display_name(hash_str)
        contact_name = self.get_contact_name_by_hash(hash_str)
        if contact_name == hash_str:
            # Lookup fell back to the hash: there is no saved contact
            contact_name = None

        if contact_name:
            name_display = f"{contact_name} ({display_name})" if display_name else contact_name
        elif display_name:
            name_display = display_name
        else:
            name_display = "<unknown>"

        # Truncate if too long
        if len(name_display) > 30:
            name_display = name_display[:27] + "..."
        print(f"{idx:<5} {hash_str[:32]:<32} {name_display}")

    print(f"{''*width}")
    self._print_color(f"\n💡 Total blocked: {len(self.blacklist)}", Fore.YELLOW)
    print()
def get_terminal_width(self, default=70, max_width=90):
    """Return a display width derived from the terminal's column count.

    Args:
        default: Value returned when the width cannot be determined.
        max_width: Upper bound applied to terminals of 60+ columns.

    Returns:
        For terminals narrower than 60 columns: the column count minus a
        2-column margin, capped at 50. Otherwise the column count capped
        at ``max_width``. ``default`` on any error.
    """
    try:
        import shutil
        columns = shutil.get_terminal_size().columns
        is_narrow = columns < 60
        return min(columns - 2, 50) if is_narrow else min(columns, max_width)
    except:
        return default
def load_conversation_indices(self):
    """Restore the conversation index table from ``self.conversations_file``.

    Reads the ``indices`` mapping and the ``next_index`` counter, falling
    back to ``{}`` and ``1`` for missing keys. Does nothing when the file
    is absent; read or parse errors are reported as a warning.
    """
    if not os.path.exists(self.conversations_file):
        return
    try:
        with open(self.conversations_file, 'r', encoding='utf-8') as handle:
            stored = json.load(handle)
        self.conversation_indices = stored.get('indices', {})
        self.next_conversation_index = stored.get('next_index', 1)
        loaded_count = len(self.conversation_indices)
        if loaded_count:
            self._print_success(f"Loaded {loaded_count} conversation indices")
    except Exception as err:
        self._print_warning(f"Error loading conversation indices: {err}")
def save_conversation_indices(self):
    """Persist the conversation index table and counter as JSON.

    Writes ``indices`` and ``next_index`` to ``self.conversations_file``.
    Failures are reported as a warning instead of being raised.
    """
    try:
        snapshot = dict(
            indices=self.conversation_indices,
            next_index=self.next_conversation_index,
        )
        with open(self.conversations_file, 'w', encoding='utf-8') as handle:
            json.dump(snapshot, handle, indent=2)
    except Exception as err:
        self._print_warning(f"Error saving conversation indices: {err}")
def assign_conversation_index(self, hash_str):
    """Return the fixed index of a conversation, allocating one if needed.

    The hash is normalised (colons, spaces and angle brackets removed,
    lowercased) before lookup. A newly allocated index takes the current
    ``next_conversation_index``, the counter is advanced and the table is
    saved immediately.
    """
    key = hash_str
    for noise in (":", " ", "<", ">"):
        key = key.replace(noise, "")
    key = key.lower()

    if key in self.conversation_indices:
        return self.conversation_indices[key]

    self.conversation_indices[key] = self.next_conversation_index
    self.next_conversation_index += 1
    self.save_conversation_indices()
    return self.conversation_indices[key]
def register_announce_handler(self):
    """Install an announce handler that records LXMF peers and their names.

    The handler filters on the ``lxmf.delivery`` aspect. For every announce
    that carries a string display name it adds or refreshes the peer in
    ``announced_peers`` (new peers get the next fixed index), caches the
    display name and, when ``show_announces`` is on and the peer is new,
    prints a short discovery notice with quick-command hints.
    """

    class LXMFPeerAnnounceHandler:
        def __init__(self, client):
            self.client = client
            self.aspect_filter = "lxmf.delivery"

        def received_announce(self, destination_hash, announced_identity, app_data):
            """Called when an LXMF delivery announce is received"""
            try:
                if not app_data:
                    return
                display_name = LXMF.display_name_from_app_data(app_data)
                if not display_name or not isinstance(display_name, str):
                    return

                client = self.client
                hash_str = RNS.prettyhexrep(destination_hash)
                clean_hash = hash_str.replace(":", "").replace(" ", "").lower()

                with client.peers_lock:
                    is_new_peer = clean_hash not in client.announced_peers
                    if is_new_peer:
                        peer_index = client.next_peer_index
                        client.next_peer_index += 1
                        client.announced_peers[clean_hash] = {
                            'display_name': display_name,
                            'last_seen': time.time(),
                            'index': peer_index
                        }
                    else:
                        # Known peer: refresh name and timestamp, keep index
                        known = client.announced_peers[clean_hash]
                        known['display_name'] = display_name
                        known['last_seen'] = time.time()

                client.cache_display_name(hash_str, display_name)

                # Only first-time peers are shown, and only if enabled
                if not (is_new_peer and client.show_announces):
                    return

                # A lookup that returns something other than the hash
                # means the peer is already a saved contact
                is_contact = client.get_contact_name_by_hash(clean_hash) != clean_hash

                # Plain text only - no ANSI codes in background threads
                print(f"\n📡 New Announce: {display_name}")
                print(f"🔗 {hash_str}")
                peer_idx = client.announced_peers[clean_hash]['index']
                if is_contact:
                    print(f"💡 Quick Send: 'sp {peer_idx} <msg>'")
                else:
                    print(f"💡 Quick save: 'ap {peer_idx}' | Send: 'sp {peer_idx} <msg>'")
            except Exception:
                # Best effort: a bad announce must not break the caller
                pass

    self.peer_announce_handler = LXMFPeerAnnounceHandler(self)
    RNS.Transport.register_announce_handler(self.peer_announce_handler)
    self._print_success("LXMF peer announce handler registered")
def _print_color(self, text, color=""):
    """Print ``text``, wrapped in ``color`` and a style reset when colour is enabled.

    Args:
        text: Value to print.
        color: Colour/style prefix; ignored when COLOR_ENABLED is false.
    """
    if not COLOR_ENABLED:
        print(text)
        return
    print(f"{color}{text}{Style.RESET_ALL}")
def _print_success(self, text):
    """Print ``text`` as a success message (green when colour is enabled)."""
    message = f"{text}"
    self._print_color(message, Fore.GREEN)
def _print_error(self, text):
    """Print ``text`` as an error message (red when colour is enabled)."""
    message = f"{text}"
    self._print_color(message, Fore.RED)
def _print_warning(self, text):
    """Print ``text`` as a warning message (yellow when colour is enabled)."""
    message = f"{text}"
    self._print_color(message, Fore.YELLOW)
def thread_exception_handler(self, args):
    """Handle an uncaught exception raised in a thread.

    Args:
        args: Object exposing ``exc_type`` and ``exc_value`` (the shape
            passed to ``threading.excepthook``).

    OS-level errors are only counted in ``suppressed_errors``; anything
    else is printed as a warning followed by a fresh "> " prompt.
    """
    error = args.exc_value
    # PermissionError and FileNotFoundError are OSError subclasses, so one
    # check covers all three file-related types.
    if isinstance(error, OSError):
        self.suppressed_errors += 1
        return
    self._print_warning(f"Thread error: {args.exc_type.__name__}: {error}")
    print("> ", end="", flush=True)
def load_display_name_cache(self):
    """Load the hash -> display name cache from ``self.cache_file``.

    The cache is reset to an empty dict when the file is missing or cannot
    be read; a read failure is additionally reported as a warning.
    """
    if not os.path.exists(self.cache_file):
        self.display_name_cache = {}
        return
    try:
        with open(self.cache_file, 'r', encoding='utf-8') as handle:
            self.display_name_cache = json.load(handle)
        if self.display_name_cache:
            self._print_success(f"Loaded {len(self.display_name_cache)} cached display names")
    except Exception as err:
        self._print_warning(f"Error loading display name cache: {err}")
        self.display_name_cache = {}
def cache_display_name(self, hash_str, display_name):
    """Remember ``display_name`` for ``hash_str`` and mark the cache dirty.

    Ignored unless ``display_name`` is a string with non-whitespace
    content. The key is the hash without colons, spaces or angle brackets,
    lowercased; the stored name is stripped of surrounding whitespace.
    """
    usable = bool(display_name) and isinstance(display_name, str) and bool(display_name.strip())
    if not usable:
        return
    key = hash_str
    for noise in (":", " ", "<", ">"):
        key = key.replace(noise, "")
    self.display_name_cache[key.lower()] = display_name.strip()
    self.cache_dirty = True
def save_display_name_cache(self):
    """Write the display name cache to disk, throttled.

    Nothing is written unless the cache is dirty and at least 5 seconds
    have passed since the last successful save. On success the dirty flag
    is cleared and the save time recorded; on failure both are left as
    they were so a later call retries.
    """
    now = time.time()
    if not self.cache_dirty or now - self.last_cache_save < 5:
        return
    try:
        with open(self.cache_file, 'w', encoding='utf-8') as handle:
            json.dump(self.display_name_cache, handle, indent=2, ensure_ascii=False)
    except Exception:
        # Will retry on next call
        return
    self.cache_dirty = False
    self.last_cache_save = now
def router_job_loop(self):
    """Drive the LXMF router and flush the display name cache until stopped.

    Each pass runs ``router.jobs()`` and, unless the router reports
    ``processing_outbound``, ``router.process_outbound()`` (both only when
    the router exposes them), and calls ``save_display_name_cache`` roughly
    every 10 seconds. Errors in a pass are swallowed so the thread
    survives.

    The 0.1 s pause between passes happens outside the ``try`` block, so a
    router call that keeps raising no longer skips the pause and spins the
    CPU. The pause waits on ``stop_event``, so a stop request ends the
    loop without sleeping out the remainder of the interval.
    """
    last_periodic_save = time.time()
    while not self.stop_event.is_set():
        try:
            if hasattr(self.router, 'jobs'):
                self.router.jobs()
            if hasattr(self.router, 'process_outbound'):
                if not getattr(self.router, 'processing_outbound', False):
                    self.router.process_outbound()

            # Periodic saves (every 10 seconds)
            current_time = time.time()
            if current_time - last_periodic_save > 10:
                self.save_display_name_cache()
                last_periodic_save = current_time
        except Exception:
            # Deliberately best-effort: one failing pass must not kill
            # the worker thread.
            pass

        # Pace every pass, including failed ones; returns True as soon as
        # stop_event is set.
        if self.stop_event.wait(0.1):
            break
def announce_loop(self):
    """Announce the destination every ``announce_interval`` seconds until stopped.

    The wait between announces is done on ``stop_event`` so a stop request
    ends the loop immediately. A cycle is skipped when auto-announce is
    disabled or the destination has no ``announce`` method. Each outcome
    is printed as a bracketed status line.
    """
    while not self.stop_event.is_set():
        # wait() returns True when the stop event fired during the interval
        if self.stop_event.wait(self.announce_interval):
            break
        if self.stop_event.is_set() or not self.auto_announce_enabled:
            continue
        if not hasattr(self.destination, 'announce'):
            continue
        try:
            self.destination.announce()  # type: ignore
            stamp = datetime.now().strftime('%H:%M:%S')
            print(f"\n[Auto-announced at {stamp}]")
        except Exception as err:
            print(f"\n[Auto-announce failed: {err}]")
def load_config(self):
    """Load settings from ``self.config_file``, or run first-time setup.

    When the file exists and parses, each known setting is copied onto the
    instance (missing keys fall back to the defaults listed below) and the
    method returns. Otherwise - including when the file exists but cannot
    be read - the user is prompted on stdin for a display name and an
    announce interval (minimum 30 seconds, default 300) and the result is
    saved with ``save_config``.
    """
    if os.path.exists(self.config_file):
        try:
            with open(self.config_file, 'r', encoding='utf-8') as handle:
                stored = json.load(handle)
            # (setting / attribute name, fallback when absent from the file)
            known_settings = (
                ('display_name', 'Anonymous'),
                ('announce_interval', 300),
                ('auto_announce_enabled', True),
                ('show_announces', True),
                # Notification settings
                ('notify_sound', True),
                ('notify_bell', True),
                ('notify_visual', True),
                # Stamp cost settings
                ('stamp_cost_enabled', False),
                ('stamp_cost', 0),
                ('ignore_invalid_stamps', False),
            )
            for setting, fallback in known_settings:
                setattr(self, setting, stored.get(setting, fallback))
            return
        except Exception as err:
            self._print_warning(f"Error loading config: {err}")

    # === FIRST TIME SETUP ===
    try:
        width = shutil.get_terminal_size().columns
    except:
        width = 60
    sep_width = min(width, 60)
    rule = '' * sep_width

    print(f"\n{rule}")
    self._print_color("FIRST TIME SETUP", Fore.CYAN + Style.BRIGHT)
    print(f"{rule}\n")
    self._print_color("Welcome to LXMF Client!", Fore.GREEN)
    print("Let's get you set up with a display name.\n")

    # Ask for display name until a non-empty one is given
    while True:
        try:
            if COLOR_ENABLED:
                name_prompt = f"{Fore.YELLOW}Enter your display name: {Style.RESET_ALL}"
            else:
                name_prompt = "Enter your display name: "
            name = input(name_prompt).strip()
            if not name:
                self._print_warning("Display name cannot be empty. Please try again.")
                continue
            self.display_name = name
            break
        except KeyboardInterrupt:
            print("\n")
            self._print_warning("Using default name: Anonymous")
            self.display_name = 'Anonymous'
            break
        except:
            # Any other failure while reading the name: fall back silently
            self.display_name = 'Anonymous'
            break

    print()
    self._print_success(f"Display name set to: {self.display_name}")

    # Ask for announce interval (optional)
    if COLOR_ENABLED:
        print(f"\n{Fore.CYAN}Auto-announce interval:{Style.RESET_ALL}")
    else:
        print("\nAuto-announce interval:")
    print("This determines how often your presence is announced to the network.")
    try:
        if COLOR_ENABLED:
            interval_prompt = f"{Fore.YELLOW}Interval in seconds [300]: {Style.RESET_ALL}"
        else:
            interval_prompt = "Interval in seconds [300]: "
        interval_str = input(interval_prompt).strip()
        if interval_str and interval_str.isdigit():
            # Values below 30 seconds are raised to 30
            self.announce_interval = max(30, int(interval_str))
        else:
            self.announce_interval = 300
    except:
        self.announce_interval = 300
    self._print_success(f"Announce interval set to: {self.announce_interval}s")

    print(f"\n{rule}")
    self._print_color("Setup complete! Initializing...", Fore.GREEN)
    print(f"{rule}\n")

    # Save configuration
    self.save_config()
def save_config(self):
    """Write the current settings to ``self.config_file`` as JSON.

    Failures are reported as a warning instead of being raised.
    """
    # Attribute names double as the JSON keys; order fixes the file layout
    persisted = (
        'display_name',
        'announce_interval',
        'auto_announce_enabled',
        'show_announces',
        'notify_sound',
        'notify_bell',
        'notify_visual',
        'stamp_cost_enabled',
        'stamp_cost',
        'ignore_invalid_stamps',
    )
    try:
        snapshot = {setting: getattr(self, setting) for setting in persisted}
        with open(self.config_file, 'w', encoding='utf-8') as handle:
            json.dump(snapshot, handle, indent=2)
    except Exception as err:
        self._print_warning(f"Error saving config: {err}")
def load_messages(self):
    """Load every ``*.json`` message file from ``self.messages_path``.

    Files that fail to parse are reported individually and skipped. The
    loaded messages are sorted by their ``timestamp`` field (missing
    timestamps sort as 0) and then replace ``self.messages`` under
    ``messages_lock``.
    """
    try:
        loaded = []
        for entry in os.listdir(self.messages_path):
            if not entry.endswith('.json'):
                continue
            path = os.path.join(self.messages_path, entry)
            try:
                with open(path, 'r', encoding='utf-8') as handle:
                    loaded.append(json.load(handle))
            except Exception as err:
                self._print_warning(f"Error loading message {entry}: {err}")

        loaded.sort(key=lambda item: item.get('timestamp', 0))
        with self.messages_lock:
            self.messages = loaded

        if loaded:
            self._print_success(f"Loaded {len(loaded)} messages")
    except Exception as err:
        self._print_warning(f"Error loading messages: {err}")
def save_message(self, msg_data):
    """Save one message dict to its own JSON file in ``self.messages_path``.

    The file is named ``<int(timestamp)>_<direction>.json``. Because the
    timestamp is truncated to whole seconds, two different messages with
    the same direction in the same second would map to the same name and
    the later one would silently replace the earlier one. To prevent that,
    an existing file is only overwritten when it holds the same message
    (identical ``timestamp`` value); otherwise a numeric suffix
    (``_1``, ``_2``, ...) is appended until a free or matching name is
    found. ``load_messages`` reads every ``*.json`` file, so suffixed
    files are picked up as well.

    Args:
        msg_data: Message dict; must contain ``timestamp`` and
            ``direction``.

    Failures are reported as a warning instead of being raised.
    """
    try:
        timestamp = msg_data['timestamp']
        direction = msg_data['direction']
        stem = f"{int(timestamp)}_{direction}"

        def _holds_same_message(path):
            # Same exact timestamp => re-save of the same message.
            # An unreadable file is never treated as a match, so it is
            # not clobbered.
            try:
                with open(path, 'r', encoding='utf-8') as existing:
                    return json.load(existing).get('timestamp') == timestamp
            except Exception:
                return False

        filepath = os.path.join(self.messages_path, f"{stem}.json")
        suffix = 1
        while os.path.exists(filepath) and not _holds_same_message(filepath):
            filepath = os.path.join(self.messages_path, f"{stem}_{suffix}.json")
            suffix += 1

        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(msg_data, f, indent=2, ensure_ascii=False)
    except Exception as e:
        self._print_warning(f"Error saving message: {e}")
def get_lxmf_display_name(self, hash_str):
    """Look up the LXMF display name announced for a destination hash.

    Lookup order: the local cache under the normalised key, then cache
    entries stored in another formatting (the match is copied under the
    normalised key), then announce app data recalled through RNS (a hit
    is cached via ``cache_display_name``).

    Returns:
        The display name, or None when it is unknown or the lookup fails.
    """
    key = hash_str
    for noise in (":", " ", "<", ">"):
        key = key.replace(noise, "")
    key = key.lower()

    cache = self.display_name_cache
    if key in cache:
        return cache[key]

    # Legacy entries may have been stored with colons or spaces
    for stored_key, stored_name in cache.items():
        if stored_key.replace(":", "").replace(" ", "").lower() == key:
            cache[key] = stored_name
            return stored_name

    # Fall back to announce data known to Reticulum
    try:
        announced = RNS.Identity.recall_app_data(bytes.fromhex(key))
        if announced:
            resolved = LXMF.display_name_from_app_data(announced)
            if resolved:
                self.cache_display_name(key, resolved)
                return resolved
    except Exception:
        pass
    return None
def on_message_received(self, message):
    """Delivery callback: store, announce and display an incoming message.

    Messages from blacklisted senders are reported and dropped. Otherwise
    the message is converted to a dict, offered to the plugins (a plugin
    that claims it ends processing), appended to ``self.messages``, saved
    to disk, and printed together with a reply/save hint. Any error is
    caught and printed.

    Args:
        message: Received message object; ``source_hash``, ``content``,
            ``title`` and ``timestamp`` are read from it.
    """
    try:
        source_hash_str = RNS.prettyhexrep(message.source_hash)

        if self.is_blacklisted(source_hash_str):
            print(f"\n[BLOCKED] Message from blacklisted address: {source_hash_str}")
            blocked_name = self.get_lxmf_display_name(source_hash_str)
            if blocked_name:
                print(f" Display name: {blocked_name}")
            return

        # Placeholder for stamp validation: currently performs no check
        if self.stamp_cost_enabled and self.stamp_cost > 0:
            pass

        # Content and title may be bytes; decode leniently
        content = message.content
        if isinstance(content, bytes):
            content = content.decode('utf-8', errors='replace')
        title = message.title
        if isinstance(title, bytes):
            title = title.decode('utf-8', errors='replace')

        sender_display_name = self.get_lxmf_display_name(source_hash_str)
        msg_data = {
            'timestamp': message.timestamp,
            'source_hash': source_hash_str,
            'title': title,
            'content': content,
            'direction': 'inbound',
            'display_name': sender_display_name
        }

        # A plugin may consume the message entirely
        if self.handle_plugin_message(message, msg_data):
            return

        with self.messages_lock:
            self.messages.append(msg_data)
            # Remembered so a later reply can target this sender
            self.last_sender_hash = msg_data['source_hash']
            self.last_sender_name = self.get_contact_name_by_hash(msg_data['source_hash'])
        self.save_message(msg_data)

        sender_display = self.format_contact_display(msg_data['source_hash'], show_hash=True)

        # The lookup returns the hash itself when there is no saved contact
        is_saved_contact = self.get_contact_name_by_hash(source_hash_str) != source_hash_str

        self.notify_new_message()

        try:
            width = min(shutil.get_terminal_size().columns, 60)
        except:
            width = 60
        rule = '' * width

        # Plain prints only - no colour codes in these strings
        print(f"\n{rule}")
        timestamp = datetime.fromtimestamp(message.timestamp).strftime('%H:%M:%S')
        print(f"📨 [{timestamp}] NEW MESSAGE from: {self.format_contact_display_short(msg_data['source_hash'])}")
        print(f"{rule}")
        if title:
            print(f"Title: {title}")
        if content:
            print(f"\n{content}")
        print(f"{rule}")

        # Hint depends on whether the sender is already a contact
        if not is_saved_contact:
            print(f"💡 Reply: 're <msg>' | Save contact: 'save' or 'savecontact'")
        else:
            print(f"💡 Type 'reply <message>' or 're <message>' to respond")
        print(f"{rule}\n")
    except Exception as err:
        print(f"❌ Error processing message: {err}")
def load_contacts(self):
    """Load contacts from ``self.contacts_file`` and give each a fixed index.

    Index assignment is done in two passes: first ``next_contact_index``
    is raised above every index already stored in the file, then contacts
    without an index receive fresh ones. Doing both in a single pass could
    hand an unindexed contact a number that a later contact in the file
    already owns, leaving two contacts with the same index.

    The file is rewritten through ``save_contacts`` only when at least one
    index had to be assigned. Does nothing when the file is absent; read
    or parse errors are reported as a warning.
    """
    if not os.path.exists(self.contacts_file):
        return
    try:
        with open(self.contacts_file, 'r', encoding='utf-8') as f:
            self.contacts = json.load(f)

        # Pass 1: make the counter higher than any existing index
        for data in self.contacts.values():
            existing = data.get('index')
            if isinstance(existing, int) and existing >= self.next_contact_index:
                self.next_contact_index = existing + 1

        # Pass 2: hand out indices to contacts that have none
        needs_save = False
        for data in self.contacts.values():
            if 'index' not in data:
                data['index'] = self.next_contact_index
                self.next_contact_index += 1
                needs_save = True

        if needs_save:
            self.save_contacts()
        if self.contacts:
            self._print_success(f"Loaded {len(self.contacts)} contacts")
    except Exception as e:
        self._print_warning(f"Error loading contacts: {e}")
def load_announced_peers_from_cache(self):
    """Seed ``announced_peers`` from announces Reticulum already holds.

    Walks ``RNS.Transport.announces`` (when that attribute exists), recalls
    the app data for each destination and records every peer that carries
    a string display name.

    Peers are stored in the same shape the live announce handler uses:
    new peers get the next fixed ``index`` from ``next_peer_index``, and a
    peer that is already known keeps its index and only has its name and
    timestamp refreshed. Without the index, code that looks peers up by
    number (``resolve_contact_or_hash``, the announce hints) cannot find
    or display them.
    """
    try:
        # Try to access Reticulum's stored announces
        if hasattr(RNS.Transport, 'announces'):
            print(f"[DEBUG] Checking Transport.announces...")
            announces_dict = getattr(RNS.Transport, 'announces', {})
            for destination_hash in announces_dict:
                try:
                    hash_str = RNS.prettyhexrep(destination_hash)
                    app_data = RNS.Identity.recall_app_data(destination_hash)
                    if not app_data:
                        continue
                    display_name = LXMF.display_name_from_app_data(app_data)
                    if not (display_name and isinstance(display_name, str)):
                        continue

                    # Same key normalisation as the announce handler
                    clean_hash = hash_str.replace(":", "").replace(" ", "").lower()
                    with self.peers_lock:
                        known = self.announced_peers.get(clean_hash)
                        if known is None:
                            self.announced_peers[clean_hash] = {
                                'display_name': display_name,
                                'last_seen': time.time(),
                                'index': self.next_peer_index
                            }
                            self.next_peer_index += 1
                        else:
                            known['display_name'] = display_name
                            known['last_seen'] = time.time()
                            if 'index' not in known:
                                known['index'] = self.next_peer_index
                                self.next_peer_index += 1
                    print(f"[DEBUG] Loaded cached peer: {display_name} <{hash_str}>")
                except Exception:
                    # One unreadable announce must not stop the scan
                    pass
            if self.announced_peers:
                self._print_success(f"Loaded {len(self.announced_peers)} announced peers from cache")
    except Exception as e:
        print(f"[DEBUG] Error loading cached peers: {e}")
def save_contacts(self):
    """Write ``self.contacts`` to ``self.contacts_file`` as indented JSON.

    Success and failure are both reported through the client's print
    helpers; no exception escapes.
    """
    try:
        with open(self.contacts_file, mode='w', encoding='utf-8') as handle:
            json.dump(self.contacts, handle, indent=2)
        self._print_success("Contacts saved")
    except Exception as err:
        self._print_error(f"Error saving contacts: {err}")
def get_contact_name_by_hash(self, hash_str: str) -> str:
    """Return the nickname saved for ``hash_str``.

    Colons and angle brackets are removed from the query, colons from each
    stored hash, and both sides are trimmed and lower-cased before
    comparing. When no contact matches, ``hash_str`` is returned unchanged.
    """
    wanted = hash_str.replace(":", "").replace("<", "").replace(">", "").strip().lower()
    matching_names = (
        nickname
        for nickname, entry in self.contacts.items()
        if entry['hash'].replace(":", "").strip().lower() == wanted
    )
    return next(matching_names, hash_str)
def format_contact_display(self, hash_str, show_hash=True):
    """Build a human-readable label for ``hash_str``.

    * saved nickname and a different LXMF display name: ``Nickname (Display Name)``
    * saved nickname only, or both identical: ``Nickname``
    * display name only: ``Display Name``
    * neither: the bare ``hash_str``

    Whenever a name is available and ``show_hash`` is true, `` <hash_str>``
    is appended to the label.
    """
    nickname = self.get_contact_name_by_hash(hash_str)
    display_name = self.get_lxmf_display_name(hash_str)

    # The lookup hands back the hash itself when no contact is saved.
    if nickname != hash_str:
        label = nickname
        if display_name and display_name != nickname:
            label = f"{nickname} ({display_name})"
    elif display_name:
        label = display_name
    else:
        return hash_str

    return f"{label} <{hash_str}>" if show_hash else label
def format_contact_display_short(self, hash_str):
    """Return the most readable single label for ``hash_str``.

    The LXMF display name wins, then the saved nickname; the hash itself is
    the last resort.
    """
    announced_name = self.get_lxmf_display_name(hash_str)
    if not announced_name:
        saved_name = self.get_contact_name_by_hash(hash_str)
        # The lookup echoes the hash back when nothing is saved for it.
        return hash_str if saved_name == hash_str else saved_name
    return announced_name
def add_contact(self, name, hash_str):
    """Store ``hash_str`` under ``name`` with the next contact index.

    Colons, spaces and angle brackets are removed from the hash first. The
    contact book is saved straight away, and the peer's LXMF display name
    is printed when one is known.
    """
    clean_hash = hash_str
    for unwanted in (":", " ", "<", ">"):
        clean_hash = clean_hash.replace(unwanted, "")

    assigned_index = self.next_contact_index
    self.contacts[name] = {'hash': clean_hash, 'index': assigned_index}
    self.next_contact_index = assigned_index + 1

    self.save_contacts()
    self._print_success(f"Added contact: {name}")

    announced_name = self.get_lxmf_display_name(clean_hash)
    if announced_name:
        print(f" Display name: {announced_name}")
def edit_contact(self, identifier):
    """Interactively change a contact's nickname and/or LXMF address.

    Args:
        identifier: A contact index (as a number or numeric string) or an
            exact nickname. A numeric identifier is tried as an index
            first and, when no contact has that index, as a nickname.

    Prompts on the console for what to change and for confirmation, then
    saves the contact book. Returns ``None`` on every path.
    """
    contact_name = None

    # Numeric identifiers are looked up by index first.
    try:
        idx = int(identifier)
    except ValueError:
        idx = None
    if idx is not None:
        contact_name = next(
            (name for name, data in self.contacts.items() if data.get('index') == idx),
            None,
        )
    # Fall back to the nickname, so contacts whose name happens to be all
    # digits can still be reached.
    if contact_name is None and identifier in self.contacts:
        contact_name = identifier
    if contact_name is None:
        self._print_error(f"Contact not found: {identifier}")
        print("Use 'contacts' to see the list")
        return

    target_contact = self.contacts[contact_name]
    current_hash = target_contact['hash']
    current_index = target_contact.get('index', '?')  # '?' is display-only
    display_name = self.get_lxmf_display_name(current_hash)

    print(f"\n{''*60}")
    self._print_color(f"EDITING CONTACT: {contact_name}", Fore.CYAN + Style.BRIGHT)
    print(f"{''*60}")
    print(f"Current name: {contact_name}")
    print(f"Current hash: {current_hash}")
    if display_name:
        print(f"LXMF display name: {display_name}")
    print(f"Index: #{current_index}")
    print(f"{''*60}\n")
    print("What would you like to edit?")
    print(" [1] Change nickname")
    print(" [2] Change LXMF address (hash)")
    print(" [3] Both")
    print(" [c] Cancel")
    choice = input("\nSelect option: ").strip().lower()

    if choice == 'c':
        print("Cancelled")
        return
    if choice not in ('1', '2', '3'):
        self._print_error("Invalid option")
        return

    new_name = contact_name
    new_hash = current_hash

    if choice in ('1', '3'):
        name_input = input(f"\nEnter new nickname [{contact_name}]: ").strip()
        if name_input:
            # Refuse to overwrite a different existing contact.
            if name_input in self.contacts and name_input != contact_name:
                self._print_error(f"Contact '{name_input}' already exists!")
                return
            new_name = name_input

    if choice in ('2', '3'):
        hash_input = input(f"\nEnter new LXMF address [{current_hash}]: ").strip()
        if hash_input:
            clean_hash = hash_input.replace(":", "").replace(" ", "").replace("<", "").replace(">", "")
            if len(clean_hash) != 32:
                self._print_error("Hash must be 32 hex characters!")
                return
            try:
                bytes.fromhex(clean_hash)
            except ValueError:
                self._print_error("Invalid hash format!")
                return
            new_hash = clean_hash

    # Nothing entered: leave the stored contact untouched.
    if new_name == contact_name and new_hash == current_hash:
        print("No changes made")
        return

    print(f"\n{''*60}")
    print("CONFIRM CHANGES:")
    if new_name != contact_name:
        print(f" Name: {contact_name} → {new_name}")
    if new_hash != current_hash:
        print(f" Hash: {current_hash[:16]}... → {new_hash[:16]}...")
    print(f"{''*60}")
    confirm = input("\nSave changes? [y/N]: ").strip().lower()
    if confirm != 'y':
        print("Cancelled")
        return

    # Re-key the entry. Copying the stored record keeps its real index (and
    # never writes the '?' placeholder back when the contact had none).
    updated = dict(target_contact)
    updated['hash'] = new_hash
    del self.contacts[contact_name]
    self.contacts[new_name] = updated
    self.save_contacts()
    self._print_success(f"Contact updated: {new_name}")

    if new_hash != current_hash:
        new_display = self.get_lxmf_display_name(new_hash)
        if new_display:
            print(f" LXMF display name: {new_display}")
def save_contact_from_hash(self, hash_str, suggested_name=None):
    """Interactively save ``hash_str`` as a new contact.

    Warns and returns when the hash is already in the contact book.
    Otherwise prompts for a nickname; the offered default is
    ``suggested_name``, else the peer's LXMF display name, else the first
    eight characters of the cleaned hash. A nickname that is already taken
    is rejected.
    """
    clean_hash = hash_str
    for unwanted in (":", " ", "<", ">"):
        clean_hash = clean_hash.replace(unwanted, "")
    clean_hash = clean_hash.lower()

    # Refuse duplicates of an address we already know.
    not_found = object()
    known_as = next(
        (nick for nick, entry in self.contacts.items() if entry['hash'].lower() == clean_hash),
        not_found,
    )
    if known_as is not not_found:
        self._print_warning(f"Already in contacts as: {known_as}")
        return

    display_name = self.get_lxmf_display_name(clean_hash)
    default_name = suggested_name or display_name or clean_hash[:8]

    divider = ''*60
    print(f"\n{divider}")
    self._print_color("SAVE NEW CONTACT", Fore.GREEN + Style.BRIGHT)
    print(f"{divider}")
    print(f"LXMF Address: {clean_hash}")
    if display_name:
        print(f"Display Name: {display_name}")
    print(f"{divider}\n")

    # An empty answer accepts the suggested default.
    name = input(f"Enter nickname [{default_name}]: ").strip() or default_name
    if name in self.contacts:
        self._print_error(f"Contact '{name}' already exists!")
        return
    self.add_contact(name, clean_hash)
def list_contacts(self):
    """Print the saved contacts ordered by their index.

    Terminals narrower than 70 columns get a vertical layout; wider ones
    get a table. Contacts that have no index sort last and show ``?``.
    """
    if not self.contacts:
        print("\nNo contacts saved\n")
        return
    try:
        width = shutil.get_terminal_size().columns
    except Exception:
        width = 80
    sorted_contacts = sorted(self.contacts.items(), key=lambda x: x[1].get('index', 999999))
    sep_width = min(width, 90)
    # NOTE(review): the repeated string in the separator expressions is
    # empty as written, so these "rules" print as blank lines.
    print(f"\n{''*sep_width}")
    self._print_color("CONTACTS", Fore.CYAN + Style.BRIGHT)
    print(f"{''*sep_width}")
    if width < 70:
        # Mobile: vertical layout
        for name, data in sorted_contacts:
            idx = data.get('index', '?')
            hash_str = data['hash']
            display_name = self.get_lxmf_display_name(hash_str)
            # Only append the display name when one is known, so the line
            # never reads "name - None".
            label = f"{name} - {display_name}" if display_name else name
            print(f"\n[{idx}] {Fore.CYAN}{label}{Style.RESET_ALL}")
            print(f" {hash_str}")
    else:
        # Desktop: table
        print(f"\n{'#':<5} {'Name':<20} {'Display Name':<30} {'Hash'}")
        print(f"{''*5} {''*20} {''*30} {''*32}")
        for name, data in sorted_contacts:
            idx = data.get('index', '?')
            hash_str = data['hash']
            display_name = self.get_lxmf_display_name(hash_str)
            name_shown = name[:18] + ".." if len(name) > 20 else name
            if display_name:
                display_shown = display_name[:28] + ".." if len(display_name) > 30 else display_name
            else:
                display_shown = '<unknown>'
            print(f"{idx:<5} {name_shown:<20} {display_shown:<30} {hash_str}")
    print(f"{''*sep_width}")
    self._print_color("\n💡 Send: 's <#> <msg>'", Fore.YELLOW)
    print()
def list_peers(self):
    """Print every announced LXMF peer ordered by its peer index.

    Works on a snapshot taken under ``self.peers_lock``. Terminals narrower
    than 70 columns get a vertical layout; wider ones get a table. Peers
    stored without an ``'index'`` sort last and show ``?`` instead of
    raising ``KeyError``.
    """
    with self.peers_lock:
        peers_copy = dict(self.announced_peers)
    if not peers_copy:
        print("\nNo peers announced yet\n")
        return
    try:
        width = shutil.get_terminal_size().columns
    except Exception:
        width = 80

    def _age(seconds, fresh_label):
        """Render an age in seconds as a short relative time."""
        if seconds < 60:
            return fresh_label
        if seconds < 3600:
            return f"{int(seconds/60)}m ago"
        if seconds < 86400:
            return f"{int(seconds/3600)}h ago"
        return f"{int(seconds/86400)}d ago"

    sorted_peers = sorted(peers_copy.items(), key=lambda x: x[1].get('index', 999999))
    # Build the lookup once instead of scanning all contacts for every peer.
    contact_hashes = {data['hash'].lower() for data in self.contacts.values()}
    sep_width = min(width, 90)
    print(f"\n{''*sep_width}")
    self._print_color("ANNOUNCED PEERS", Fore.CYAN + Style.BRIGHT)
    print(f"{''*sep_width}")
    if width < 70:
        # Mobile: vertical layout
        for hash_str, peer_data in sorted_peers:
            peer_index = peer_data.get('index', '?')
            display_name = peer_data['display_name']
            time_str = _age(time.time() - peer_data['last_seen'], "now")
            is_contact = hash_str in contact_hashes
            marker = "" if is_contact else ""
            print(f"\n{marker}[{peer_index}] {Fore.CYAN}{display_name}{Style.RESET_ALL}{time_str}")
    else:
        # Desktop: table
        print(f"\n{'#':<5} {'Display Name':<35} {'Hash':<32} {'Last Seen'}")
        print(f"{''*5} {''*35} {''*32} {''*15}")
        for hash_str, peer_data in sorted_peers:
            peer_index = peer_data.get('index', '?')
            display_name = peer_data['display_name']
            time_str = _age(time.time() - peer_data['last_seen'], "just now")
            is_contact = hash_str in contact_hashes
            marker = "" if is_contact else " "
            display_shown = display_name[:33] + ".." if len(display_name) > 35 else display_name
            print(f"{marker}{peer_index:<4} {display_shown:<35} {hash_str:<32} {time_str}")
    print(f"{''*sep_width}")
    self._print_color("\n💡 sp <#> <msg> | ap <#> [name]", Fore.YELLOW)
    print()
def send_message(self, recipient, content, title=None, fields=None):
    """Queue an LXMF message for direct delivery.

    Args:
        recipient: Contact index (digits), contact nickname, or a
            32-character hex destination hash (``:``, spaces and ``<>``
            are ignored).
        content: Message content.
        title: Optional message title.
        fields: Optional dict of LXMF fields (e.g. {LXMF.FIELD_TELEMETRY: data}).

    Returns:
        True once the message has been handed to the router, False when the
        recipient cannot be resolved or any error occurs (the error is
        printed, never raised).
    """
    try:
        send_start_time = time.time()
        # All-digit input is a contact index, unless it is 32 characters
        # long, in which case it is a destination hash made of decimal
        # digits only.
        if recipient.isdigit() and len(recipient) != 32:
            contact_idx = int(recipient)
            found_contact = None
            for name, data in self.contacts.items():
                if data.get('index') == contact_idx:
                    found_contact = data['hash']
                    print(f"Resolved contact #{contact_idx} to: {name}")
                    break
            if found_contact:
                dest_hash_str = found_contact
            else:
                self._print_error(f"No contact with index #{contact_idx}. Use 'contacts' to see the list")
                return False
        elif recipient in self.contacts:
            dest_hash_str = self.contacts[recipient]['hash']
        else:
            # Otherwise treat as a direct hash
            dest_hash_str = recipient

        # Normalize and validate the hash before touching the network, so
        # a malformed address fails immediately instead of after the
        # path-request wait below.
        dest_hash_str = dest_hash_str.replace(":", "").replace(" ", "").replace("<", "").replace(">", "")
        if len(dest_hash_str) != 32:
            raise ValueError("destination hash must be 32 hex characters")
        dest_hash_bytes = bytes.fromhex(dest_hash_str)

        # Recall identity or request a path and poll for up to 3 seconds
        dest_identity = RNS.Identity.recall(dest_hash_bytes)
        if dest_identity is None:
            self._print_warning("Destination identity unknown, requesting...")
            path_request_time = time.time()
            RNS.Transport.request_path(dest_hash_bytes)
            waited = 0
            while waited < 3 and dest_identity is None:
                time.sleep(0.5)
                waited += 0.5
                dest_identity = RNS.Identity.recall(dest_hash_bytes)
            if dest_identity is None:
                self._print_error("Could not get destination identity")
                print(" Ask recipient to announce their address")
                return False
            else:
                path_time = time.time() - path_request_time
                self._print_success(f"Got identity after {path_time:.1f}s")

        # Create destination
        dest = RNS.Destination(
            dest_identity,
            RNS.Destination.OUT,
            RNS.Destination.SINGLE,
            "lxmf",
            "delivery"
        )
        # Create LXMF message
        message = LXMF.LXMessage(
            destination=dest,
            source=self.destination,
            content=content,
            title=title or "",
            desired_method=LXMF.LXMessage.DIRECT
        )
        # Add custom LXMF fields if provided
        if fields:
            if not hasattr(message, 'fields'):
                message.fields = {}
            message.fields.update(fields)
        # The delivery/failure callbacks read this to report elapsed time.
        setattr(message, 'send_timestamp', send_start_time)
        message.register_delivery_callback(self.on_delivery)
        message.register_failed_callback(self.on_failed)

        # Save to history
        msg_data = {
            'timestamp': time.time(),
            'destination_hash': RNS.prettyhexrep(dest_hash_bytes),
            'title': title,
            'content': content,
            'direction': 'outbound'
        }
        with self.messages_lock:
            self.messages.append(msg_data)
            self.save_message(msg_data)

        # Short recipient label (no hash)
        dest_hash = msg_data['destination_hash']
        recipient_display = self.format_contact_display_short(dest_hash)
        self._print_color(f"📤 Sending to: {recipient_display}...", Fore.CYAN)
        # Nothing else is printed from here on: no newline, no prompt.

        # Track pending
        self.pending_messages[message.hash] = {
            'message': message,
            'start_time': send_start_time,
            'recipient': recipient_display,
            'last_progress': 0
        }

        def monitor_progress():
            """Redraw a one-line progress status until the message leaves pending."""
            msg_hash = message.hash
            last_status = ""
            while msg_hash in self.pending_messages:
                try:
                    elapsed = time.time() - self.pending_messages[msg_hash]['start_time']
                    progress = self.router.get_outbound_progress(msg_hash)
                    if progress is not None:
                        progress_pct = progress * 100
                        status = f"[{elapsed:.0f}s] Progress: {progress_pct:.1f}%"
                        if progress < 0.02:
                            phase = " - Waiting for path..."
                        elif progress < 0.04:
                            phase = " - Establishing link..."
                        elif progress < 0.10:
                            phase = " - Link established, preparing transfer..."
                        elif progress < 0.95:
                            phase = " - Transferring data..."
                        else:
                            phase = " - Waiting for confirmation..."
                        status += phase
                        if status != last_status:
                            print(f"\r {status}", end="", flush=True)
                            last_status = status
                    time.sleep(0.5)
                except Exception:
                    # E.g. the entry was removed by a callback between the
                    # membership test and the lookup: just stop monitoring.
                    break
            if last_status:
                # Blank out the status line.
                print("\r" + " " * (len(last_status) + 4), end="\r", flush=True)

        threading.Thread(target=monitor_progress, daemon=True).start()
        # Send message
        self.router.handle_outbound(message)
        return True
    except ValueError:
        self._print_error("Invalid destination hash or contact index")
        return False
    except Exception as e:
        self._print_error(f"Error sending message: {e}")
        traceback.print_exc()
        return False
def on_delivery(self, message):
    """Delivery callback: stop tracking the message and report success.

    The elapsed time is measured from the ``send_timestamp`` attribute when
    the message carries one; otherwise ``?`` is shown.
    """
    recipient_str = self.format_contact_display_short(
        RNS.prettyhexrep(message.destination_hash)
    )
    # The message is no longer pending.
    self.pending_messages.pop(message.hash, None)

    time_str = "?"
    if hasattr(message, 'send_timestamp'):
        elapsed = time.time() - message.send_timestamp
        if elapsed >= 60:
            whole_minutes, leftover = divmod(elapsed, 60)
            time_str = f"{int(whole_minutes)}m {int(leftover)}s"
        else:
            time_str = f"{elapsed:.1f}s"

    print()
    print(f"✅ Delivered to {recipient_str} ({time_str})")
def on_failed(self, message):
    """Failure callback: stop tracking the message and report the failure.

    The elapsed time is measured from the ``send_timestamp`` attribute when
    the message carries one; otherwise ``?`` is shown.
    """
    recipient_str = self.format_contact_display_short(
        RNS.prettyhexrep(message.destination_hash)
    )
    # The message is no longer pending.
    self.pending_messages.pop(message.hash, None)

    time_str = "?"
    if hasattr(message, 'send_timestamp'):
        elapsed = time.time() - message.send_timestamp
        if elapsed >= 60:
            whole_minutes, leftover = divmod(elapsed, 60)
            time_str = f"{int(whole_minutes)}m {int(leftover)}s"
        else:
            time_str = f"{elapsed:.1f}s"

    print()
    print(f"❌ Failed to {recipient_str} (after {time_str})")
def show_stats(self):
    """Print overall and per-peer message counts.

    Works on a copy of the history taken under ``self.messages_lock``.
    Peers are listed by total message count, highest first; widths below
    70 columns use a vertical layout instead of a table.
    """
    with self.messages_lock:
        history = self.messages.copy()
    if not history:
        print("\nNo messages yet\n")
        return

    try:
        width = min(shutil.get_terminal_size().columns, 80)
    except:
        width = 80
    is_mobile = width < 70

    # One pass over the history collects both the totals and the per-peer tally.
    total_sent = 0
    total_received = 0
    user_stats = {}
    for msg in history:
        outbound = msg['direction'] == 'outbound'
        if outbound:
            total_sent += 1
        elif msg['direction'] == 'inbound':
            total_received += 1
        peer = msg.get('destination_hash' if outbound else 'source_hash', 'unknown')
        tally = user_stats.setdefault(peer, {'sent': 0, 'received': 0, 'total': 0})
        tally['sent' if outbound else 'received'] += 1
        tally['total'] += 1
    total_messages = len(history)

    rule = ''*width
    print(f"\n{rule}")
    self._print_color("MESSAGING STATISTICS", Fore.CYAN + Style.BRIGHT)
    print(f"{rule}")
    print(f"\n{Fore.GREEN}Overall Stats:{Style.RESET_ALL}")
    print(f" Total Messages: {total_messages}")
    print(f" Sent: {total_sent}")
    print(f" Received: {total_received}")
    print(f" Unique Contacts: {len(user_stats)}")

    print(f"\n{Fore.CYAN}Per-User Statistics:{Style.RESET_ALL}")
    print(f"{rule}")
    ranked = sorted(user_stats.items(), key=lambda pair: pair[1]['total'], reverse=True)
    if is_mobile:
        for hash_str, stats in ranked:
            print(f"\n{self.format_contact_display_short(hash_str)}")
            print(f"{stats['sent']}{stats['received']} (Total: {stats['total']})")
    else:
        print(f"{'Contact':<35} {'Sent':<8} {'Received':<10} {'Total':<10}")
        print(f"{''*35} {''*8} {''*10} {''*10}")
        for hash_str, stats in ranked:
            contact_display = self.format_contact_display_short(hash_str)
            # Keep long labels inside the 35-character column.
            if len(contact_display) > 33:
                contact_display = contact_display[:30] + "..."
            print(f"{contact_display:<35} {stats['sent']:<8} {stats['received']:<10} {stats['total']:<10}")
    print(f"{rule}\n")
def show_status(self):
    """Print identity, network, security, notification and usage status."""
    try:
        width = min(shutil.get_terminal_size().columns, 80)
    except:
        width = 80
    rule = ''*width
    reset = Style.RESET_ALL

    def on_off(flag):
        return 'ON' if flag else 'OFF'

    print(f"\n{rule}")
    self._print_color("SYSTEM STATUS", Fore.CYAN + Style.BRIGHT)
    print(f"{rule}")

    # Identity
    print(f"\n{Fore.GREEN}Identity:{reset}")
    print(f" Display Name: {self.display_name}")
    if hasattr(self.destination, 'hash'):
        print(f" LXMF Address: {RNS.prettyhexrep(self.destination.hash)}")  # type: ignore

    # Network
    print(f"\n{Fore.CYAN}Network:{reset}")
    if not self.auto_announce_enabled:
        print(f" Auto-announce: DISABLED")
    else:
        print(f" Auto-announce: ENABLED (every {self.announce_interval}s)")
    print(f" Discovery alerts: {on_off(self.show_announces)}")
    if hasattr(self, 'announce_thread'):
        thread_state = 'RUNNING' if self.announce_thread.is_alive() else 'STOPPED'
        print(f" Announce thread: {thread_state}")

    # Security
    print(f"\n{Fore.RED}Security:{reset}")
    stamps_active = self.stamp_cost_enabled and self.stamp_cost > 0
    if not stamps_active:
        print(f" Stamp Cost: {Fore.RED}DISABLED{reset}")
    else:
        print(f" Stamp Cost: {Fore.GREEN}ENABLED{reset}")
        print(f" Required Proof: {Fore.YELLOW}{self.stamp_cost} bits{reset}")
        print(f" Ignore Invalid: {Fore.GREEN}{'YES' if self.ignore_invalid_stamps else 'NO'}{reset}")
    if self.blacklist:
        blacklist_text = f"{Fore.YELLOW}{len(self.blacklist)} blocked{reset}"
    else:
        blacklist_text = f"{Fore.GREEN}Empty{reset}"
    print(f" Blacklist: {blacklist_text}")

    # Notifications
    print(f"\n{Fore.MAGENTA}Notifications:{reset}")
    print(f" Sound: {on_off(self.notify_sound)}")
    print(f" Terminal Bell: {on_off(self.notify_bell)}")
    print(f" Visual Flash: {on_off(self.notify_visual)}")

    # Statistics (each shared structure is read under its own lock)
    with self.messages_lock:
        total_messages = len(self.messages)
        sent = sum(1 for m in self.messages if m['direction'] == 'outbound')
        received = sum(1 for m in self.messages if m['direction'] == 'inbound')
    with self.peers_lock:
        peer_count = len(self.announced_peers)
    contact_count = len(self.contacts)
    print(f"\n{Fore.YELLOW}Statistics:{reset}")
    print(f" Contacts: {contact_count}")
    print(f" Announced peers: {peer_count}")
    print(f" Total messages: {total_messages} (↑{sent}{received})")

    # Plugins (a plugin counts as enabled unless explicitly disabled)
    if self.plugins:
        plugin_count = len(self.plugins)
        enabled_count = sum(1 for name in self.plugins if self.plugins_enabled.get(name, True))
        print(f" Plugins: {enabled_count}/{plugin_count} enabled")

    # System
    print(f"\n{Fore.WHITE}System:{reset}")
    whole_hours, remainder = divmod(time.time() - self.start_time, 3600)
    hours = int(whole_hours)
    minutes = int(remainder // 60)
    print(f" Uptime: {hours}h {minutes}m")
    if self.suppressed_errors > 0:
        print(f" Suppressed errors: {self.suppressed_errors}")
    print(f"{rule}\n")
def show_messages(self, limit=10, filter_hash=None):
    """Print recent messages, or one whole conversation.

    Args:
        limit: How many of the newest messages to show in the list view.
            Zero or a negative number shows none. Ignored when
            ``filter_hash`` is given.
        filter_hash: When set, show every message exchanged with this
            address in full, and remember the address as the reply target
            (``self.last_sender_hash`` / ``self.last_sender_name``).
    """
    with self.messages_lock:
        messages_copy = self.messages.copy()
    if not messages_copy:
        print("\nNo messages yet\n")
        return
    try:
        width = min(shutil.get_terminal_size().columns, 80)
    except Exception:
        width = 80

    def _clean(value):
        """Reduce an address to bare lower-case hex for comparison."""
        return value.replace(":", "").replace(" ", "").replace("<", "").replace(">", "").lower()

    if filter_hash:
        clean_filter = _clean(filter_hash)
        # The peer of an outbound message is its destination; of any other
        # message, its source.
        messages_copy = [
            msg for msg in messages_copy
            if _clean(msg.get('destination_hash' if msg['direction'] == 'outbound' else 'source_hash', '')) == clean_filter
        ]
        if not messages_copy:
            contact_display = self.format_contact_display_short(filter_hash)
            print(f"\nNo messages with {contact_display}\n")
            return

    if filter_hash:
        contact_display = self.format_contact_display_short(filter_hash)
        print(f"\n{''*width}")
        # Truncate contact name if too long for header
        if len(contact_display) > width - 10:
            contact_display = contact_display[:width-13] + "..."
        self._print_color(f"CHAT: {contact_display.upper()}", Fore.CYAN + Style.BRIGHT)
        print(f"{''*width}")
        display_messages = messages_copy
    else:
        # A slice of [-0:] would be the WHOLE list, so a non-positive
        # limit is handled explicitly and yields no messages.
        limit = max(limit, 0)
        display_messages = messages_copy[-limit:] if limit > 0 else []
        print(f"\n{''*width}")
        self._print_color(f"RECENT MESSAGES ({min(limit, len(messages_copy))})", Fore.CYAN + Style.BRIGHT)
        print(f"{''*width}")

    for idx, msg in enumerate(display_messages, 1):
        try:
            ts = datetime.fromtimestamp(msg['timestamp']).strftime('%Y-%m-%d %H:%M:%S')
            direction = "" if msg['direction'] == 'outbound' else ""
            if filter_hash:
                # Conversation view: full message text
                print(f"\n[{idx}] {ts} {direction}")
                if msg.get('title'):
                    print(f"Title: {msg['title']}")
                print(f"\n{msg.get('content', '')}\n")
                print(f"{''*width}")
            else:
                # List view: peer label plus a preview of at most 100 characters
                if msg['direction'] == 'outbound':
                    contact = self.format_contact_display_short(msg['destination_hash'])
                else:
                    contact = self.format_contact_display_short(msg['source_hash'])
                print(f"\n[{idx}] {ts} {direction} {contact}")
                if msg.get('title'):
                    print(f" Title: {msg['title']}")
                content = str(msg.get('content', ''))
                if len(content) > 100:
                    print(f" {content[:100]}...")
                else:
                    print(f" {content}")
        except Exception as e:
            # One malformed record must not hide the others.
            print(f"\n[{idx}] [Error displaying message: {e}]")
    print(f"\n{''*width}")
    if filter_hash:
        # Viewing a conversation makes its peer the target of 're'.
        self.last_sender_hash = filter_hash
        self.last_sender_name = self.get_contact_name_by_hash(filter_hash)
        self._print_color(f"\n💡 Reply: 're <msg>'", Fore.GREEN)
    else:
        self._print_color("💡 Tip: 'm list' or 'm user <#>'", Fore.YELLOW)
    print()
def show_message_list_with_users(self):
    """Print one row per conversation peer and return the rows.

    Each peer gets a fixed number from ``self.assign_conversation_index``.

    Returns:
        A list of ``(clean_hash, data)`` pairs sorted by that number, or
        ``None`` when there are no messages at all.
    """
    with self.messages_lock:
        history = self.messages.copy()
    if not history:
        print("\nNo messages yet\n")
        return

    try:
        width = min(shutil.get_terminal_size().columns, 90)
    except:
        width = 90
    is_mobile = width < 70

    def age_label(seconds, fresh_label):
        """Render an age in seconds as a short relative time."""
        if seconds < 60:
            return fresh_label
        if seconds < 3600:
            return f"{int(seconds/60)}m ago"
        if seconds < 86400:
            return f"{int(seconds/3600)}h ago"
        return f"{int(seconds/86400)}d ago"

    # Tally sent/received counts and the newest timestamp per peer.
    user_data = {}
    for msg in history:
        outbound = msg['direction'] == 'outbound'
        hash_key = msg.get('destination_hash' if outbound else 'source_hash', 'unknown')
        if hash_key == 'unknown':
            continue
        clean_hash = hash_key.replace(":", "").replace(" ", "").lower()
        entry = user_data.get(clean_hash)
        if entry is None:
            # First message for this peer: fetch its fixed index.
            entry = user_data[clean_hash] = {
                'sent': 0,
                'received': 0,
                'last_message_time': 0,
                'index': self.assign_conversation_index(clean_hash),
                'display_hash': hash_key
            }
        entry['sent' if outbound else 'received'] += 1
        if msg['timestamp'] > entry['last_message_time']:
            entry['last_message_time'] = msg['timestamp']

    sorted_users = sorted(user_data.items(), key=lambda pair: pair[1]['index'])

    rule = ''*width
    print(f"\n{rule}")
    self._print_color("MESSAGE CONVERSATIONS", Fore.CYAN + Style.BRIGHT)
    print(f"{rule}")
    if is_mobile:
        # Mobile layout - vertical list
        for _, data in sorted_users:
            contact_display = self.format_contact_display_short(data['display_hash'])
            time_str = age_label(time.time() - data['last_message_time'], "now")
            print(f"\n[{data['index']}] {Fore.CYAN}{contact_display}{Style.RESET_ALL}")
            print(f"{data['sent']}{data['received']}{time_str}")
    else:
        # Desktop layout - table
        print(f"\n{'#':<5} {'Contact':<35} {'Sent':<6} {'Recv':<6} {'Last':<12}")
        print(f"{''*5} {''*35} {''*6} {''*6} {''*12}")
        for _, data in sorted_users:
            conv_index = data['index']
            contact_display = self.format_contact_display_short(data['display_hash'])
            # Keep long labels inside the 35-character column.
            if len(contact_display) > 33:
                contact_display = contact_display[:30] + "..."
            time_str = age_label(time.time() - data['last_message_time'], "just now")
            print(f"{conv_index:<5} {contact_display:<35} {data['sent']:<6} {data['received']:<6} {time_str:<12}")
    print(f"{rule}")
    self._print_color("\n💡 Commands:", Fore.YELLOW)
    print(f" m user <#> - View conversation")
    print(f" m [count] - Recent messages")
    print()
    return sorted_users
def show_help(self, category=None):
    """Show the help page for ``category``, or the main help menu.

    Recognised categories are ``'messaging'``, ``'contacts'``,
    ``'settings'`` and ``'system'``; anything else shows the main menu.
    """
    topic_pages = (
        ('messaging', '_show_messaging_help'),
        ('contacts', '_show_contacts_help'),
        ('settings', '_show_settings_help'),
        ('system', '_show_system_help'),
    )
    for topic, method_name in topic_pages:
        if category == topic:
            getattr(self, method_name)()
            return
    self._show_main_help()
def _show_messaging_help(self):
    """Print the one-line summary of the messaging commands in cyan."""
    summary = "Messaging commands: send, reply, messages"
    self._print_color(summary, Fore.CYAN)
def _show_contacts_help(self):
    """Print the one-line summary of the contact commands in cyan."""
    summary = "Contact commands: contacts, add, remove, peers"
    self._print_color(summary, Fore.CYAN)
def _show_settings_help(self):
    """Print the one-line summary of the settings commands in cyan."""
    summary = "Settings commands: settings, name, interval"
    self._print_color(summary, Fore.CYAN)
def _show_system_help(self):
    """Print the one-line summary of the system commands in cyan."""
    summary = "System commands: status, restart, clear, help, quit"
    self._print_color(summary, Fore.CYAN)
def _show_main_help(self):
    """Print the full command reference, grouped by category.

    Terminals narrower than 70 columns get a compact list (command and
    alias only); wider ones get a three-column table with descriptions and
    a closing hint. The same output is produced whether or not colorama is
    installed: the file's ImportError fallback defines ``Fore`` and
    ``Style`` with empty-string attributes, so the colour codes simply
    vanish instead of the help being replaced by a pointer back to 'help'.
    """
    try:
        width = shutil.get_terminal_size().columns
    except Exception:
        width = 80
    is_mobile = width < 70
    # The mobile layout follows the terminal; the desktop layout is fixed.
    rule_width = width if is_mobile else 70

    # (heading, heading colour, [(command, alias, description), ...])
    sections = [
        ("📨 MESSAGING", Fore.CYAN + Style.BRIGHT, [
            ("send <#> <msg>", "s", "Send message"),
            ("reply <msg>", "re", "Reply to last"),
            ("messages [n]", "m", "Recent messages"),
            ("messages list", "", "All conversations"),
            ("messages user <#>", "", "View conversation"),
        ]),
        ("👥 CONTACTS & PEERS", Fore.GREEN, [
            ("contacts", "c", "List contacts"),
            ("add <name> <hash>", "a", "Add contact"),
            ("edit <name/#>", "e", "Edit contact"),
            ("remove <name>", "rm", "Remove contact"),
            ("savecontact [hash]", "save", "Quick save contact"),
            ("peers", "p", "List peers"),
            ("sendpeer <#> <msg>", "sp", "Send to peer"),
            ("addpeer <#> [name]", "ap", "Add to contacts"),
        ]),
        ("📊 INFO & STATS", Fore.MAGENTA, [
            ("stats", "st", "Messaging stats"),
            ("status", "", "System status"),
        ]),
        ("🌐 NETWORK", Fore.BLUE, [
            ("address", "addr", "Your LXMF address info"),
            ("announce", "ann", "Announce manually now!"),
        ]),
        ("⚙️ SETTINGS", Fore.YELLOW, [
            ("settings", "set", "Settings menu"),
            ("name <name>", "n", "Change name"),
            ("interval <sec>", "i", "Announce interval"),
        ]),
        ("🛡️ SECURITY", Fore.RED, [
            ("blacklist [list]", "bl", "Manage blacklist"),
            ("block <#/name>", "", "Block contact"),
            ("unblock <#/name>", "", "Unblock contact"),
        ]),
        ("🖥️ SYSTEM", Fore.RED, [
            ("plugin [list]", "", "Manage plugins"),
            ("clear", "cls", "Clear screen"),
            ("restart", "r", "Restart client"),
            ("help", "h", "Show help"),
            ("quit", "q", "Exit"),
        ]),
    ]

    # Banner
    # NOTE(review): the repeated string in the separator expressions is
    # empty as written, so these "rules" print as blank lines.
    print(f"\n{Fore.WHITE}{''*rule_width}")
    print(f"LXMF CLIENT COMMANDS".center(rule_width))
    print(f"{''*rule_width}{Style.RESET_ALL}\n")

    for position, (heading, heading_color, commands) in enumerate(sections):
        if position == 0:
            # The first heading goes through _print_color and has no
            # leading blank line (the banner already ends with one).
            self._print_color(heading, heading_color)
        else:
            print(f"\n{heading_color}{heading}{Style.RESET_ALL}")
        print(f"{''*rule_width}")
        for long_cmd, short_cmd, description in commands:
            if is_mobile:
                line = f"{Fore.CYAN}{long_cmd}{Style.RESET_ALL}"
                if short_cmd:
                    line += f" {Fore.YELLOW}({short_cmd}){Style.RESET_ALL}"
            elif short_cmd:
                line = f"{Fore.CYAN}{long_cmd:<20}{Style.RESET_ALL} {Fore.YELLOW}({short_cmd:<4}){Style.RESET_ALL} {description}"
            else:
                line = f"{Fore.CYAN}{long_cmd:<20}{Style.RESET_ALL} {description}"
            print(line)

    if not is_mobile:
        # Closing hint, desktop layout only.
        print(f"\n{Fore.YELLOW}💡 Type 'settings' for options{Style.RESET_ALL}")
        print(f"{''*70}\n")
def show_settings_menu(self):
    """Run the interactive settings menu until the user leaves it.

    Each pass prints the current general, notification and security
    settings, reads one choice with ``input()`` and applies it.  Every
    change is persisted right away through ``self.save_config()``.  The
    loop ends on ``b``/``back`` or on ``s``/``save`` (which saves once
    more before leaving).
    """
    # Menu key -> (attribute name, label) for the plain on/off switches.
    toggles = {
        '4': ('show_announces', "Discovery alerts"),
        '5': ('notify_sound', "Sound notifications"),
        '6': ('notify_bell', "Terminal bell"),
        '7': ('notify_visual', "Visual flash"),
        '9': ('ignore_invalid_stamps', "Ignore invalid stamps"),
    }

    def on_off(flag):
        return 'ON' if flag else 'OFF'

    while True:
        # Get responsive width
        try:
            width = min(shutil.get_terminal_size().columns, 70)
        except (OSError, ValueError):
            width = 70
        # Horizontal rule (U+2500).  An empty literal here printed nothing.
        rule = "\u2500" * width

        print(f"\n{rule}")
        self._print_color("SETTINGS MENU", Fore.YELLOW + Style.BRIGHT)
        print(rule)
        print(f"\n{Fore.CYAN}General Settings:{Style.RESET_ALL}")
        print(f" [1] Display Name: {Fore.GREEN}{self.display_name}{Style.RESET_ALL}")
        print(f" [2] Auto-Announce: {Fore.GREEN}{on_off(self.auto_announce_enabled)}{Style.RESET_ALL}")
        print(f" [3] Announce Interval: {Fore.GREEN}{self.announce_interval}s{Style.RESET_ALL}")
        print(f" [4] Discovery Alerts: {Fore.GREEN}{on_off(self.show_announces)}{Style.RESET_ALL}")
        print(f"\n{Fore.MAGENTA}Notification Settings:{Style.RESET_ALL}")
        print(f" [5] Sound (beeps/melody): {Fore.GREEN}{on_off(self.notify_sound)}{Style.RESET_ALL}")
        print(f" [6] Terminal Bell: {Fore.GREEN}{on_off(self.notify_bell)}{Style.RESET_ALL}")
        print(f" [7] Visual Flash: {Fore.GREEN}{on_off(self.notify_visual)}{Style.RESET_ALL}")
        print(f"\n{Fore.RED}Security Settings:{Style.RESET_ALL}")
        print(f" [8] Stamp Cost: {Fore.GREEN}{on_off(self.stamp_cost_enabled)}{Style.RESET_ALL}")
        if self.stamp_cost_enabled:
            print(f" Amount: {Fore.YELLOW}{self.stamp_cost} bits{Style.RESET_ALL}")
        print(f" [9] Ignore Invalid Stamps: {Fore.GREEN}{on_off(self.ignore_invalid_stamps)}{Style.RESET_ALL}")
        print(f"\n{Fore.YELLOW}Options:{Style.RESET_ALL}")
        print(" [1-9] - Change setting")
        print(" [t] - Test notification")
        print(" [b] - Back to main menu")
        print(" [s] - Save and exit")
        print(rule)

        choice = input("\nSelect option: ").strip().lower()

        if choice == '1':
            new_name = input(f"\nEnter new display name [{self.display_name}]: ").strip()
            if not new_name:
                print("Cancelled")
                continue
            self.display_name = new_name
            if hasattr(self.destination, 'display_name'):
                setattr(self.destination, 'display_name', self.display_name)  # type: ignore
            self.save_config()
            announced = hasattr(self.destination, 'announce')
            if announced:
                self.destination.announce()  # type: ignore
            self._print_success(f"Display name changed to: {self.display_name}")
            # Only claim an announce when one was actually sent.
            if announced:
                self._print_success("Announced to network")

        elif choice == '2':
            # Toggle auto-announce
            self.auto_announce_enabled = not self.auto_announce_enabled
            self.save_config()
            if not self.auto_announce_enabled:
                self._print_success("Auto-announce disabled")
                print(" You can still announce manually using 'announce' command")
            elif self._settings_restart_announce_thread():
                self._print_success("Auto-announce enabled and thread restarted")
            else:
                self._print_success("Auto-announce enabled")

        elif choice == '3':
            current = self.announce_interval
            interval_str = input(f"\nEnter announce interval in seconds [{current}]: ").strip()
            if not interval_str:
                print("Cancelled")
                continue
            # Guard only the conversion so unrelated errors are not
            # reported as "Invalid number".
            try:
                new_interval = int(interval_str)
            except ValueError:
                self._print_error("Invalid number")
                continue
            if new_interval < 30:
                self._print_warning("Minimum interval is 30 seconds, setting to 30")
                new_interval = 30
            self.announce_interval = new_interval
            self.save_config()
            restarted = self._settings_restart_announce_thread()
            self._print_success(f"Announce interval changed to: {self.announce_interval}s")
            if restarted:
                self._print_success("Announce thread restarted")
            else:
                self._print_success("New interval will apply from next cycle")

        elif choice in toggles:
            attr, label = toggles[choice]
            enabled = self._settings_toggle_flag(attr, label)
            if choice == '9':
                if enabled:
                    print(" Messages with insufficient/invalid stamps will be rejected")
                else:
                    print(" Messages with insufficient/invalid stamps will be accepted")

        elif choice == '8':
            # TODO: the stamp-cost enable/disable logic is missing from this
            # menu (only placeholders existed).  Say so instead of silently
            # doing nothing.
            self._print_warning("Stamp cost cannot be changed from this menu")

        elif choice == 't':
            # Test notification
            print("\nTesting notification...")
            self.notify_new_message()
            time.sleep(1)
            self._print_success("Test complete!")

        elif choice in ['b', 'back']:
            break

        elif choice in ['s', 'save']:
            self.save_config()
            self._print_success("Settings saved")
            break

        else:
            self._print_error("Invalid option")


def _settings_toggle_flag(self, attr, label):
    """Flip the boolean attribute ``attr``, persist it and report the result.

    Args:
        attr: Name of a boolean attribute on ``self``.
        label: Human-readable name used in the success message.

    Returns:
        The new value of the attribute.
    """
    enabled = not getattr(self, attr)
    setattr(self, attr, enabled)
    self.save_config()
    status = "enabled" if enabled else "disabled"
    self._print_success(f"{label} {status}")
    return enabled


def _settings_restart_announce_thread(self):
    """Signal the announce worker to stop and start a fresh one if it exited.

    Returns:
        True when a new ``announce_loop`` thread was started, False when the
        previous worker is still alive (no duplicate worker is started).
    """
    self.stop_event.set()
    worker = getattr(self, 'announce_thread', None)
    if worker is not None and worker.is_alive():
        worker.join(timeout=2)  # Give the worker a chance to notice the signal
    self.stop_event.clear()
    if worker is not None and worker.is_alive():
        return False
    self.announce_thread = threading.Thread(target=self.announce_loop, daemon=True)
    self.announce_thread.start()
    return True
def resolve_command(self, cmd):
    """Translate a command alias into its full command name.

    Args:
        cmd: Command word as typed by the user.

    Returns:
        The entry of ``self.command_aliases`` stored under ``cmd``, or
        ``cmd`` unchanged when no alias is registered for it.
    """
    alias_table = self.command_aliases
    if cmd in alias_table:
        return alias_table[cmd]
    return cmd
def send_to_peer(self, peer_index, content, title=None):
    """Send a message to an announced peer selected by its list number.

    Args:
        peer_index: Peer number as shown by the 'peers' command (str or int).
        content: Message body.
        title: Optional message title, forwarded to ``send_message``.

    Returns:
        Whatever ``self.send_message`` returns, or ``False`` when the number
        is not numeric or no announced peer carries that index.
    """
    # Guard only the conversion: an exception raised while sending must not
    # be reported as a malformed peer number.
    try:
        idx = int(peer_index)
    except (TypeError, ValueError):
        self._print_error("Peer number must be a valid number")
        return False

    # Work on a snapshot so the lock is not held while sending.
    with self.peers_lock:
        peers_copy = dict(self.announced_peers)

    target_peer = next(
        ((hash_str, peer_data) for hash_str, peer_data in peers_copy.items()
         if peer_data['index'] == idx),
        None,
    )
    if target_peer is None:
        self._print_error(f"Invalid peer number #{idx}. Use 'peers' to see the list")
        return False

    hash_str, peer_data = target_peer
    display_name = peer_data['display_name']
    print(f"Sending to peer #{idx}: {display_name}")
    return self.send_message(hash_str, content, title)
def add_peer_to_contacts(self, peer_index, custom_name=None):
    """Add an announced peer to the contact list.

    Args:
        peer_index: Peer number as shown by the 'peers' command (str or int).
        custom_name: Contact name to use; when empty or ``None`` the peer's
            announced display name is used.

    Nothing is added when the number is invalid, no peer has that index, or
    a contact with the same hash already exists; a message is printed in
    each of those cases.
    """
    # Guard only the conversion so errors from add_contact are not
    # misreported as a malformed peer number.
    try:
        idx = int(peer_index)
    except (TypeError, ValueError):
        self._print_error("Peer number must be a valid number")
        return

    with self.peers_lock:
        peers_copy = dict(self.announced_peers)

    target_peer = next(
        ((hash_str, peer_data) for hash_str, peer_data in peers_copy.items()
         if peer_data['index'] == idx),
        None,
    )
    if target_peer is None:
        self._print_error(f"Invalid peer number #{idx}. Use 'peers' to see the list")
        return

    hash_str, peer_data = target_peer
    display_name = peer_data['display_name']

    # Check if already in contacts; compare case-insensitively on both sides.
    wanted = hash_str.lower()
    for contact_name, contact_data in self.contacts.items():
        if str(contact_data.get('hash') or '').lower() == wanted:
            self._print_warning(f"Already in contacts as: {contact_name}")
            return

    # Use custom name if provided, otherwise use display name
    contact_name = custom_name if custom_name else display_name
    self.add_contact(contact_name, hash_str)
def notify_new_message(self):
    """Alert the user to a new message according to the notification settings.

    ``self.notify_sound`` and ``self.notify_bell`` control the audible part
    (platform-specific playback and the terminal bell), ``self.notify_visual``
    controls a short flashing banner on the current terminal line.  Audio is
    best effort: any failure falls back to the terminal bell when enabled.
    """
    # === SOUND NOTIFICATION ===
    if self.notify_sound or self.notify_bell:
        try:
            self._notify_audio()
        except Exception:
            # Ultimate fallback to bell if available; a notification must
            # never take the client down.
            if self.notify_bell:
                try:
                    self._notify_ring_bell(3, 0.1)
                except Exception:
                    pass

    # === VISUAL NOTIFICATION ===
    if self.notify_visual:
        self._notify_visual_flash()


def _notify_find_sound_file(self):
    """Return the first custom sound in ``<storage_path>/sounds``, or None."""
    sound_dir = os.path.join(self.storage_path, "sounds")
    if not os.path.exists(sound_dir):
        return None
    # Look for notification sound files (in order of preference)
    for filename in ['notification.wav', 'notification.mp3', 'notification.ogg',
                     'message.wav', 'beep.wav']:
        filepath = os.path.join(sound_dir, filename)
        if os.path.exists(filepath):
            return filepath
    return None


def _notify_spawn(self, argv):
    """Start ``argv`` in the background with all output discarded.

    Returns True when the process could be launched, False otherwise (for
    example when the executable is not installed).  No shell is involved,
    so file names never need quoting.
    """
    try:
        subprocess.Popen(argv,
                         stdin=subprocess.DEVNULL,
                         stdout=subprocess.DEVNULL,
                         stderr=subprocess.DEVNULL)
    except (OSError, ValueError):
        return False
    return True


def _notify_ring_bell(self, count, delay):
    """Emit the terminal bell ``count`` times, pausing ``delay`` seconds after each."""
    for _ in range(count):
        print("\a", end="", flush=True)
        time.sleep(delay)


def _notify_audio(self):
    """Dispatch the audible notification to the handler for this platform."""
    system = platform.system()
    sound_file = self._notify_find_sound_file()
    if os.path.exists('/data/data/com.termux'):
        self._notify_audio_termux(sound_file)
    elif system == 'Windows':
        self._notify_audio_windows(sound_file)
    elif system == 'Darwin':
        self._notify_audio_macos(sound_file)
    elif system == 'Linux':
        self._notify_audio_linux(sound_file)
    elif self.notify_bell:
        # === OTHER/UNKNOWN SYSTEMS ===
        self._notify_ring_bell(3, 0.1)


def _notify_audio_termux(self, sound_file):
    """Termux/Android: custom sound or vibration pattern, then a system notification."""
    if self.notify_sound:
        if sound_file:
            self._notify_spawn(['termux-media-player', 'play', sound_file])
            time.sleep(0.5)
        else:
            # Vibration pattern fallback: (duration ms, pause s after it)
            pattern = [(80, 0.09), (80, 0.09), (80, 0.09), (150, 0.16), (100, 0)]
            for duration_ms, pause in pattern:
                self._notify_spawn(['termux-vibrate', '-d', str(duration_ms)])
                if pause:
                    time.sleep(pause)
        # System notification
        self._notify_spawn(['termux-notification',
                            '--title', '📨 LXMF Message',
                            '--content', 'New message received',
                            '--sound'])
    if self.notify_bell:
        self._notify_ring_bell(3, 0.1)


def _notify_audio_windows(self, sound_file):
    """Windows: custom sound via winsound, else a beep melody, else the bell."""
    if not self.notify_sound:
        if self.notify_bell:
            self._notify_ring_bell(3, 0.1)
        return

    sound_played = False
    # Try custom sound file first
    if sound_file:
        try:
            import winsound  # type: ignore
            winsound.PlaySound(sound_file, winsound.SND_FILENAME | winsound.SND_ASYNC)  # type: ignore
            sound_played = True
        except Exception:
            pass

    # Fallback to beep melody
    if not sound_played:
        try:
            import winsound  # type: ignore
            melody = [
                (523, 80),    # C5
                (659, 80),    # E5
                (784, 80),    # G5
                (1047, 150),  # C6
                (784, 100),   # G5
            ]
            for freq, duration in melody:
                winsound.Beep(freq, duration)  # type: ignore
                time.sleep(0.01)
        except Exception:
            if self.notify_bell:
                self._notify_ring_bell(3, 0.1)


def _notify_audio_macos(self, sound_file):
    """macOS: custom sound via afplay, else a system sound, else an AppleScript beep."""
    if self.notify_sound:
        # Try custom sound file first
        sound_played = bool(sound_file) and self._notify_spawn(["afplay", sound_file])
        # Fallback to system sounds
        if not sound_played:
            sound_candidates = [
                "/System/Library/Sounds/Ping.aiff",
                "/System/Library/Sounds/Glass.aiff",
                "/System/Library/Sounds/Submarine.aiff",
            ]
            sound_path = next((p for p in sound_candidates if os.path.exists(p)), None)
            if sound_path:
                launched = self._notify_spawn(["afplay", sound_path])
            else:
                try:
                    subprocess.run(["osascript", "-e", "beep"], check=False)
                    launched = True
                except OSError:
                    launched = False
            if not launched and self.notify_bell:
                print("\a", end="", flush=True)
    if self.notify_bell:
        self._notify_ring_bell(2, 0.12)


def _notify_audio_linux(self, sound_file):
    """Linux: first installed player for the custom sound, else a system sound or beep."""
    if self.notify_sound:
        sound_played = False
        # Try custom sound file first
        if sound_file:
            # Try multiple players in order of preference.  Availability is
            # checked with shutil.which: the exit status of a backgrounded
            # shell command ("cmd &") is always 0 and says nothing about
            # whether the player exists.
            players = [
                ['paplay', sound_file],
                ['aplay', sound_file],
                ['mpg123', '-q', sound_file],
                ['ffplay', '-nodisp', '-autoexit', '-hide_banner',
                 '-loglevel', 'quiet', sound_file],
            ]
            for argv in players:
                if shutil.which(argv[0]) and self._notify_spawn(argv):
                    sound_played = True
                    break

        # Fallback to system sounds
        if not sound_played:
            system_sound = '/usr/share/sounds/freedesktop/stereo/message-new-instant.oga'
            if (shutil.which('paplay') and os.path.exists(system_sound)
                    and self._notify_spawn(['paplay', system_sound])):
                pass
            elif shutil.which('beep'):
                self._notify_spawn(['beep',
                                    '-f', '523', '-l', '80', '-n',
                                    '-f', '659', '-l', '80', '-n',
                                    '-f', '784', '-l', '80', '-n',
                                    '-f', '1047', '-l', '150', '-n',
                                    '-f', '784', '-l', '100'])
    if self.notify_bell:
        self._notify_ring_bell(2, 0.15)


def _notify_visual_flash(self):
    """Flash a bar three times on the current line, show a banner, then clear it."""
    try:
        terminal_width = shutil.get_terminal_size().columns
    except (OSError, ValueError):
        terminal_width = 80

    # Determine message
    if os.path.exists('/data/data/com.termux'):
        msg = " 📱 NEW MESSAGE! "
    else:
        msg = " 📬 NEW MESSAGE! "

    # Calculate centered position
    msg_width = min(60, terminal_width)
    padding = (terminal_width - msg_width) // 2
    blank_line = f"\r{' ' * terminal_width}"

    # Quick flash sequence (3 flashes) - PLAIN TEXT.
    # U+2588 (full block): an empty literal here produced an invisible flash.
    flash_line = " " * padding + "\u2588" * msg_width
    for _ in range(3):
        # Flash on
        print(f"\r{flash_line}", end="", flush=True)
        time.sleep(0.08)
        # Flash off (clear)
        print(blank_line, end="\r", flush=True)
        time.sleep(0.08)

    # Final message display (brief) - PLAIN TEXT.
    # str.center needs exactly one fill character; U+2500 is used.
    centered_msg = msg.center(msg_width, "\u2500")
    print(f"\r{' ' * padding + centered_msg}", end="", flush=True)
    time.sleep(0.2)

    # Clear completely
    print(blank_line, end="\r", flush=True)
def shutdown(self):
    """Stop background work and flush pending state before the client exits.

    Sets ``self.stop_event``, writes the display-name cache to
    ``self.cache_file`` when it is marked dirty, then waits up to two
    seconds each for the announce and router threads (when present).
    Failing to write the cache is reported but never aborts the shutdown.
    """
    print("\nShutting down...")
    self.stop_event.set()

    # Force save any pending cache updates
    if getattr(self, 'cache_dirty', False):
        try:
            with open(self.cache_file, 'w', encoding='utf-8') as f:
                json.dump(self.display_name_cache, f, indent=2, ensure_ascii=False)
            self.cache_dirty = False
        except (OSError, TypeError, ValueError) as exc:
            # Best effort: report and carry on with the shutdown.
            print(f"Warning: could not save display name cache: {exc}")

    for attr in ('announce_thread', 'router_thread'):
        worker = getattr(self, attr, None)
        if worker is not None:
            worker.join(timeout=2)

    time.sleep(0.5)
def clear_screen(self):
    """Clear the terminal and redraw the client banner.

    Runs ``cls`` on Windows and ``clear`` elsewhere, then prints the banner
    centred between two horizontal rules of at most 60 columns.
    """
    os.system('cls' if os.name == 'nt' else 'clear')

    # Get responsive width
    try:
        width = shutil.get_terminal_size().columns
    except (OSError, ValueError):
        width = 60

    # Banner with proper centering
    banner_lines = [
        "██╗ ██╗ ██╗███╗ ███╗███████╗",
        "██║ ╚██╗██╔╝████╗ ████║██╔════╝",
        "██║ ╚███╔╝ ██╔████╔██║█████╗ ",
        "██║ ██╔██╗ ██║╚██╔╝██║██╔══╝ ",
        "███████╗██╔╝ ██╗██║ ╚═╝ ██║██║ ",
        "╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ",
        "",
        "Interactive LXMF Client",
    ]
    sep_width = min(width, 60)
    # Horizontal rule (U+2500); an empty literal here printed blank lines.
    rule = "\u2500" * sep_width

    print("\n" + rule)
    for line in banner_lines:
        # Center each line
        centered = line.center(sep_width)
        if COLOR_ENABLED:
            print(f"{Fore.WHITE}{Style.BRIGHT}{centered}{Style.RESET_ALL}")
        else:
            print(centered)
    print(rule + "\n")
def restart_client(self):
    """Shut the client down and replace this process with a fresh instance.

    Calls ``self.shutdown()`` and then re-executes the current interpreter
    with the original command line through ``os.execl``; on success this
    call does not return.
    """
    # Horizontal rule (U+2500); an empty literal here printed blank lines.
    rule = "\u2500" * 60
    print("\n" + rule)
    self._print_color("Restarting LXMF Client...", Fore.YELLOW + Style.BRIGHT)
    print(rule + "\n")

    # Shutdown current instance
    self.shutdown()

    # exec* replaces the process without flushing Python's buffered streams,
    # so flush them first or pending output is lost.
    sys.stdout.flush()
    sys.stderr.flush()

    # Restart the Python script
    python = sys.executable
    os.execl(python, python, *sys.argv)
def show_progress_spinner(self, message, duration=2):
    """Animate a spinner next to ``message`` for ``duration`` seconds.

    Args:
        message: Text shown after the spinner glyph (followed by "...").
        duration: How long to animate, in seconds.  With a value of zero or
            less no frame is drawn.

    The line is blanked again before returning.
    """
    # Ten-frame braille spinner, written as escapes so the glyphs survive
    # copy/paste.  All-empty frames rendered no animation at all.
    frames = "\u280b\u2819\u2839\u2838\u283c\u2834\u2826\u2827\u2807\u280f"
    spinner = itertools.cycle(frames)
    end_time = time.time() + duration
    while time.time() < end_time:
        if COLOR_ENABLED:
            print(f"\r{Fore.CYAN}{next(spinner)}{Style.RESET_ALL} {message}...", end="", flush=True)
        else:
            print(f"\r{next(spinner)} {message}...", end="", flush=True)
        time.sleep(0.1)
    print("\r" + " " * (len(message) + 10), end="\r")
def _handle_address_command(self, parts):
    """Handle the 'address' command: print this client's identity details.

    Args:
        parts: Tokenised command line (unused).

    Prints the display name, the LXMF address when the destination exposes
    a ``hash``, and the auto-announce state.
    """
    print(f"\nDisplay Name: {self.display_name}")
    if hasattr(self.destination, 'hash'):
        print(f"LXMF Address: {RNS.prettyhexrep(self.destination.hash)}")  # type: ignore
    # Do not advertise an interval while auto-announce is switched off.
    if getattr(self, 'auto_announce_enabled', True):
        print(f"Auto-announce: Every {self.announce_interval}s\n")
    else:
        print("Auto-announce: Disabled\n")
def _handle_name_command(self, parts):
    """Handle the 'name' command: change the display name and announce it.

    Args:
        parts: Tokenised command line; everything after the command word is
            joined with single spaces to form the new name.

    The new name is stored on ``self`` (and on the destination when it has
    a ``display_name`` attribute), saved with ``save_config()`` and
    announced when the destination supports it.
    """
    if len(parts) < 2:
        print("💡 Usage: name <new_name>")
        return

    self.display_name = ' '.join(parts[1:])
    if hasattr(self.destination, 'display_name'):
        setattr(self.destination, 'display_name', self.display_name)  # type: ignore
    self.save_config()

    announced = hasattr(self.destination, 'announce')
    if announced:
        self.destination.announce()  # type: ignore

    self._print_success(f"Display name: {self.display_name}")
    # Only claim an announce when one was actually sent.
    if announced:
        self._print_success("Announced to network")
def _handle_interval_command(self, parts):
    """Handle the 'interval' command: show or change the announce interval.

    Args:
        parts: Tokenised command line; ``parts[1]`` is the new interval in
            seconds.  Without it the current value and usage are printed.

    Values below 30 are raised to 30.  After saving, the announce worker is
    signalled through ``self.stop_event``; if it exited on that signal a
    new ``announce_loop`` thread is started so auto-announce keeps running.
    """
    if len(parts) < 2:
        print(f"Current interval: {self.announce_interval}s")
        print("💡 Usage: interval <seconds>")
        print("Minimum: 30 seconds")
        return

    # Guard only the conversion so unrelated errors are not reported as
    # "Invalid number".
    try:
        new_interval = int(parts[1])
    except ValueError:
        self._print_error("Invalid number")
        return

    if new_interval < 30:
        self._print_warning("Minimum interval is 30 seconds, setting to 30")
        new_interval = 30
    self.announce_interval = new_interval
    self.save_config()

    # Wake the worker so it picks up the new interval.
    self.stop_event.set()
    worker = getattr(self, 'announce_thread', None)
    if worker is not None and worker.is_alive():
        worker.join(timeout=2)
    self.stop_event.clear()

    # A worker that exited on the stop signal must be replaced, otherwise
    # announcing silently stops.  A live worker is left alone so that two
    # loops never run side by side.
    restarted = False
    if worker is not None and not worker.is_alive():
        self.announce_thread = threading.Thread(target=self.announce_loop, daemon=True)
        self.announce_thread.start()
        restarted = True

    self._print_success(f"Announce interval changed to: {self.announce_interval}s")
    if restarted:
        self._print_success("Announce thread restarted")
    else:
        self._print_success("New interval will apply from next cycle")
def _handle_announce_command(self, parts):
    """Handle the 'announce' command: announce this destination right now.

    Args:
        parts: Tokenised command line (unused).

    Success is reported only when the destination exposes ``announce`` and
    it was called; otherwise an error is printed.
    """
    if not hasattr(self.destination, 'announce'):
        self._print_error("Cannot announce: destination is not available")
        return
    self.destination.announce()  # type: ignore
    self._print_success("Announced manually")
def _handle_add_command(self, parts):
    """Handle the 'add' command: store a contact under a name.

    Args:
        parts: Tokenised command line; ``parts[1]`` is the contact name and
            ``parts[2]`` the hash, both handed to ``add_contact``.  With
            fewer than three tokens the usage line is printed instead.
    """
    if len(parts) >= 3:
        name, dest_hash = parts[1], parts[2]
        self.add_contact(name, dest_hash)
        return
    print("💡 Usage: add <name> <hash>")
def _handle_edit_command(self, parts):
    """Handle the 'edit' command: edit a contact chosen by name or number.

    Args:
        parts: Tokenised command line; ``parts[1]`` is passed to
            ``edit_contact``.  Without it, usage and examples are printed.
    """
    if len(parts) >= 2:
        self.edit_contact(parts[1])
        return
    for hint in ("💡 Usage: edit <name/#>", "Example: edit Alice", "Example: edit 3"):
        print(hint)
def _handle_savecontact_command(self, parts):
    """Handle the quick 'savecontact' command.

    Args:
        parts: Tokenised command line.  With an argument, ``parts[1]`` is
            saved as a contact hash.  Without one, the last sender (hash
            and name) is saved when known; otherwise usage is printed.
    """
    if len(parts) >= 2:
        # Save the hash given on the command line
        self.save_contact_from_hash(parts[1])
        return

    if self.last_sender_hash:
        # No argument: fall back to whoever wrote last
        self.save_contact_from_hash(self.last_sender_hash, self.last_sender_name)
        return

    print("💡 Usage: savecontact [hash]")
    print("Or receive a message first, then just type 'save'")
def _handle_remove_command(self, parts):
    """Handle the 'remove' command: delete a contact by number or name.

    Args:
        parts: Tokenised command line; everything after the command word is
            the target, so contact names containing spaces can be removed.

    A purely numeric target is tried as a contact index first and, when no
    contact has that index, as a contact name.
    """
    if len(parts) < 2:
        print("💡 Usage: remove <name|index>")
        return

    # Join all remaining tokens: names created through 'addpeer' may
    # contain spaces and were unreachable when only parts[1] was used.
    target = ' '.join(parts[1:])

    # Try to remove by index first
    if target.isdecimal():
        index = int(target)
        found = next(
            (name for name, data in self.contacts.items() if data.get('index') == index),
            None,
        )
        if found is not None:
            del self.contacts[found]
            self.save_contacts()
            self._print_success(f"Removed: {found} (#{index})")
            return
        if target not in self.contacts:
            self._print_error(f"Contact #{index} not found")
            return

    # Otherwise try to remove by name
    if target in self.contacts:
        del self.contacts[target]
        self.save_contacts()
        self._print_success(f"Removed: {target}")
    else:
        self._print_error(f"Not found: {target}")
def _handle_reply_command(self, parts):
    """Handle the 'reply' command: answer the most recent sender.

    Args:
        parts: Tokenised command line; everything after the command word is
            joined with single spaces and sent to ``self.last_sender_hash``.

    Without a message, usage is shown together with the current reply
    target (or a warning when there is none).
    """
    reply_target = self.last_sender_hash

    if len(parts) >= 2:
        if reply_target is None:
            self._print_error("No recent message to reply to")
            print(" Receive a message first, then use 'reply'")
            return
        self.send_message(reply_target, ' '.join(parts[1:]))
        return

    print("💡 Usage: reply <message>")
    if not reply_target:
        self._print_warning("No recent message to reply to")
        return
    sender_display = self.format_contact_display(reply_target, show_hash=False)
    print(f"Will reply to: {sender_display}")
def _handle_replyto_command(self, parts):
    """Handle the 'replyto' command: show who 'reply' would currently address.

    Args:
        parts: Tokenised command line (unused).
    """
    if not self.last_sender_hash:
        print("\nNo reply target set")
        print("Receive a message first\n")
        return
    sender_display = self.format_contact_display(self.last_sender_hash, show_hash=True)
    print(f"\nCurrent reply target: {sender_display}\n")
def _handle_send_command(self, parts):
    """Handle the 'send' command: send a message to a contact or hash.

    Args:
        parts: Tokenised command line; ``parts[1]`` is the recipient and
            the remaining tokens, joined with single spaces, the message.
            With fewer than three tokens the usage line is printed.
    """
    if len(parts) >= 3:
        recipient = parts[1]
        self.send_message(recipient, ' '.join(parts[2:]))
        return
    print("💡 Usage: send <name/hash> <message>")
def _handle_messages_command(self, parts):
    """Handle the 'messages' command and its sub-commands.

    Args:
        parts: Tokenised command line.
            ``messages user <#>`` shows the whole conversation whose fixed
            index in ``self.conversation_indices`` equals ``<#>``.
            ``messages list`` shows the per-user overview.
            ``messages [n]`` shows the last ``n`` messages (default 10; an
            unparsable ``n`` falls back to 10 with a warning).
    """
    subcommand = parts[1].lower() if len(parts) >= 2 else None

    if subcommand == 'user':
        # View conversation with specific user by index
        if len(parts) < 3:
            print("💡 Usage: messages user <#>")
            print("Use 'messages list' to see numbered user list")
            return
        try:
            user_idx = int(parts[2])
            # Find the conversation by fixed index
            target_hash = next(
                (hash_str for hash_str, conv_idx in self.conversation_indices.items()
                 if conv_idx == user_idx),
                None,
            )
            if target_hash:
                self.show_messages(limit=9999, filter_hash=target_hash)
            else:
                self._print_error(f"No conversation with index #{user_idx}. Use 'messages list' to see available conversations")
        except ValueError:
            self._print_error("User number must be a valid number")
        return

    if subcommand == 'list':
        # Show list of users with message counts
        self.show_message_list_with_users()
        return

    # Show recent messages
    limit = 10
    if len(parts) > 1:
        try:
            limit = int(parts[1])
        except ValueError:
            self._print_warning("Invalid number, showing last 10 messages")
    self.show_messages(limit)
def _handle_sendpeer_command(self, parts):
    """Handle the 'sendpeer' command: message an announced peer by number.

    Args:
        parts: Tokenised command line; ``parts[1]`` is the peer number and
            the remaining tokens, joined with single spaces, the message.
            With fewer than three tokens usage is printed.
    """
    if len(parts) >= 3:
        peer_number = parts[1]
        self.send_to_peer(peer_number, ' '.join(parts[2:]))
        return
    print("💡 Usage: sendpeer <peer_number> <message>")
    print("Use 'peers' to see the list first")
def _handle_addpeer_command(self, parts):
    """Handle the 'addpeer' command: save an announced peer as a contact.

    Args:
        parts: Tokenised command line; ``parts[1]`` is the peer number and
            any further tokens, joined with single spaces, an optional
            custom contact name (``None`` when absent).
    """
    if len(parts) < 2:
        print("💡 Usage: addpeer <peer_number> [custom_name]")
        print("Use 'peers' to see the list first")
        return

    custom_name = None
    if len(parts) > 2:
        custom_name = ' '.join(parts[2:])
    self.add_peer_to_contacts(parts[1], custom_name)
def _handle_discoverannounce_command(self, parts):
    """Handle the 'discoverannounce' command: show or set discovery alerts.

    Args:
        parts: Tokenised command line; ``parts[1]`` is one of
            on/yes/true/1 or off/no/false/0 (case-insensitive).  Without it
            the current state and usage are printed.
    """
    if len(parts) < 2:
        status = "OFF"
        if self.show_announces:
            status = "ON"
        print(f"\nDiscovery announces: {status}")
        print("💡 Usage: discoverannounce <on/off>")
        print(" Controls whether new peer discoveries are shown\n")
        return

    setting = parts[1].lower()
    if setting in ('on', 'yes', 'true', '1'):
        enable = True
    elif setting in ('off', 'no', 'false', '0'):
        enable = False
    else:
        self._print_error("Use 'on' or 'off'")
        return

    self.show_announces = enable
    self.save_config()
    if enable:
        self._print_success("Discovery announces enabled")
    else:
        self._print_success("Discovery announces disabled")
def _handle_blacklist_command(self, parts):
    """Handle the 'blacklist' command and its sub-commands.

    Args:
        parts: Tokenised command line.
            ``blacklist`` / ``blacklist list`` show the blacklist.
            ``blacklist add <target>`` / ``blacklist remove <target>``
            resolve the target with ``resolve_contact_or_hash`` and block
            or unblock it.
            ``blacklist clear`` empties the list after confirmation.
            Anything else prints usage.
    """
    subcmd = parts[1].lower() if len(parts) >= 2 else 'list'

    if subcmd == 'list':
        self.list_blacklist()
        return

    if subcmd in ('add', 'remove') and len(parts) >= 3:
        target = ' '.join(parts[2:])
        dest_hash = self.resolve_contact_or_hash(target)
        if not dest_hash:
            self._print_error(f"Unknown contact or invalid hash: {target}")
            return
        if subcmd == 'add':
            changed, verb = self.add_to_blacklist(dest_hash), "Blacklisted"
        else:
            changed, verb = self.remove_from_blacklist(dest_hash), "Unblocked"
        if changed:
            contact_display = self.format_contact_display_short(dest_hash)
            self._print_success(f"{verb}: {contact_display}")
        return

    if subcmd == 'clear':
        confirm = input("Clear entire blacklist? [y/N]: ").strip().lower()
        if confirm != 'y':
            print("Cancelled")
            return
        count = len(self.blacklist)
        self.blacklist.clear()
        self.save_blacklist()
        self._print_success(f"Cleared {count} entries from blacklist")
        return

    print("💡 Usage:")
    print(" blacklist [list] - Show blacklist")
    print(" blacklist add <#/name> - Block contact/peer")
    print(" blacklist remove <#/name> - Unblock")
    print(" blacklist clear - Clear all")
def _handle_block_command(self, parts):
    """Handle the 'block' command: add a contact, peer or hash to the blacklist.

    Args:
        parts: Tokenised command line; everything after the command word is
            the target, resolved through ``resolve_contact_or_hash``.
    """
    if len(parts) < 2:
        print("💡 Usage: block <contact_#/name/hash>")
        return

    target = ' '.join(parts[1:])
    dest_hash = self.resolve_contact_or_hash(target)
    if not dest_hash:
        self._print_error(f"Unknown contact: {target}")
        return

    if self.add_to_blacklist(dest_hash):
        contact_display = self.format_contact_display_short(dest_hash)
        self._print_success(f"Blocked: {contact_display}")
def _handle_unblock_command(self, parts):
    """Handle the 'unblock' command: take a contact, peer or hash off the blacklist.

    Args:
        parts: Tokenised command line; everything after the command word is
            the target, resolved through ``resolve_contact_or_hash``.
    """
    if len(parts) < 2:
        print("💡 Usage: unblock <contact_#/name/hash>")
        return

    target = ' '.join(parts[1:])
    dest_hash = self.resolve_contact_or_hash(target)
    if not dest_hash:
        self._print_error(f"Unknown contact: {target}")
        return

    if self.remove_from_blacklist(dest_hash):
        contact_display = self.format_contact_display_short(dest_hash)
        self._print_success(f"Unblocked: {contact_display}")
def _handle_plugin_command(self, parts):
    """Handle the 'plugin' command and its sub-commands.

    Args:
        parts: Tokenised command line.
            ``plugin`` / ``plugin list`` list the plugins.
            ``plugin enable <name>`` / ``plugin disable <name>`` record the
            flag in ``self.plugins_enabled`` and save the plugin config.
            ``plugin reload`` resets ``self.plugins`` and loads them again.
            Anything else prints usage.
    """
    subcmd = parts[1].lower() if len(parts) >= 2 else 'list'

    if subcmd == 'list':
        self.list_plugins()
        return

    if subcmd in ('enable', 'disable') and len(parts) >= 3:
        plugin_name = parts[2]
        turn_on = subcmd == 'enable'
        self.plugins_enabled[plugin_name] = turn_on
        self.save_plugins_config()
        if turn_on:
            self._print_success(f"Plugin {plugin_name} enabled")
            self._print_warning("Use 'plugin reload' to activate")
        else:
            self._print_success(f"Plugin {plugin_name} disabled")
            self._print_warning("Use 'plugin reload' to deactivate")
        return

    if subcmd == 'reload':
        self.plugins = {}
        self.load_plugins()
        self._print_success("Plugins reloaded")
        return

    print("💡 Usage: plugin [list|enable|disable|reload]")
def _handle_debug_command(self, parts):
    """Handle the 'debug' command: print internal counters and cache state.

    Args:
        parts: Tokenised command line (unused).
    """
    def report_lines():
        # Produced lazily so each value is read right before it is printed.
        yield "\n=== Debug Info ==="
        yield f"Suppressed file errors: {self.suppressed_errors}"
        yield f"Cache dirty: {self.cache_dirty}"
        yield f"Last cache save: {time.time() - self.last_cache_save:.1f}s ago"
        yield f"Announced peers: {len(self.announced_peers)}"
        yield f"Cached display names: {len(self.display_name_cache)}"

    for line in report_lines():
        print(line)
    print()
def run(self):
    """Main command loop with proper async input handling.

    Reads command lines with a prompt_toolkit session until the user quits,
    restarts, or input ends (EOF).  Each line is split into at most three
    tokens, the first token is resolved through the alias table, plugins
    get the first chance to handle it, and otherwise it is dispatched to
    the matching method.  ``self.shutdown()`` always runs on the way out.
    """
    self.running = True

    if COLOR_ENABLED:
        print(f"\n{Fore.CYAN}Welcome to LXMF Client!{Style.RESET_ALL}")
        print(f"{Fore.YELLOW}Type 'help' or 'h' to see available commands{Style.RESET_ALL}\n")
    else:
        print("\nWelcome to LXMF Client!")
        print("Type 'help' or 'h' to see available commands\n")

    # Commands served by a method that takes no arguments.  Method names
    # are looked up only when the command is used.
    simple_commands = {
        'help': 'show_help',
        'status': 'show_status',
        'settings': 'show_settings_menu',
        'contacts': 'list_contacts',
        'stats': 'show_stats',
        'peers': 'list_peers',
        'clear': 'clear_screen',
    }
    # Commands served by self._handle_<name>_command(parts).
    handler_commands = frozenset([
        'address', 'name', 'interval', 'announce', 'add', 'edit', 'remove',
        'savecontact', 'reply', 'replyto', 'send', 'messages', 'sendpeer',
        'addpeer', 'discoverannounce', 'blacklist', 'block', 'unblock',
        'plugin', 'debug',
    ])

    # Create prompt session
    session = PromptSession()
    try:
        while self.running:
            try:
                # Build dynamic prompt: mark it when the newest message is inbound
                with self.messages_lock:
                    newest_is_inbound = (bool(self.messages)
                                         and self.messages[-1]['direction'] == 'inbound')
                if not newest_is_inbound:
                    prompt_text = "> "
                elif COLOR_ENABLED:
                    # Use HTML formatting for prompt_toolkit
                    prompt_text = HTML('<style color="green">●</style> &gt; ')
                else:
                    prompt_text = "● > "

                # Use patch_stdout to allow background prints without corrupting input
                with patch_stdout():
                    cmd_line = session.prompt(prompt_text).strip()
                if not cmd_line:
                    continue

                parts = cmd_line.split(maxsplit=2)
                cmd = self.resolve_command(parts[0].lower())

                # Check for plugin commands first
                if self.handle_plugin_command(cmd, parts):
                    continue

                # Command routing
                if cmd in ('quit', 'exit'):
                    self.running = False
                    print("Goodbye!")
                    break
                if cmd == 'restart':
                    self.restart_client()
                    break
                if cmd in simple_commands:
                    getattr(self, simple_commands[cmd])()
                elif cmd in handler_commands:
                    getattr(self, f"_handle_{cmd}_command")(parts)
                else:
                    print(f"Unknown command: {cmd}")
                    print("Type 'help' or 'h' for commands")
            except EOFError:
                break
            except KeyboardInterrupt:
                print("\nType 'quit' or 'q' to exit")
                continue
            except Exception as e:
                self._print_error(f"Error: {e}")
    finally:
        self.shutdown()
def main():
    """Print the start-up banner, then create the client and run its command loop."""
    # Get responsive width
    try:
        width = shutil.get_terminal_size().columns
    except (OSError, ValueError):
        width = 60
    sep_width = min(width, 60)
    # Horizontal rule (U+2500); an empty literal here printed blank lines.
    rule = "\u2500" * sep_width

    banner_lines = [
        "",
        " ██╗ ██╗ ██╗███╗ ███╗███████╗",
        " ██║ ╚██╗██╔╝████╗ ████║██╔════╝",
        " ██║ ╚███╔╝ ██╔████╔██║█████╗ ",
        " ██║ ██╔██╗ ██║╚██╔╝██║██╔══╝ ",
        " ███████╗██╔╝ ██╗██║ ╚═╝ ██║██║ ",
        " ╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ",
        "",
        "Interactive LXMF Client",
    ]

    print("\n" + rule)
    for line in banner_lines:
        centered = line.center(sep_width)
        if COLOR_ENABLED:
            print(f"{Fore.WHITE}{Style.BRIGHT}{centered}{Style.RESET_ALL}")
        else:
            print(centered)
    print(rule + "\n")

    client = LXMFClient()
    client.run()


if __name__ == "__main__":
    main()