Merge pull request #1 from neoemit/feat/mac-specific-notification

Add mac specific notification and requirements.txt , refactoring
This commit is contained in:
F
2025-11-15 14:12:23 +01:00
committed by GitHub
3 changed files with 403 additions and 332 deletions

1
.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
lxmf_client_*

View File

@@ -1,21 +1,24 @@
#!/usr/bin/env python3
"""
Terminal-Based Interactive LXMF Messaging Client - Initial Version Release
Terminal-Based Interactive LXMF Messaging Client
"""
import RNS
import LXMF
import time
import sys
import os
import json
import threading
from datetime import datetime
import types
import shutil
import traceback
import itertools
import platform
import subprocess
try:
from colorama import init, Fore, Style
from colorama import init, Fore, Style # type: ignore
init(autoreset=True)
COLOR_ENABLED = True
except ImportError:
@@ -157,11 +160,13 @@ class LXMFClient:
if self.stamp_cost_enabled and self.stamp_cost > 0:
try:
# Set the stamp cost directly on the destination
self.destination.stamp_cost = self.stamp_cost
if hasattr(self.destination, 'stamp_cost'):
setattr(self.destination, 'stamp_cost', self.stamp_cost) # type: ignore
self._print_success(f"Stamp cost configured: {self.stamp_cost} bits")
# Force an announce so the stamp cost is advertised
self.destination.announce()
self._print_success("Announced with stamp cost")
if hasattr(self.destination, 'announce'):
self.destination.announce() # type: ignore
self._print_success("Announced with stamp cost")
except Exception as e:
self._print_warning(f"Could not set stamp cost: {e}")
@@ -194,7 +199,8 @@ class LXMFClient:
print(f"\n{'='*sep_width}")
self._print_color(f"Display Name: {self.display_name}", Fore.GREEN + Style.BRIGHT)
self._print_color(f"LXMF Address: {RNS.prettyhexrep(self.destination.hash)}", Fore.CYAN)
if hasattr(self.destination, 'hash'):
self._print_color(f"LXMF Address: {RNS.prettyhexrep(self.destination.hash)}", Fore.CYAN) # type: ignore
self._print_color(f"Auto-announce: Every {self.announce_interval} seconds", Fore.YELLOW)
# Show stamp cost status
@@ -207,8 +213,9 @@ class LXMFClient:
# Initial announce (this will now include stamp cost)
self._print_color("Announcing to network...", Fore.CYAN)
self.destination.announce()
self._print_success("Initial announce complete")
if hasattr(self.destination, 'announce'):
self.destination.announce() # type: ignore
self._print_success("Initial announce complete")
# Start background threads
self.announce_thread = threading.Thread(target=self.announce_loop, daemon=True)
@@ -299,9 +306,12 @@ class LXMFClient:
# Load the plugin module
plugin_path = os.path.join(self.plugins_dir, filename)
spec = importlib.util.spec_from_file_location(plugin_name, plugin_path)
module = importlib.util.module_from_spec(spec)
sys.modules[plugin_name] = module
spec.loader.exec_module(module)
if spec and spec.loader:
module = importlib.util.module_from_spec(spec)
sys.modules[plugin_name] = module
spec.loader.exec_module(module)
else:
continue
# Get plugin class
if hasattr(module, 'Plugin'):
@@ -645,21 +655,6 @@ class LXMFClient:
"""Print warning message"""
self._print_color(f"{text}", Fore.YELLOW)
def router_job_loop(self):
"""Continuously process router jobs"""
while not self.stop_event.is_set():
try:
if hasattr(self.router, 'jobs'):
self.router.jobs()
if hasattr(self.router, 'process_outbound'):
if not getattr(self.router, 'processing_outbound', False):
self.router.process_outbound()
time.sleep(0.1)
except Exception:
pass
def thread_exception_handler(self, args):
"""Log thread exceptions"""
# Count and skip file-related errors
@@ -685,14 +680,6 @@ class LXMFClient:
else:
self.display_name_cache = {}
def save_display_name_cache(self):
"""Save display name cache"""
try:
with open(self.cache_file, 'w', encoding='utf-8') as f:
json.dump(self.display_name_cache, f, indent=2, ensure_ascii=False)
except Exception as e:
self._print_warning(f"Error saving display name cache: {e}")
def cache_display_name(self, hash_str, display_name):
"""Cache a display name for a hash"""
if display_name and isinstance(display_name, str) and display_name.strip():
@@ -750,7 +737,8 @@ class LXMFClient:
break
if not self.stop_event.is_set():
self.destination.announce()
if hasattr(self.destination, 'announce'):
self.destination.announce() # type: ignore
timestamp = datetime.now().strftime('%H:%M:%S')
self._print_color(f"\n[Auto-announced at {timestamp}]", Fore.CYAN)
print("> ", end="", flush=True)
@@ -777,7 +765,6 @@ class LXMFClient:
self._print_warning(f"Error loading config: {e}")
# === FIRST TIME SETUP ===
import shutil
try:
width = shutil.get_terminal_size().columns
except:
@@ -1041,8 +1028,8 @@ class LXMFClient:
except Exception as e:
self._print_error(f"Error processing message: {e}")
import traceback
traceback.print_exc()
import traceback as tb
tb.print_exc()
print("> ", end="", flush=True)
def load_contacts(self):
@@ -1078,7 +1065,8 @@ class LXMFClient:
# Try to access Reticulum's stored announces
if hasattr(RNS.Transport, 'announces'):
print(f"[DEBUG] Checking Transport.announces...")
for destination_hash, announce_data in RNS.Transport.announces.items():
announces_dict = getattr(RNS.Transport, 'announces', {})
for destination_hash, announce_data in announces_dict.items():
try:
hash_str = RNS.prettyhexrep(destination_hash)
@@ -1393,10 +1381,11 @@ class LXMFClient:
destination=dest,
source=self.destination,
content=content,
title=title,
title=title or "",
desired_method=LXMF.LXMessage.DIRECT
)
message.send_timestamp = send_start_time
# Add custom attribute for tracking
setattr(message, 'send_timestamp', send_start_time)
message.register_delivery_callback(self.on_delivery)
message.register_failed_callback(self.on_failed)
@@ -1469,7 +1458,6 @@ class LXMFClient:
return False
except Exception as e:
self._print_error(f"Error sending message: {e}")
import traceback
traceback.print_exc()
return False
@@ -1601,7 +1589,6 @@ class LXMFClient:
def show_status(self):
"""Show current status and connection info"""
import shutil
try:
width = min(shutil.get_terminal_size().columns, 80)
except:
@@ -1614,7 +1601,8 @@ class LXMFClient:
# Identity info
print(f"\n{Fore.GREEN}Identity:{Style.RESET_ALL}")
print(f" Display Name: {self.display_name}")
print(f" LXMF Address: {RNS.prettyhexrep(self.destination.hash)}")
if hasattr(self.destination, 'hash'):
print(f" LXMF Address: {RNS.prettyhexrep(self.destination.hash)}") # type: ignore
# Network info
print(f"\n{Fore.CYAN}Network:{Style.RESET_ALL}")
@@ -1684,7 +1672,6 @@ class LXMFClient:
return
# Get responsive width
import shutil
try:
width = min(shutil.get_terminal_size().columns, 80)
except:
@@ -1900,9 +1887,24 @@ class LXMFClient:
else:
self._show_main_help()
def _show_messaging_help(self):
"""Show messaging help"""
self._print_color("Messaging commands: send, reply, messages", Fore.CYAN)
def _show_contacts_help(self):
"""Show contacts help"""
self._print_color("Contact commands: contacts, add, remove, peers", Fore.CYAN)
def _show_settings_help(self):
"""Show settings help"""
self._print_color("Settings commands: settings, name, interval", Fore.CYAN)
def _show_system_help(self):
"""Show system help"""
self._print_color("System commands: status, restart, clear, help, quit", Fore.CYAN)
def _show_main_help(self):
"""Show main help menu with categories"""
import shutil
try:
width = shutil.get_terminal_size().columns
@@ -2140,7 +2142,6 @@ class LXMFClient:
def show_settings_menu(self):
"""Show interactive settings menu"""
import shutil
while True:
# Get responsive width
@@ -2183,9 +2184,11 @@ class LXMFClient:
new_name = input(f"\nEnter new display name [{self.display_name}]: ").strip()
if new_name:
self.display_name = new_name
self.destination.display_name = self.display_name
if hasattr(self.destination, 'display_name'):
setattr(self.destination, 'display_name', self.display_name) # type: ignore
self.save_config()
self.destination.announce()
if hasattr(self.destination, 'announce'):
self.destination.announce() # type: ignore
self._print_success(f"Display name changed to: {self.display_name}")
self._print_success("Announced to network")
else:
@@ -2266,9 +2269,11 @@ class LXMFClient:
if 0 <= cost <= 32:
self.stamp_cost = cost
self.stamp_cost_enabled = True
self.destination.stamp_cost = self.stamp_cost
if hasattr(self.destination, 'stamp_cost'):
setattr(self.destination, 'stamp_cost', self.stamp_cost) # type: ignore
self.save_config()
self.destination.announce()
if hasattr(self.destination, 'announce'):
self.destination.announce() # type: ignore
self._print_success(f"Stamp cost enabled: {self.stamp_cost}")
self._print_success("Announced to network")
else:
@@ -2281,9 +2286,11 @@ class LXMFClient:
# Disabling
self.stamp_cost_enabled = False
self.stamp_cost = 0
self.destination.stamp_cost = 0
if hasattr(self.destination, 'stamp_cost'):
setattr(self.destination, 'stamp_cost', 0) # type: ignore
self.save_config()
self.destination.announce()
if hasattr(self.destination, 'announce'):
self.destination.announce() # type: ignore
self._print_success("Stamp cost disabled")
self._print_success("Announced to network")
@@ -2387,9 +2394,6 @@ class LXMFClient:
def notify_new_message(self):
"""Visual and audio notification for new message - respects user settings"""
import shutil
import platform
import os
# === SOUND NOTIFICATION ===
if self.notify_sound or self.notify_bell:
@@ -2427,7 +2431,7 @@ class LXMFClient:
# Windows notifications
if self.notify_sound:
try:
import winsound
import winsound # type: ignore
# Musical melody
melody = [
(523, 80), # C5
@@ -2438,7 +2442,7 @@ class LXMFClient:
]
for freq, duration in melody:
winsound.Beep(freq, duration)
winsound.Beep(freq, duration) # type: ignore
time.sleep(0.01)
except ImportError:
# winsound not available, use bell
@@ -2458,6 +2462,28 @@ class LXMFClient:
print("\a", end="", flush=True)
time.sleep(0.1)
elif system == 'Darwin':
# macOS: attempt system sound, fallback to terminal bell
try:
if self.notify_sound:
sound_candidates = [
"/System/Library/Sounds/Ping.aiff",
"/System/Library/Sounds/Glass.aiff",
"/System/Library/Sounds/Submarine.aiff"
]
sound_path = next((p for p in sound_candidates if os.path.exists(p)), None)
if sound_path:
subprocess.Popen(["afplay", sound_path])
else:
subprocess.run(["osascript", "-e", "beep"], check=False)
if self.notify_bell:
for _ in range(2):
print("\a", end="", flush=True)
time.sleep(0.12)
except Exception:
if self.notify_bell:
print("\a", end="", flush=True)
time.sleep(0.05)
elif system == 'Linux':
# Linux notifications
if self.notify_sound:
@@ -2476,7 +2502,7 @@ class LXMFClient:
time.sleep(0.15)
else:
# Generic/macOS - terminal bell only
# Other/unknown systems - terminal bell only
if self.notify_bell:
for _ in range(3):
print("\a", end="", flush=True)
@@ -2605,7 +2631,6 @@ class LXMFClient:
def show_progress_spinner(self, message, duration=2):
"""Show a spinner for background operations"""
import itertools
spinner = itertools.cycle(['', '', '', '', '', '', '', '', '', ''])
end_time = time.time() + duration
@@ -2617,338 +2642,381 @@ class LXMFClient:
time.sleep(0.1)
print("\r" + " " * (len(message) + 10), end="\r")
def _handle_address_command(self, parts):
"""Handle address command"""
print(f"\nDisplay Name: {self.display_name}")
if hasattr(self.destination, 'hash'):
print(f"LXMF Address: {RNS.prettyhexrep(self.destination.hash)}") # type: ignore
print(f"Auto-announce: Every {self.announce_interval}s\n")
def _handle_name_command(self, parts):
"""Handle name change command"""
if len(parts) < 2:
print("Usage: name <new_name>")
else:
self.display_name = ' '.join(parts[1:])
if hasattr(self.destination, 'display_name'):
setattr(self.destination, 'display_name', self.display_name) # type: ignore
self.save_config()
if hasattr(self.destination, 'announce'):
self.destination.announce() # type: ignore
self._print_success(f"Display name: {self.display_name}")
self._print_success("Announced to network")
def _handle_interval_command(self, parts):
"""Handle announce interval command"""
if len(parts) < 2:
print(f"Current interval: {self.announce_interval}s")
print("Usage: interval <seconds>")
print("Minimum: 30 seconds")
else:
try:
new_interval = int(parts[1])
if new_interval < 30:
self._print_warning("Minimum interval is 30 seconds, setting to 30")
new_interval = 30
self.announce_interval = new_interval
self.save_config()
self.stop_event.set()
time.sleep(0.1)
self.stop_event.clear()
self._print_success(f"Announce interval changed to: {self.announce_interval}s")
self._print_success("New interval will apply from next cycle")
except ValueError:
self._print_error("Invalid number")
def _handle_announce_command(self, parts):
"""Handle manual announce command"""
if hasattr(self.destination, 'announce'):
self.destination.announce() # type: ignore
self._print_success("Announced manually")
def _handle_add_command(self, parts):
"""Handle add contact command"""
if len(parts) < 3:
print("Usage: add <name> <hash>")
else:
self.add_contact(parts[1], parts[2])
def _handle_remove_command(self, parts):
"""Handle remove contact command"""
if len(parts) < 2:
print("Usage: remove <name>")
else:
if parts[1] in self.contacts:
del self.contacts[parts[1]]
self.save_contacts()
self._print_success(f"Removed: {parts[1]}")
else:
self._print_error(f"Not found: {parts[1]}")
def _handle_reply_command(self, parts):
"""Handle reply command"""
if len(parts) < 2:
print("Usage: reply <message>")
if self.last_sender_hash:
sender_display = self.format_contact_display(self.last_sender_hash, show_hash=False)
print(f"Will reply to: {sender_display}")
else:
self._print_warning("No recent message to reply to")
else:
if self.last_sender_hash is None:
self._print_error("No recent message to reply to")
print(" Receive a message first, then use 'reply'")
else:
message_text = ' '.join(parts[1:])
self.send_message(self.last_sender_hash, message_text)
def _handle_replyto_command(self, parts):
"""Handle replyto command"""
if self.last_sender_hash:
sender_display = self.format_contact_display(self.last_sender_hash, show_hash=True)
print(f"\nCurrent reply target: {sender_display}\n")
else:
print("\nNo reply target set")
print("Receive a message first\n")
def _handle_send_command(self, parts):
"""Handle send message command"""
if len(parts) < 3:
print("Usage: send <name/hash> <message>")
else:
message_text = ' '.join(parts[2:])
self.send_message(parts[1], message_text)
def _handle_messages_command(self, parts):
"""Handle messages command"""
if len(parts) >= 2 and parts[1].lower() == 'user':
# View conversation with specific user by index
if len(parts) >= 3:
try:
user_idx = int(parts[2])
# Find the conversation by fixed index
target_hash = None
for hash_str, conv_idx in self.conversation_indices.items():
if conv_idx == user_idx:
target_hash = hash_str
break
if target_hash:
self.show_messages(limit=9999, filter_hash=target_hash)
else:
self._print_error(f"No conversation with index #{user_idx}. Use 'messages list' to see available conversations")
except ValueError:
self._print_error("User number must be a valid number")
else:
print("Usage: messages user <#>")
print("Use 'messages list' to see numbered user list")
elif len(parts) >= 2 and parts[1].lower() == 'list':
# Show list of users with message counts
self.show_message_list_with_users()
else:
# Show recent messages
limit = 10
if len(parts) > 1:
try:
limit = int(parts[1])
except ValueError:
self._print_warning("Invalid number, showing last 10 messages")
self.show_messages(limit)
def _handle_sendpeer_command(self, parts):
"""Handle sendpeer command"""
if len(parts) < 3:
print("Usage: sendpeer <peer_number> <message>")
print("Use 'peers' to see the list first")
else:
message_text = ' '.join(parts[2:])
self.send_to_peer(parts[1], message_text)
def _handle_addpeer_command(self, parts):
"""Handle addpeer command"""
if len(parts) < 2:
print("Usage: addpeer <peer_number> [custom_name]")
print("Use 'peers' to see the list first")
else:
custom_name = ' '.join(parts[2:]) if len(parts) > 2 else None
self.add_peer_to_contacts(parts[1], custom_name)
def _handle_discoverannounce_command(self, parts):
"""Handle discoverannounce command"""
if len(parts) < 2:
status = "ON" if self.show_announces else "OFF"
print(f"\nDiscovery announces: {status}")
print("Usage: discoverannounce <on/off>")
print(" Controls whether new peer discoveries are shown\n")
else:
setting = parts[1].lower()
if setting in ['on', 'yes', 'true', '1']:
self.show_announces = True
self.save_config()
self._print_success("Discovery announces enabled")
elif setting in ['off', 'no', 'false', '0']:
self.show_announces = False
self.save_config()
self._print_success("Discovery announces disabled")
else:
self._print_error("Use 'on' or 'off'")
def _handle_blacklist_command(self, parts):
"""Handle blacklist command"""
if len(parts) < 2:
self.list_blacklist()
else:
subcmd = parts[1].lower()
if subcmd == 'list':
self.list_blacklist()
elif subcmd == 'add' and len(parts) >= 3:
target = ' '.join(parts[2:])
dest_hash = self.resolve_contact_or_hash(target)
if dest_hash:
if self.add_to_blacklist(dest_hash):
contact_display = self.format_contact_display_short(dest_hash)
self._print_success(f"Blacklisted: {contact_display}")
else:
self._print_error(f"Unknown contact or invalid hash: {target}")
elif subcmd == 'remove' and len(parts) >= 3:
target = ' '.join(parts[2:])
dest_hash = self.resolve_contact_or_hash(target)
if dest_hash:
if self.remove_from_blacklist(dest_hash):
contact_display = self.format_contact_display_short(dest_hash)
self._print_success(f"Unblocked: {contact_display}")
else:
self._print_error(f"Unknown contact or invalid hash: {target}")
elif subcmd == 'clear':
confirm = input("Clear entire blacklist? [y/N]: ").strip().lower()
if confirm == 'y':
count = len(self.blacklist)
self.blacklist.clear()
self.save_blacklist()
self._print_success(f"Cleared {count} entries from blacklist")
else:
print("Cancelled")
else:
print("Usage:")
print(" blacklist [list] - Show blacklist")
print(" blacklist add <#/name> - Block contact/peer")
print(" blacklist remove <#/name> - Unblock")
print(" blacklist clear - Clear all")
def _handle_block_command(self, parts):
"""Handle block command"""
if len(parts) < 2:
print("Usage: block <contact_#/name/hash>")
else:
target = ' '.join(parts[1:])
dest_hash = self.resolve_contact_or_hash(target)
if dest_hash:
if self.add_to_blacklist(dest_hash):
contact_display = self.format_contact_display_short(dest_hash)
self._print_success(f"Blocked: {contact_display}")
else:
self._print_error(f"Unknown contact: {target}")
def _handle_unblock_command(self, parts):
"""Handle unblock command"""
if len(parts) < 2:
print("Usage: unblock <contact_#/name/hash>")
else:
target = ' '.join(parts[1:])
dest_hash = self.resolve_contact_or_hash(target)
if dest_hash:
if self.remove_from_blacklist(dest_hash):
contact_display = self.format_contact_display_short(dest_hash)
self._print_success(f"Unblocked: {contact_display}")
else:
self._print_error(f"Unknown contact: {target}")
def _handle_plugin_command(self, parts):
"""Handle plugin command"""
if len(parts) < 2:
self.list_plugins()
else:
subcmd = parts[1].lower()
if subcmd == 'list':
self.list_plugins()
elif subcmd == 'enable' and len(parts) >= 3:
plugin_name = parts[2]
self.plugins_enabled[plugin_name] = True
self.save_plugins_config()
self._print_success(f"Plugin {plugin_name} enabled")
self._print_warning("Use 'plugin reload' to activate")
elif subcmd == 'disable' and len(parts) >= 3:
plugin_name = parts[2]
self.plugins_enabled[plugin_name] = False
self.save_plugins_config()
self._print_success(f"Plugin {plugin_name} disabled")
self._print_warning("Use 'plugin reload' to deactivate")
elif subcmd == 'reload':
self.plugins = {}
self.load_plugins()
self._print_success("Plugins reloaded")
else:
print("Usage: plugin [list|enable|disable|reload]")
def _handle_debug_command(self, parts):
"""Handle debug command"""
print(f"\n=== Debug Info ===")
print(f"Suppressed file errors: {self.suppressed_errors}")
print(f"Cache dirty: {self.cache_dirty}")
print(f"Last cache save: {time.time() - self.last_cache_save:.1f}s ago")
print(f"Announced peers: {len(self.announced_peers)}")
print(f"Cached display names: {len(self.display_name_cache)}")
print()
def run(self):
"""Main loop"""
"""Main command loop - delegates to handler methods"""
self.running = True
# Show brief welcome message instead of full help
# Show brief welcome message
print(f"\n{Fore.CYAN}Welcome to LXMF Client!{Style.RESET_ALL}" if COLOR_ENABLED else "\nWelcome to LXMF Client!")
print(f"{Fore.YELLOW}Type 'help' or 'h' to see available commands{Style.RESET_ALL}\n" if COLOR_ENABLED else "Type 'help' or 'h' to see available commands\n")
# Show initial status and help (disable, uncomment to enable)
#self.show_status()
#self.show_help()
try:
while self.running:
try:
# Dynamic prompt showing unread indicator
with self.messages_lock:
if self.messages and self.messages[-1]['direction'] == 'inbound':
# Show if last message was inbound
prompt = f"{Fore.GREEN}{Style.RESET_ALL} > " if COLOR_ENABLED else "● > "
else:
prompt = "> "
cmd_line = input(prompt).strip()
if not cmd_line:
continue
parts = cmd_line.split(maxsplit=2)
cmd = self.resolve_command(parts[0].lower())
# Check for plugin commands first
if self.handle_plugin_command(cmd, parts):
continue
# Command routing
if cmd in ['quit', 'exit']:
self.running = False
print("Goodbye!")
break
elif cmd == 'help':
self.show_help()
elif cmd == 'status':
self.show_status()
elif cmd == 'settings':
self.show_settings_menu()
elif cmd == 'address':
print(f"\nDisplay Name: {self.display_name}")
print(f"LXMF Address: {RNS.prettyhexrep(self.destination.hash)}")
print(f"Auto-announce: Every {self.announce_interval}s\n")
self._handle_address_command(parts)
elif cmd == 'name':
if len(parts) < 2:
print("Usage: name <new_name>")
else:
self.display_name = ' '.join(parts[1:])
self.destination.display_name = self.display_name
self.save_config()
self.destination.announce()
self._print_success(f"Display name: {self.display_name}")
self._print_success("Announced to network")
self._handle_name_command(parts)
elif cmd == 'interval':
if len(parts) < 2:
print(f"Current interval: {self.announce_interval}s")
print("Usage: interval <seconds>")
print("Minimum: 30 seconds")
else:
try:
new_interval = int(parts[1])
if new_interval < 30:
self._print_warning("Minimum interval is 30 seconds, setting to 30")
new_interval = 30
self.announce_interval = new_interval
self.save_config()
self.stop_event.set()
time.sleep(0.1)
self.stop_event.clear()
self._print_success(f"Announce interval changed to: {self.announce_interval}s")
self._print_success("New interval will apply from next cycle")
except ValueError:
self._print_error("Invalid number")
self._handle_interval_command(parts)
elif cmd == 'announce':
self.destination.announce()
self._print_success("Announced manually")
self._handle_announce_command(parts)
elif cmd == 'contacts':
self.list_contacts()
elif cmd == 'add':
if len(parts) < 3:
print("Usage: add <name> <hash>")
else:
self.add_contact(parts[1], parts[2])
self._handle_add_command(parts)
elif cmd == 'remove':
if len(parts) < 2:
print("Usage: remove <name>")
else:
if parts[1] in self.contacts:
del self.contacts[parts[1]]
self.save_contacts()
self._print_success(f"Removed: {parts[1]}")
else:
self._print_error(f"Not found: {parts[1]}")
self._handle_remove_command(parts)
elif cmd == 'reply':
if len(parts) < 2:
print("Usage: reply <message>")
if self.last_sender_hash:
sender_display = self.format_contact_display(self.last_sender_hash, show_hash=False)
print(f"Will reply to: {sender_display}")
else:
self._print_warning("No recent message to reply to")
else:
if self.last_sender_hash is None:
self._print_error("No recent message to reply to")
print(" Receive a message first, then use 'reply'")
else:
message_text = ' '.join(parts[1:])
self.send_message(self.last_sender_hash, message_text)
self._handle_reply_command(parts)
elif cmd == 'replyto':
if self.last_sender_hash:
sender_display = self.format_contact_display(self.last_sender_hash, show_hash=True)
print(f"\nCurrent reply target: {sender_display}\n")
else:
print("\nNo reply target set")
print("Receive a message first\n")
self._handle_replyto_command(parts)
elif cmd == 'send':
if len(parts) < 3:
print("Usage: send <name/hash> <message>")
else:
message_text = ' '.join(parts[2:])
self.send_message(parts[1], message_text)
self._handle_send_command(parts)
elif cmd == 'messages':
if len(parts) >= 2 and parts[1].lower() == 'user':
# View conversation with specific user by index
if len(parts) >= 3:
try:
user_idx = int(parts[2])
# Find the conversation by fixed index
target_hash = None
for hash_str, conv_idx in self.conversation_indices.items():
if conv_idx == user_idx:
target_hash = hash_str
break
if target_hash:
self.show_messages(limit=9999, filter_hash=target_hash)
else:
self._print_error(f"No conversation with index #{user_idx}. Use 'messages list' to see available conversations")
except ValueError:
self._print_error("User number must be a valid number")
else:
print("Usage: messages user <#>")
print("Use 'messages list' to see numbered user list")
elif len(parts) >= 2 and parts[1].lower() == 'list':
# Show list of users with message counts
self.show_message_list_with_users()
else:
# Show recent messages
limit = 10
if len(parts) > 1:
try:
limit = int(parts[1])
except ValueError:
self._print_warning("Invalid number, showing last 10 messages")
self.show_messages(limit)
self._handle_messages_command(parts)
elif cmd == 'stats':
self.show_stats()
elif cmd == 'peers':
self.list_peers()
elif cmd == 'sendpeer':
if len(parts) < 3:
print("Usage: sendpeer <peer_number> <message>")
print("Use 'peers' to see the list first")
else:
message_text = ' '.join(parts[2:])
self.send_to_peer(parts[1], message_text)
self._handle_sendpeer_command(parts)
elif cmd == 'addpeer':
if len(parts) < 2:
print("Usage: addpeer <peer_number> [custom_name]")
print("Use 'peers' to see the list first")
else:
custom_name = ' '.join(parts[2:]) if len(parts) > 2 else None
self.add_peer_to_contacts(parts[1], custom_name)
self._handle_addpeer_command(parts)
elif cmd == 'discoverannounce':
if len(parts) < 2:
status = "ON" if self.show_announces else "OFF"
print(f"\nDiscovery announces: {status}")
print("Usage: discoverannounce <on/off>")
print(" Controls whether new peer discoveries are shown\n")
else:
setting = parts[1].lower()
if setting in ['on', 'yes', 'true', '1']:
self.show_announces = True
self.save_config()
self._print_success("Discovery announces enabled")
elif setting in ['off', 'no', 'false', '0']:
self.show_announces = False
self.save_config()
self._print_success("Discovery announces disabled")
else:
self._print_error("Use 'on' or 'off'")
self._handle_discoverannounce_command(parts)
elif cmd == 'blacklist':
if len(parts) < 2:
self.list_blacklist()
else:
subcmd = parts[1].lower()
if subcmd == 'list':
self.list_blacklist()
elif subcmd == 'add' and len(parts) >= 3:
target = ' '.join(parts[2:])
# Try to resolve contact name or index to hash
dest_hash = self.resolve_contact_or_hash(target)
if dest_hash:
if self.add_to_blacklist(dest_hash):
contact_display = self.format_contact_display_short(dest_hash)
self._print_success(f"Blacklisted: {contact_display}")
else:
self._print_error(f"Unknown contact or invalid hash: {target}")
elif subcmd == 'remove' and len(parts) >= 3:
target = ' '.join(parts[2:])
# Try to resolve contact name or index to hash
dest_hash = self.resolve_contact_or_hash(target)
if dest_hash:
if self.remove_from_blacklist(dest_hash):
contact_display = self.format_contact_display_short(dest_hash)
self._print_success(f"Unblocked: {contact_display}")
else:
self._print_error(f"Unknown contact or invalid hash: {target}")
elif subcmd == 'clear':
confirm = input("Clear entire blacklist? [y/N]: ").strip().lower()
if confirm == 'y':
count = len(self.blacklist)
self.blacklist.clear()
self.save_blacklist()
self._print_success(f"Cleared {count} entries from blacklist")
else:
print("Cancelled")
else:
print("Usage:")
print(" blacklist [list] - Show blacklist")
print(" blacklist add <#/name> - Block contact/peer")
print(" blacklist remove <#/name> - Unblock")
print(" blacklist clear - Clear all")
self._handle_blacklist_command(parts)
elif cmd == 'block':
if len(parts) < 2:
print("Usage: block <contact_#/name/hash>")
else:
target = ' '.join(parts[1:])
dest_hash = self.resolve_contact_or_hash(target)
if dest_hash:
if self.add_to_blacklist(dest_hash):
contact_display = self.format_contact_display_short(dest_hash)
self._print_success(f"Blocked: {contact_display}")
else:
self._print_error(f"Unknown contact: {target}")
self._handle_block_command(parts)
elif cmd == 'unblock':
if len(parts) < 2:
print("Usage: unblock <contact_#/name/hash>")
else:
target = ' '.join(parts[1:])
dest_hash = self.resolve_contact_or_hash(target)
if dest_hash:
if self.remove_from_blacklist(dest_hash):
contact_display = self.format_contact_display_short(dest_hash)
self._print_success(f"Unblocked: {contact_display}")
else:
self._print_error(f"Unknown contact: {target}")
self._handle_unblock_command(parts)
elif cmd == 'clear':
self.clear_screen()
elif cmd == 'restart':
self.restart_client()
# Note: execution will not continue past this point
break
elif cmd == 'plugin':
if len(parts) < 2:
self.list_plugins()
else:
subcmd = parts[1].lower()
if subcmd == 'list':
self.list_plugins()
elif subcmd == 'enable' and len(parts) >= 3:
plugin_name = parts[2]
self.plugins_enabled[plugin_name] = True
self.save_plugins_config()
self._print_success(f"Plugin {plugin_name} enabled")
self._print_warning("Use 'plugin reload' to activate")
elif subcmd == 'disable' and len(parts) >= 3:
plugin_name = parts[2]
self.plugins_enabled[plugin_name] = False
self.save_plugins_config()
self._print_success(f"Plugin {plugin_name} disabled")
self._print_warning("Use 'plugin reload' to deactivate")
elif subcmd == 'reload':
self.plugins = {}
self.load_plugins()
self._print_success("Plugins reloaded")
else:
print("Usage: plugin [list|enable|disable|reload]")
self._handle_plugin_command(parts)
elif cmd == 'debug':
print(f"\n=== Debug Info ===")
print(f"Suppressed file errors: {self.suppressed_errors}")
print(f"Cache dirty: {self.cache_dirty}")
print(f"Last cache save: {time.time() - self.last_cache_save:.1f}s ago")
print(f"Announced peers: {len(self.announced_peers)}")
print(f"Cached display names: {len(self.display_name_cache)}")
print()
self._handle_debug_command(parts)
else:
# Check if a plugin wants to handle this command
if not self.handle_plugin_command(cmd, parts):
print(f"Unknown command: {cmd}")
print("Type 'help' or 'h' for commands")
print(f"Unknown command: {cmd}")
print("Type 'help' or 'h' for commands")
except EOFError:
break
@@ -2963,7 +3031,6 @@ class LXMFClient:
def main():
import shutil
banner = """
██╗ ██╗ ██╗███╗ ███╗███████╗

3
requirements.txt Normal file
View File

@@ -0,0 +1,3 @@
rns
lxmf
colorama