diff --git a/meshchatx/src/backend/announce_manager.py b/meshchatx/src/backend/announce_manager.py index e51d5ff..48fa201 100644 --- a/meshchatx/src/backend/announce_manager.py +++ b/meshchatx/src/backend/announce_manager.py @@ -49,6 +49,8 @@ class AnnounceManager: destination_hash=None, query=None, blocked_identity_hashes=None, + limit=None, + offset=0, ): sql = "SELECT * FROM announces WHERE 1=1" params = [] @@ -72,4 +74,9 @@ class AnnounceManager: params.extend(blocked_identity_hashes) sql += " ORDER BY updated_at DESC" + + if limit is not None: + sql += " LIMIT ? OFFSET ?" + params.extend([limit, offset]) + return self.db.provider.fetchall(sql, params) diff --git a/meshchatx/src/backend/async_utils.py b/meshchatx/src/backend/async_utils.py index 200dfc8..13de95f 100644 --- a/meshchatx/src/backend/async_utils.py +++ b/meshchatx/src/backend/async_utils.py @@ -1,4 +1,5 @@ import asyncio +import sys from collections.abc import Coroutine @@ -6,6 +7,34 @@ class AsyncUtils: # remember main loop main_loop: asyncio.AbstractEventLoop | None = None + @staticmethod + def apply_asyncio_313_patch(): + """ + Apply a patch for asyncio on Python 3.13 to avoid a bug in sendfile with SSL. + See: https://github.com/python/cpython/issues/124448 + And: https://github.com/aio-libs/aiohttp/issues/8863 + """ + if sys.version_info >= (3, 13): + import asyncio.base_events + + # We need to patch the loop's sendfile to raise NotImplementedError for SSL transports. + # This will force aiohttp to use its own fallback which works correctly. + + original_sendfile = asyncio.base_events.BaseEventLoop.sendfile + + async def patched_sendfile( + self, transport, file, offset=0, count=None, *, fallback=True + ): + if transport.get_extra_info("sslcontext"): + raise NotImplementedError( + "sendfile is broken on SSL transports in Python 3.13" + ) + return await original_sendfile( + self, transport, file, offset, count, fallback=fallback + ) + + asyncio.base_events.BaseEventLoop.sendfile = patched_sendfile + @staticmethod def set_main_loop(loop: asyncio.AbstractEventLoop): AsyncUtils.main_loop = loop diff --git a/meshchatx/src/backend/community_interfaces.py b/meshchatx/src/backend/community_interfaces.py new file mode 100644 index 0000000..59d7b66 --- /dev/null +++ b/meshchatx/src/backend/community_interfaces.py @@ -0,0 +1,115 @@ +import asyncio +import time +from typing import List, Dict, Any + + +class CommunityInterfacesManager: + def __init__(self): + self.interfaces = [ + { + "name": "noDNS1", + "type": "TCPClientInterface", + "target_host": "202.61.243.41", + "target_port": 4965, + "description": "Public TCP Relay", + }, + { + "name": "noDNS2", + "type": "TCPClientInterface", + "target_host": "193.26.158.230", + "target_port": 4965, + "description": "Public TCP Relay", + }, + { + "name": "0rbit-Net", + "type": "TCPClientInterface", + "target_host": "93.95.227.8", + "target_port": 49952, + "description": "Public TCP Relay", + }, + { + "name": "Quad4 TCP Node 1", + "type": "TCPClientInterface", + "target_host": "rns.quad4.io", + "target_port": 4242, + "description": "Public TCP Relay", + }, + { + "name": "Quad4 TCP Node 2", + "type": "TCPClientInterface", + "target_host": "rns2.quad4.io", + "target_port": 4242, + "description": "Public TCP Relay", + }, + { + "name": "RNS Testnet Amsterdam", + "type": "TCPClientInterface", + "target_host": "amsterdam.connect.reticulum.network", + "target_port": 4965, + "description": "Reticulum Testnet Hub", + }, + { + "name": "RNS Testnet BetweenTheBorders", + "type": "TCPClientInterface", 
+ "target_host": "reticulum.betweentheborders.com", + "target_port": 4242, + "description": "Reticulum Testnet Hub", + }, + ] + self.status_cache = {} + self.last_check = 0 + self.check_interval = 600 # Check every 10 minutes + + async def check_health(self, host: str, port: int) -> bool: + try: + # Simple TCP connect check as a proxy for "working" + # In a real RNS environment, we might want to use RNS.Transport.probe() + # but that requires Reticulum to be running with a configured interface to that target. + # For "suggested" interfaces, we just check if they are reachable. + reader, writer = await asyncio.wait_for( + asyncio.open_connection(host, port), timeout=3.0 + ) + writer.close() + await writer.wait_closed() + return True + except Exception: + return False + + async def update_statuses(self): + tasks = [ + self.check_health(iface["target_host"], iface["target_port"]) + for iface in self.interfaces + ] + + results = await asyncio.gather(*tasks) + + for iface, is_online in zip(self.interfaces, results): + self.status_cache[iface["name"]] = { + "online": is_online, + "last_check": time.time(), + } + self.last_check = time.time() + + async def get_interfaces(self) -> List[Dict[str, Any]]: + # If cache is old or empty, update it + if time.time() - self.last_check > self.check_interval or not self.status_cache: + # We don't want to block the request, so we could do this in background + # but for now let's just do it. + await self.update_statuses() + + results = [] + for iface in self.interfaces: + status = self.status_cache.get( + iface["name"], {"online": False, "last_check": 0} + ) + results.append( + { + **iface, + "online": status["online"], + "last_check": status["last_check"], + } + ) + + # Sort so online ones are first + results.sort(key=lambda x: x["online"], reverse=True) + return results diff --git a/meshchatx/src/backend/config_manager.py b/meshchatx/src/backend/config_manager.py index e323701..f4bac80 100644 --- a/meshchatx/src/backend/config_manager.py +++ b/meshchatx/src/backend/config_manager.py @@ -112,6 +112,24 @@ class ConfigManager: self.auth_enabled = self.BoolConfig(self, "auth_enabled", False) self.auth_password_hash = self.StringConfig(self, "auth_password_hash", None) self.auth_session_secret = self.StringConfig(self, "auth_session_secret", None) + self.docs_downloaded = self.BoolConfig(self, "docs_downloaded", False) + self.initial_docs_download_attempted = self.BoolConfig( + self, + "initial_docs_download_attempted", + False, + ) + + # desktop config + self.desktop_open_calls_in_separate_window = self.BoolConfig( + self, + "desktop_open_calls_in_separate_window", + False, + ) + self.desktop_hardware_acceleration_enabled = self.BoolConfig( + self, + "desktop_hardware_acceleration_enabled", + True, + ) # voicemail config self.voicemail_enabled = self.BoolConfig(self, "voicemail_enabled", False) @@ -130,6 +148,12 @@ class ConfigManager: "voicemail_max_recording_seconds", 60, ) + self.voicemail_tts_speed = self.IntConfig(self, "voicemail_tts_speed", 130) + self.voicemail_tts_pitch = self.IntConfig(self, "voicemail_tts_pitch", 45) + self.voicemail_tts_voice = self.StringConfig( + self, "voicemail_tts_voice", "en-us+f3" + ) + self.voicemail_tts_word_gap = self.IntConfig(self, "voicemail_tts_word_gap", 5) # ringtone config self.custom_ringtone_enabled = self.BoolConfig( @@ -138,6 +162,8 @@ class ConfigManager: False, ) self.ringtone_filename = self.StringConfig(self, "ringtone_filename", None) + self.ringtone_preferred_id = self.IntConfig(self, 
"ringtone_preferred_id", 0) + self.ringtone_volume = self.IntConfig(self, "ringtone_volume", 100) # telephony config self.do_not_disturb_enabled = self.BoolConfig( @@ -150,6 +176,16 @@ class ConfigManager: "telephone_allow_calls_from_contacts_only", False, ) + self.telephone_audio_profile_id = self.IntConfig( + self, + "telephone_audio_profile_id", + 2, # Default to Voice (profile 2) + ) + self.call_recording_enabled = self.BoolConfig( + self, + "call_recording_enabled", + False, + ) # map config self.map_offline_enabled = self.BoolConfig(self, "map_offline_enabled", False) @@ -174,6 +210,31 @@ class ConfigManager: "https://nominatim.openstreetmap.org", ) + # translator config + self.translator_enabled = self.BoolConfig(self, "translator_enabled", False) + self.libretranslate_url = self.StringConfig( + self, + "libretranslate_url", + "http://localhost:5000", + ) + + # banishment config + self.banished_effect_enabled = self.BoolConfig( + self, + "banished_effect_enabled", + True, + ) + self.banished_text = self.StringConfig( + self, + "banished_text", + "BANISHED", + ) + self.banished_color = self.StringConfig( + self, + "banished_color", + "#dc2626", + ) + def get(self, key: str, default_value=None) -> str | None: return self.db.config.get(key, default_value) diff --git a/meshchatx/src/backend/database/__init__.py b/meshchatx/src/backend/database/__init__.py index 76085f5..65fe3da 100644 --- a/meshchatx/src/backend/database/__init__.py +++ b/meshchatx/src/backend/database/__init__.py @@ -1,7 +1,13 @@ +import os +import shutil +import zipfile +from datetime import UTC, datetime + from .announces import AnnounceDAO from .config import ConfigDAO from .contacts import ContactsDAO from .legacy_migrator import LegacyMigrator +from .map_drawings import MapDrawingsDAO from .messages import MessageDAO from .misc import MiscDAO from .provider import DatabaseProvider @@ -25,6 +31,7 @@ class Database: self.voicemails = VoicemailDAO(self.provider) self.ringtones = RingtoneDAO(self.provider) self.contacts = ContactsDAO(self.provider) + self.map_drawings = MapDrawingsDAO(self.provider) def initialize(self): self.schema.initialize() @@ -42,5 +49,237 @@ class Database: def execute_sql(self, query, params=None): return self.provider.execute(query, params) + def _tune_sqlite_pragmas(self): + try: + self.execute_sql("PRAGMA wal_autocheckpoint=1000") + self.execute_sql("PRAGMA temp_store=MEMORY") + self.execute_sql("PRAGMA journal_mode=WAL") + except Exception as exc: + print(f"SQLite pragma setup failed: {exc}") + + def _get_pragma_value(self, pragma: str, default=None): + try: + cursor = self.execute_sql(f"PRAGMA {pragma}") + row = cursor.fetchone() + if row is None: + return default + return row[0] + except Exception: + return default + + def _get_database_file_stats(self): + def size_for(path): + try: + return os.path.getsize(path) + except OSError: + return 0 + + db_path = self.provider.db_path + wal_path = f"{db_path}-wal" + shm_path = f"{db_path}-shm" + + main_bytes = size_for(db_path) + wal_bytes = size_for(wal_path) + shm_bytes = size_for(shm_path) + + return { + "main_bytes": main_bytes, + "wal_bytes": wal_bytes, + "shm_bytes": shm_bytes, + "total_bytes": main_bytes + wal_bytes + shm_bytes, + } + + def _database_paths(self): + db_path = self.provider.db_path + return { + "main": db_path, + "wal": f"{db_path}-wal", + "shm": f"{db_path}-shm", + } + + def get_database_health_snapshot(self): + page_size = self._get_pragma_value("page_size", 0) or 0 + page_count = self._get_pragma_value("page_count", 0) 
or 0 + freelist_pages = self._get_pragma_value("freelist_count", 0) or 0 + free_bytes = ( + page_size * freelist_pages if page_size > 0 and freelist_pages > 0 else 0 + ) + + return { + "quick_check": self._get_pragma_value("quick_check", "unknown"), + "journal_mode": self._get_pragma_value("journal_mode", "unknown"), + "synchronous": self._get_pragma_value("synchronous", None), + "wal_autocheckpoint": self._get_pragma_value("wal_autocheckpoint", None), + "auto_vacuum": self._get_pragma_value("auto_vacuum", None), + "page_size": page_size, + "page_count": page_count, + "freelist_pages": freelist_pages, + "estimated_free_bytes": free_bytes, + "files": self._get_database_file_stats(), + } + + def _checkpoint_wal(self, mode: str = "TRUNCATE"): + return self.execute_sql(f"PRAGMA wal_checkpoint({mode})").fetchall() + + def run_database_vacuum(self): + try: + # Attempt to checkpoint WAL, ignore errors if busy + try: + self._checkpoint_wal() + except Exception as e: + print( + f"Warning: WAL checkpoint during vacuum failed (non-critical): {e}" + ) + + self.execute_sql("VACUUM") + self._tune_sqlite_pragmas() + + return { + "health": self.get_database_health_snapshot(), + } + except Exception as e: + # Wrap in a cleaner error message + raise Exception(f"Database vacuum failed: {e!s}") + + def run_database_recovery(self): + actions = [] + + actions.append( + { + "step": "quick_check_before", + "result": self._get_pragma_value("quick_check", "unknown"), + }, + ) + + actions.append({"step": "wal_checkpoint", "result": self._checkpoint_wal()}) + + integrity_rows = self.provider.integrity_check() + integrity = [row[0] for row in integrity_rows] if integrity_rows else [] + actions.append({"step": "integrity_check", "result": integrity}) + + self.provider.vacuum() + self._tune_sqlite_pragmas() + + actions.append( + { + "step": "quick_check_after", + "result": self._get_pragma_value("quick_check", "unknown"), + }, + ) + + return { + "actions": actions, + "health": self.get_database_health_snapshot(), + } + + def _checkpoint_and_close(self): + try: + self._checkpoint_wal() + except Exception as e: + print(f"Failed to checkpoint WAL: {e}") + try: + self.close() + except Exception as e: + print(f"Failed to close database: {e}") + def close(self): - self.provider.close() + if hasattr(self, "provider"): + self.provider.close() + + def close_all(self): + if hasattr(self, "provider"): + self.provider.close_all() + + def _backup_to_zip(self, backup_path: str): + paths = self._database_paths() + os.makedirs(os.path.dirname(backup_path), exist_ok=True) + # ensure WAL is checkpointed to get a consistent snapshot + self._checkpoint_wal() + + main_filename = os.path.basename(paths["main"]) + with zipfile.ZipFile(backup_path, "w", compression=zipfile.ZIP_DEFLATED) as zf: + zf.write(paths["main"], arcname=main_filename) + if os.path.exists(paths["wal"]): + zf.write(paths["wal"], arcname=f"{main_filename}-wal") + if os.path.exists(paths["shm"]): + zf.write(paths["shm"], arcname=f"{main_filename}-shm") + + return { + "path": backup_path, + "size": os.path.getsize(backup_path), + } + + def backup_database(self, storage_path, backup_path: str | None = None): + default_dir = os.path.join(storage_path, "database-backups") + os.makedirs(default_dir, exist_ok=True) + if backup_path is None: + timestamp = datetime.now(UTC).strftime("%Y%m%d-%H%M%S") + backup_path = os.path.join(default_dir, f"backup-{timestamp}.zip") + + return self._backup_to_zip(backup_path) + + def create_snapshot(self, storage_path, name: str): + """Creates a 
named snapshot of the database.""" + snapshot_dir = os.path.join(storage_path, "snapshots") + os.makedirs(snapshot_dir, exist_ok=True) + # Ensure name is safe for filesystem + safe_name = "".join( + [c for c in name if c.isalnum() or c in (" ", ".", "-", "_")] + ).strip() + if not safe_name: + safe_name = "unnamed_snapshot" + + snapshot_path = os.path.join(snapshot_dir, f"{safe_name}.zip") + return self._backup_to_zip(snapshot_path) + + def list_snapshots(self, storage_path): + """Lists all available snapshots.""" + snapshot_dir = os.path.join(storage_path, "snapshots") + if not os.path.exists(snapshot_dir): + return [] + + snapshots = [] + for file in os.listdir(snapshot_dir): + if file.endswith(".zip"): + full_path = os.path.join(snapshot_dir, file) + stats = os.stat(full_path) + snapshots.append( + { + "name": file[:-4], + "path": full_path, + "size": stats.st_size, + "created_at": datetime.fromtimestamp( + stats.st_mtime, UTC + ).isoformat(), + } + ) + return sorted(snapshots, key=lambda x: x["created_at"], reverse=True) + + def restore_database(self, backup_path: str): + if not os.path.exists(backup_path): + msg = f"Backup not found at {backup_path}" + raise FileNotFoundError(msg) + + paths = self._database_paths() + self._checkpoint_and_close() + + # clean existing files + for p in paths.values(): + if os.path.exists(p): + os.remove(p) + + if zipfile.is_zipfile(backup_path): + with zipfile.ZipFile(backup_path, "r") as zf: + zf.extractall(os.path.dirname(paths["main"])) + else: + shutil.copy2(backup_path, paths["main"]) + + # reopen and retune + self.initialize() + self._tune_sqlite_pragmas() + integrity = self.provider.integrity_check() + + return { + "restored_from": backup_path, + "integrity_check": integrity, + } diff --git a/meshchatx/src/backend/database/announces.py b/meshchatx/src/backend/database/announces.py index 335d621..4a86001 100644 --- a/meshchatx/src/backend/database/announces.py +++ b/meshchatx/src/backend/database/announces.py @@ -30,7 +30,7 @@ class AnnounceDAO: ) query = ( - f"INSERT INTO announces ({columns}, created_at, updated_at) VALUES ({placeholders}, ?, ?) " + f"INSERT INTO announces ({columns}, created_at, updated_at) VALUES ({placeholders}, ?, ?) " # noqa: S608 f"ON CONFLICT(destination_hash) DO UPDATE SET {update_set}, updated_at = EXCLUDED.updated_at" ) diff --git a/meshchatx/src/backend/database/contacts.py b/meshchatx/src/backend/database/contacts.py index b46ae53..b3b81a2 100644 --- a/meshchatx/src/backend/database/contacts.py +++ b/meshchatx/src/backend/database/contacts.py @@ -5,16 +5,20 @@ class ContactsDAO: def __init__(self, provider: DatabaseProvider): self.provider = provider - def add_contact(self, name, remote_identity_hash): + def add_contact( + self, name, remote_identity_hash, preferred_ringtone_id=None, custom_image=None + ): self.provider.execute( """ - INSERT INTO contacts (name, remote_identity_hash) - VALUES (?, ?) + INSERT INTO contacts (name, remote_identity_hash, preferred_ringtone_id, custom_image) + VALUES (?, ?, ?, ?) 
ON CONFLICT(remote_identity_hash) DO UPDATE SET name = EXCLUDED.name, + preferred_ringtone_id = EXCLUDED.preferred_ringtone_id, + custom_image = EXCLUDED.custom_image, updated_at = CURRENT_TIMESTAMP """, - (name, remote_identity_hash), + (name, remote_identity_hash, preferred_ringtone_id, custom_image), ) def get_contacts(self, search=None, limit=100, offset=0): @@ -38,22 +42,40 @@ class ContactsDAO: (contact_id,), ) - def update_contact(self, contact_id, name=None, remote_identity_hash=None): - if name and remote_identity_hash: - self.provider.execute( - "UPDATE contacts SET name = ?, remote_identity_hash = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?", - (name, remote_identity_hash, contact_id), - ) - elif name: - self.provider.execute( - "UPDATE contacts SET name = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?", - (name, contact_id), - ) - elif remote_identity_hash: - self.provider.execute( - "UPDATE contacts SET remote_identity_hash = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?", - (remote_identity_hash, contact_id), - ) + def update_contact( + self, + contact_id, + name=None, + remote_identity_hash=None, + preferred_ringtone_id=None, + custom_image=None, + clear_image=False, + ): + updates = [] + params = [] + + if name is not None: + updates.append("name = ?") + params.append(name) + if remote_identity_hash is not None: + updates.append("remote_identity_hash = ?") + params.append(remote_identity_hash) + if preferred_ringtone_id is not None: + updates.append("preferred_ringtone_id = ?") + params.append(preferred_ringtone_id) + if clear_image: + updates.append("custom_image = NULL") + elif custom_image is not None: + updates.append("custom_image = ?") + params.append(custom_image) + + if not updates: + return + + updates.append("updated_at = CURRENT_TIMESTAMP") + query = f"UPDATE contacts SET {', '.join(updates)} WHERE id = ?" + params.append(contact_id) + self.provider.execute(query, tuple(params)) def delete_contact(self, contact_id): self.provider.execute("DELETE FROM contacts WHERE id = ?", (contact_id,)) diff --git a/meshchatx/src/backend/database/map_drawings.py b/meshchatx/src/backend/database/map_drawings.py new file mode 100644 index 0000000..6bbbfd7 --- /dev/null +++ b/meshchatx/src/backend/database/map_drawings.py @@ -0,0 +1,48 @@ +from datetime import UTC, datetime +from .provider import DatabaseProvider + + +class MapDrawingsDAO: + def __init__(self, provider: DatabaseProvider): + self.provider = provider + + def upsert_drawing(self, identity_hash, name, data): + now = datetime.now(UTC) + # Check if drawing with same name exists for this user + existing = self.provider.fetchone( + "SELECT id FROM map_drawings WHERE identity_hash = ? AND name = ?", + (identity_hash, name), + ) + + if existing: + self.provider.execute( + "UPDATE map_drawings SET data = ?, updated_at = ? WHERE id = ?", + (data, now, existing["id"]), + ) + else: + self.provider.execute( + """ + INSERT INTO map_drawings (identity_hash, name, data, created_at, updated_at) + VALUES (?, ?, ?, ?, ?) + """, + (identity_hash, name, data, now, now), + ) + + def get_drawings(self, identity_hash): + return self.provider.fetchall( + "SELECT * FROM map_drawings WHERE identity_hash = ? 
ORDER BY updated_at DESC", + (identity_hash,), + ) + + def delete_drawing(self, drawing_id): + self.provider.execute( + "DELETE FROM map_drawings WHERE id = ?", + (drawing_id,), + ) + + def update_drawing(self, drawing_id, name, data): + now = datetime.now(UTC) + self.provider.execute( + "UPDATE map_drawings SET name = ?, data = ?, updated_at = ? WHERE id = ?", + (name, data, now, drawing_id), + ) diff --git a/meshchatx/src/backend/database/messages.py b/meshchatx/src/backend/database/messages.py index ccd4811..442be84 100644 --- a/meshchatx/src/backend/database/messages.py +++ b/meshchatx/src/backend/database/messages.py @@ -18,6 +18,7 @@ class MessageDAO: "hash", "source_hash", "destination_hash", + "peer_hash", "state", "progress", "is_incoming", @@ -39,7 +40,7 @@ class MessageDAO: update_set = ", ".join([f"{f} = EXCLUDED.{f}" for f in fields if f != "hash"]) query = ( - f"INSERT INTO lxmf_messages ({columns}, created_at, updated_at) VALUES ({placeholders}, ?, ?) " + f"INSERT INTO lxmf_messages ({columns}, created_at, updated_at) VALUES ({placeholders}, ?, ?) " # noqa: S608 f"ON CONFLICT(hash) DO UPDATE SET {update_set}, updated_at = EXCLUDED.updated_at" ) @@ -70,22 +71,21 @@ class MessageDAO: def get_conversation_messages(self, destination_hash, limit=100, offset=0): return self.provider.fetchall( - "SELECT * FROM lxmf_messages WHERE destination_hash = ? OR source_hash = ? ORDER BY timestamp DESC LIMIT ? OFFSET ?", - (destination_hash, destination_hash, limit, offset), + "SELECT * FROM lxmf_messages WHERE peer_hash = ? ORDER BY timestamp DESC LIMIT ? OFFSET ?", + (destination_hash, limit, offset), ) def get_conversations(self): - # This is a bit complex in raw SQL, we need the latest message for each destination + # Optimized using peer_hash column query = """ SELECT m1.* FROM lxmf_messages m1 - JOIN ( - SELECT - CASE WHEN is_incoming = 1 THEN source_hash ELSE destination_hash END as peer_hash, - MAX(timestamp) as max_ts + INNER JOIN ( + SELECT peer_hash, MAX(timestamp) as max_ts FROM lxmf_messages + WHERE peer_hash IS NOT NULL GROUP BY peer_hash - ) m2 ON (CASE WHEN m1.is_incoming = 1 THEN m1.source_hash ELSE m1.destination_hash END = m2.peer_hash - AND m1.timestamp = m2.max_ts) + ) m2 ON m1.peer_hash = m2.peer_hash AND m1.timestamp = m2.max_ts + GROUP BY m1.peer_hash ORDER BY m1.timestamp DESC """ return self.provider.fetchall(query) @@ -109,10 +109,10 @@ class MessageDAO: SELECT m.timestamp, r.last_read_at FROM lxmf_messages m LEFT JOIN lxmf_conversation_read_state r ON r.destination_hash = ? - WHERE (m.destination_hash = ? OR m.source_hash = ?) + WHERE m.peer_hash = ? ORDER BY m.timestamp DESC LIMIT 1 """, - (destination_hash, destination_hash, destination_hash), + (destination_hash, destination_hash), ) if not row: @@ -140,17 +140,75 @@ class MessageDAO: def get_failed_messages_for_destination(self, destination_hash): return self.provider.fetchall( - "SELECT * FROM lxmf_messages WHERE state = 'failed' AND destination_hash = ? ORDER BY id ASC", + "SELECT * FROM lxmf_messages WHERE state = 'failed' AND peer_hash = ? 
ORDER BY id ASC", (destination_hash,), ) def get_failed_messages_count(self, destination_hash): row = self.provider.fetchone( - "SELECT COUNT(*) as count FROM lxmf_messages WHERE state = 'failed' AND destination_hash = ?", + "SELECT COUNT(*) as count FROM lxmf_messages WHERE state = 'failed' AND peer_hash = ?", (destination_hash,), ) return row["count"] if row else 0 + def get_conversations_unread_states(self, destination_hashes): + if not destination_hashes: + return {} + + placeholders = ", ".join(["?"] * len(destination_hashes)) + query = f""" + SELECT peer_hash, MAX(timestamp) as latest_ts, last_read_at + FROM lxmf_messages m + LEFT JOIN lxmf_conversation_read_state r ON r.destination_hash = m.peer_hash + WHERE m.peer_hash IN ({placeholders}) + GROUP BY m.peer_hash + """ # noqa: S608 + rows = self.provider.fetchall(query, destination_hashes) + + unread_states = {} + for row in rows: + peer_hash = row["peer_hash"] + latest_ts = row["latest_ts"] + last_read_at_str = row["last_read_at"] + + if not last_read_at_str: + unread_states[peer_hash] = True + continue + + last_read_at = datetime.fromisoformat(last_read_at_str) + if last_read_at.tzinfo is None: + last_read_at = last_read_at.replace(tzinfo=UTC) + + unread_states[peer_hash] = latest_ts > last_read_at.timestamp() + + return unread_states + + def get_conversations_failed_counts(self, destination_hashes): + if not destination_hashes: + return {} + placeholders = ", ".join(["?"] * len(destination_hashes)) + rows = self.provider.fetchall( + f"SELECT peer_hash, COUNT(*) as count FROM lxmf_messages WHERE state = 'failed' AND peer_hash IN ({placeholders}) GROUP BY peer_hash", # noqa: S608 + tuple(destination_hashes), + ) + return {row["peer_hash"]: row["count"] for row in rows} + + def get_conversations_attachment_states(self, destination_hashes): + if not destination_hashes: + return {} + + placeholders = ", ".join(["?"] * len(destination_hashes)) + query = f""" + SELECT peer_hash, 1 as has_attachments + FROM lxmf_messages + WHERE peer_hash IN ({placeholders}) + AND fields IS NOT NULL AND fields != '{{}}' AND fields != '' + GROUP BY peer_hash + """ # noqa: S608 + rows = self.provider.fetchall(query, destination_hashes) + + return {row["peer_hash"]: True for row in rows} + # Forwarding Mappings def get_forwarding_mapping( self, diff --git a/meshchatx/src/backend/database/misc.py b/meshchatx/src/backend/database/misc.py index 51bc7fd..b7f7bfb 100644 --- a/meshchatx/src/backend/database/misc.py +++ b/meshchatx/src/backend/database/misc.py @@ -90,6 +90,15 @@ class MiscDAO: (destination_hash,), ) + def get_user_icons(self, destination_hashes): + if not destination_hashes: + return [] + placeholders = ", ".join(["?"] * len(destination_hashes)) + return self.provider.fetchall( + f"SELECT * FROM lxmf_user_icons WHERE destination_hash IN ({placeholders})", # noqa: S608 + tuple(destination_hashes), + ) + # Forwarding Rules def get_forwarding_rules(self, identity_hash=None, active_only=False): query = "SELECT * FROM lxmf_forwarding_rules WHERE 1=1" @@ -165,8 +174,14 @@ class MiscDAO: sql += " ORDER BY created_at DESC" return self.provider.fetchall(sql, params) - def delete_archived_pages(self, destination_hash=None, page_path=None): - if destination_hash and page_path: + def delete_archived_pages(self, destination_hash=None, page_path=None, ids=None): + if ids: + placeholders = ", ".join(["?"] * len(ids)) + self.provider.execute( + f"DELETE FROM archived_pages WHERE id IN ({placeholders})", # noqa: S608 + tuple(ids), + ) + elif destination_hash and 
page_path: self.provider.execute( "DELETE FROM archived_pages WHERE destination_hash = ? AND page_path = ?", (destination_hash, page_path), @@ -252,7 +267,7 @@ class MiscDAO: if notification_ids: placeholders = ", ".join(["?"] * len(notification_ids)) self.provider.execute( - f"UPDATE notifications SET is_viewed = 1 WHERE id IN ({placeholders})", + f"UPDATE notifications SET is_viewed = 1 WHERE id IN ({placeholders})", # noqa: S608 notification_ids, ) else: @@ -289,3 +304,22 @@ class MiscDAO: "DELETE FROM keyboard_shortcuts WHERE identity_hash = ? AND action = ?", (identity_hash, action), ) + + # Last Sent Icon Hashes + def get_last_sent_icon_hash(self, destination_hash): + row = self.provider.fetchone( + "SELECT icon_hash FROM lxmf_last_sent_icon_hashes WHERE destination_hash = ?", + (destination_hash,), + ) + return row["icon_hash"] if row else None + + def update_last_sent_icon_hash(self, destination_hash, icon_hash): + now = datetime.now(UTC) + self.provider.execute( + """ + INSERT INTO lxmf_last_sent_icon_hashes (destination_hash, icon_hash, created_at, updated_at) + VALUES (?, ?, ?, ?) + ON CONFLICT(destination_hash) DO UPDATE SET icon_hash = EXCLUDED.icon_hash, updated_at = EXCLUDED.updated_at + """, + (destination_hash, icon_hash, now, now), + ) diff --git a/meshchatx/src/backend/database/provider.py b/meshchatx/src/backend/database/provider.py index 597eb1c..11eb83f 100644 --- a/meshchatx/src/backend/database/provider.py +++ b/meshchatx/src/backend/database/provider.py @@ -1,14 +1,17 @@ import sqlite3 import threading +import weakref class DatabaseProvider: _instance = None _lock = threading.Lock() + _all_locals = weakref.WeakSet() def __init__(self, db_path=None): self.db_path = db_path self._local = threading.local() + self._all_locals.add(self._local) @classmethod def get_instance(cls, db_path=None): @@ -27,24 +30,81 @@ class DatabaseProvider: @property def connection(self): if not hasattr(self._local, "connection"): + # isolation_level=None enables autocommit mode, letting us manage transactions manually self._local.connection = sqlite3.connect( self.db_path, check_same_thread=False, + isolation_level=None, ) self._local.connection.row_factory = sqlite3.Row # Enable WAL mode for better concurrency self._local.connection.execute("PRAGMA journal_mode=WAL") return self._local.connection - def execute(self, query, params=None): + def execute(self, query, params=None, commit=None): cursor = self.connection.cursor() + + # Convert any datetime objects in params to ISO strings to avoid DeprecationWarning in Python 3.12+ + if params: + from datetime import datetime + + if isinstance(params, dict): + params = { + k: (v.isoformat() if isinstance(v, datetime) else v) + for k, v in params.items() + } + else: + params = tuple( + (p.isoformat() if isinstance(p, datetime) else p) for p in params + ) + if params: cursor.execute(query, params) else: cursor.execute(query) - self.connection.commit() + + # In autocommit mode (isolation_level=None), in_transaction is True + # only if we explicitly started one with BEGIN and haven't committed/rolled back. + if commit is True: + self.connection.commit() + elif commit is False: + pass + else: + # Default behavior: if we're in a manual transaction, don't commit automatically + if not self.connection.in_transaction: + # In autocommit mode, non-DML statements don't start transactions. + # DML statements might if they are part of a BEGIN block. + # Actually, in isolation_level=None, NOTHING starts a transaction unless we say BEGIN. 
+ pass return cursor + def begin(self): + try: + self.connection.execute("BEGIN") + except sqlite3.OperationalError as e: + if "within a transaction" in str(e): + pass + else: + raise + + def commit(self): + if self.connection.in_transaction: + self.connection.commit() + + def rollback(self): + if self.connection.in_transaction: + self.connection.rollback() + + def __enter__(self): + self.begin() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_type: + self.rollback() + else: + self.commit() + def fetchone(self, query, params=None): cursor = self.execute(query, params) return cursor.fetchone() @@ -55,11 +115,28 @@ class DatabaseProvider: def close(self): if hasattr(self._local, "connection"): - self._local.connection.close() + try: + self.commit() # Ensure everything is saved + self._local.connection.close() + except Exception: # noqa: S110 + pass del self._local.connection + def close_all(self): + with self._lock: + for loc in self._all_locals: + if hasattr(loc, "connection"): + try: + loc.connection.commit() + loc.connection.close() + except Exception: # noqa: S110 + pass + del loc.connection + def vacuum(self): - self.execute("VACUUM") + # VACUUM cannot run inside a transaction + self.commit() + self.connection.execute("VACUUM") def integrity_check(self): return self.fetchall("PRAGMA integrity_check") diff --git a/meshchatx/src/backend/database/schema.py b/meshchatx/src/backend/database/schema.py index 0e9f9ba..eec2e9d 100644 --- a/meshchatx/src/backend/database/schema.py +++ b/meshchatx/src/backend/database/schema.py @@ -2,7 +2,7 @@ from .provider import DatabaseProvider class DatabaseSchema: - LATEST_VERSION = 23 + LATEST_VERSION = 32 def __init__(self, provider: DatabaseProvider): self.provider = provider @@ -15,6 +15,101 @@ class DatabaseSchema: current_version = self._get_current_version() self.migrate(current_version) + def _ensure_column(self, table_name, column_name, column_type): + """Add a column to a table if it doesn't exist.""" + # First check if it exists using PRAGMA + cursor = self.provider.connection.cursor() + cursor.execute(f"PRAGMA table_info({table_name})") + columns = [row[1] for row in cursor.fetchall()] + + if column_name not in columns: + try: + # SQLite has limitations on ALTER TABLE ADD COLUMN: + # 1. Cannot add UNIQUE or PRIMARY KEY columns + # 2. Cannot add columns with non-constant defaults (like CURRENT_TIMESTAMP) + + # Strip non-constant defaults if present for the ALTER TABLE statement + stmt_type = column_type + forbidden_defaults = [ + "CURRENT_TIMESTAMP", + "CURRENT_TIME", + "CURRENT_DATE", + ] + for forbidden in forbidden_defaults: + if f"DEFAULT {forbidden}" in stmt_type.upper(): + # Remove the DEFAULT part for the ALTER statement + import re + + stmt_type = re.sub( + f"DEFAULT\\s+{forbidden}", + "", + stmt_type, + flags=re.IGNORECASE, + ).strip() + + # Use the connection directly to avoid any middle-ware issues + self.provider.connection.execute( + f"ALTER TABLE {table_name} ADD COLUMN {column_name} {stmt_type}" + ) + except Exception as e: + # Log but don't crash, we might be able to continue + print(f"Failed to add column {column_name} to {table_name}: {e}") + + def _sync_table_columns(self, table_name, create_sql): + """ + Parses a CREATE TABLE statement and ensures all columns exist in the actual table. + This is a robust way to handle legacy tables that are missing columns. 
+ """ + # Find the first '(' and the last ')' + start_idx = create_sql.find("(") + end_idx = create_sql.rfind(")") + + if start_idx == -1 or end_idx == -1: + return + + inner_content = create_sql[start_idx + 1 : end_idx] + + # Split by comma but ignore commas inside parentheses (e.g. DECIMAL(10,2)) + definitions = [] + depth = 0 + current = "" + for char in inner_content: + if char == "(": + depth += 1 + elif char == ")": + depth -= 1 + + if char == "," and depth == 0: + definitions.append(current.strip()) + current = "" + else: + current += char + if current.strip(): + definitions.append(current.strip()) + + for definition in definitions: + definition = definition.strip() + # Skip table-level constraints + if not definition or definition.upper().startswith( + ("PRIMARY KEY", "FOREIGN KEY", "UNIQUE", "CHECK") + ): + continue + + parts = definition.split(None, 1) + if not parts: + continue + + column_name = parts[0].strip('"').strip("`").strip("[").strip("]") + column_type = parts[1] if len(parts) > 1 else "TEXT" + + # Special case for column types that are already PRIMARY KEY + if "PRIMARY KEY" in column_type.upper() and column_name.upper() != "ID": + # We usually don't want to ALTER TABLE ADD COLUMN with PRIMARY KEY + # unless it's the main ID which should already exist + continue + + self._ensure_column(table_name, column_name, column_type) + def _get_current_version(self): row = self.provider.fetchone( "SELECT value FROM config WHERE key = ?", @@ -26,7 +121,7 @@ class DatabaseSchema: def _create_initial_tables(self): # We create the config table first so we can track version - self.provider.execute(""" + config_sql = """ CREATE TABLE IF NOT EXISTS config ( id INTEGER PRIMARY KEY AUTOINCREMENT, key TEXT UNIQUE, @@ -34,7 +129,9 @@ class DatabaseSchema: created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ) - """) + """ + self.provider.execute(config_sql) + self._sync_table_columns("config", config_sql) # Other essential tables that were present from version 1 # Peewee automatically creates tables if they don't exist. 
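For illustration, this is how the self-healing path added above is exercised against a legacy database. Only DatabaseProvider, DatabaseSchema and _sync_table_columns() come from this change; the database path and the exact column list below are illustrative, mirroring the contacts columns referenced elsewhere in this diff.

from meshchatx.src.backend.database.provider import DatabaseProvider
from meshchatx.src.backend.database.schema import DatabaseSchema

provider = DatabaseProvider.get_instance("storage/database.db")  # path is illustrative
schema = DatabaseSchema(provider)

# Re-assert the canonical contacts definition against whatever table already exists.
# Any missing column (e.g. preferred_ringtone_id or custom_image on a pre-v28 database)
# is added via ALTER TABLE; non-constant defaults such as CURRENT_TIMESTAMP are stripped
# first, since SQLite cannot express them in ADD COLUMN.
schema._sync_table_columns(
    "contacts",
    """
    CREATE TABLE IF NOT EXISTS contacts (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT,
        remote_identity_hash TEXT UNIQUE,
        preferred_ringtone_id INTEGER,
        custom_image TEXT,
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP
    )
    """,
)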
@@ -81,6 +178,7 @@ class DatabaseSchema: hash TEXT UNIQUE, source_hash TEXT, destination_hash TEXT, + peer_hash TEXT, state TEXT, progress REAL, is_incoming INTEGER, @@ -270,10 +368,32 @@ class DatabaseSchema: UNIQUE(identity_hash, action) ) """, + "map_drawings": """ + CREATE TABLE IF NOT EXISTS map_drawings ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + identity_hash TEXT, + name TEXT, + data TEXT, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP + ) + """, + "lxmf_last_sent_icon_hashes": """ + CREATE TABLE IF NOT EXISTS lxmf_last_sent_icon_hashes ( + destination_hash TEXT PRIMARY KEY, + icon_hash TEXT, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP + ) + """, } for table_name, create_sql in tables.items(): self.provider.execute(create_sql) + + # Robust self-healing: Ensure existing tables have all modern columns + self._sync_table_columns(table_name, create_sql) + # Create indexes that were present if table_name == "announces": self.provider.execute( @@ -282,6 +402,9 @@ class DatabaseSchema: self.provider.execute( "CREATE INDEX IF NOT EXISTS idx_announces_identity_hash ON announces(identity_hash)", ) + self.provider.execute( + "CREATE INDEX IF NOT EXISTS idx_announces_updated_at ON announces(updated_at)", + ) elif table_name == "lxmf_messages": self.provider.execute( "CREATE INDEX IF NOT EXISTS idx_lxmf_messages_source_hash ON lxmf_messages(source_hash)", @@ -289,6 +412,15 @@ class DatabaseSchema: self.provider.execute( "CREATE INDEX IF NOT EXISTS idx_lxmf_messages_destination_hash ON lxmf_messages(destination_hash)", ) + self.provider.execute( + "CREATE INDEX IF NOT EXISTS idx_lxmf_messages_peer_hash ON lxmf_messages(peer_hash)", + ) + self.provider.execute( + "CREATE INDEX IF NOT EXISTS idx_lxmf_messages_timestamp ON lxmf_messages(timestamp)", + ) + self.provider.execute( + "CREATE INDEX IF NOT EXISTS idx_lxmf_messages_peer_ts ON lxmf_messages(peer_hash, timestamp)", + ) elif table_name == "blocked_destinations": self.provider.execute( "CREATE INDEX IF NOT EXISTS idx_blocked_destinations_hash ON blocked_destinations(destination_hash)", @@ -451,7 +583,7 @@ class DatabaseSchema: self.provider.execute( "ALTER TABLE lxmf_messages ADD COLUMN is_spam INTEGER DEFAULT 0", ) - except Exception: + except Exception: # noqa: S110 # Column might already exist if table was created with newest schema pass @@ -541,7 +673,7 @@ class DatabaseSchema: self.provider.execute( "ALTER TABLE lxmf_forwarding_rules ADD COLUMN name TEXT", ) - except Exception: + except Exception: # noqa: S110 pass if current_version < 17: @@ -641,6 +773,94 @@ class DatabaseSchema: "CREATE INDEX IF NOT EXISTS idx_announces_aspect ON announces(aspect)", ) + if current_version < 24: + self.provider.execute(""" + CREATE TABLE IF NOT EXISTS call_recordings ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + remote_identity_hash TEXT, + remote_identity_name TEXT, + filename_rx TEXT, + filename_tx TEXT, + duration_seconds INTEGER, + timestamp REAL, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP + ) + """) + self.provider.execute( + "CREATE INDEX IF NOT EXISTS idx_call_recordings_remote_hash ON call_recordings(remote_identity_hash)", + ) + self.provider.execute( + "CREATE INDEX IF NOT EXISTS idx_call_recordings_timestamp ON call_recordings(timestamp)", + ) + + if current_version < 25: + # Add docs_downloaded to config if not exists + self.provider.execute( + "INSERT OR IGNORE INTO config (key, value) VALUES (?, ?)", + ("docs_downloaded", "0"), + ) + 
+ if current_version < 26: + # Add initial_docs_download_attempted to config if not exists + self.provider.execute( + "INSERT OR IGNORE INTO config (key, value) VALUES (?, ?)", + ("initial_docs_download_attempted", "0"), + ) + + if current_version < 28: + # Add preferred_ringtone_id to contacts + try: + self.provider.execute( + "ALTER TABLE contacts ADD COLUMN preferred_ringtone_id INTEGER DEFAULT NULL", + ) + except Exception: # noqa: S110 + pass + + if current_version < 29: + # Performance optimization indexes + self.provider.execute( + "CREATE INDEX IF NOT EXISTS idx_lxmf_messages_peer_hash ON lxmf_messages(peer_hash)", + ) + self.provider.execute( + "CREATE INDEX IF NOT EXISTS idx_lxmf_messages_timestamp ON lxmf_messages(timestamp)", + ) + self.provider.execute( + "CREATE INDEX IF NOT EXISTS idx_lxmf_messages_peer_ts ON lxmf_messages(peer_hash, timestamp)", + ) + self.provider.execute( + "CREATE INDEX IF NOT EXISTS idx_announces_updated_at ON announces(updated_at)", + ) + + if current_version < 30: + # Add custom_image to contacts + try: + self.provider.execute( + "ALTER TABLE contacts ADD COLUMN custom_image TEXT DEFAULT NULL", + ) + except Exception: # noqa: S110 + pass + + if current_version < 31: + self.provider.execute(""" + CREATE TABLE IF NOT EXISTS lxmf_last_sent_icon_hashes ( + destination_hash TEXT PRIMARY KEY, + icon_hash TEXT, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP + ) + """) + + if current_version < 32: + # Add tutorial_seen and changelog_seen_version to config + self.provider.execute( + "INSERT OR IGNORE INTO config (key, value) VALUES (?, ?)", + ("tutorial_seen", "false"), + ) + self.provider.execute( + "INSERT OR IGNORE INTO config (key, value) VALUES (?, ?)", + ("changelog_seen_version", "0.0.0"), + ) + # Update version in config self.provider.execute( """ diff --git a/meshchatx/src/backend/database/telephone.py b/meshchatx/src/backend/database/telephone.py index 2fe2f14..b28f2cc 100644 --- a/meshchatx/src/backend/database/telephone.py +++ b/meshchatx/src/backend/database/telephone.py @@ -57,3 +57,65 @@ class TelephoneDAO: def clear_call_history(self): self.provider.execute("DELETE FROM call_history") + + def add_call_recording( + self, + remote_identity_hash, + remote_identity_name, + filename_rx, + filename_tx, + duration_seconds, + timestamp, + ): + from datetime import UTC, datetime + + now = datetime.now(UTC) + self.provider.execute( + """ + INSERT INTO call_recordings ( + remote_identity_hash, + remote_identity_name, + filename_rx, + filename_tx, + duration_seconds, + timestamp, + created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?) + """, + ( + remote_identity_hash, + remote_identity_name, + filename_rx, + filename_tx, + duration_seconds, + timestamp, + now, + ), + ) + + def get_call_recordings(self, search=None, limit=10, offset=0): + if search: + return self.provider.fetchall( + """ + SELECT * FROM call_recordings + WHERE remote_identity_name LIKE ? OR remote_identity_hash LIKE ? + ORDER BY timestamp DESC LIMIT ? OFFSET ? + """, + (f"%{search}%", f"%{search}%", limit, offset), + ) + return self.provider.fetchall( + "SELECT * FROM call_recordings ORDER BY timestamp DESC LIMIT ? 
OFFSET ?", + (limit, offset), + ) + + def get_call_recording(self, recording_id): + return self.provider.fetchone( + "SELECT * FROM call_recordings WHERE id = ?", + (recording_id,), + ) + + def delete_call_recording(self, recording_id): + self.provider.execute( + "DELETE FROM call_recordings WHERE id = ?", + (recording_id,), + ) diff --git a/meshchatx/src/backend/database/voicemails.py b/meshchatx/src/backend/database/voicemails.py index 0b8cdb8..3b39c5c 100644 --- a/meshchatx/src/backend/database/voicemails.py +++ b/meshchatx/src/backend/database/voicemails.py @@ -75,3 +75,9 @@ class VoicemailDAO: "SELECT COUNT(*) as count FROM voicemails WHERE is_read = 0", ) return row["count"] if row else 0 + + def get_latest_voicemail_id(self): + row = self.provider.fetchone( + "SELECT id FROM voicemails ORDER BY timestamp DESC LIMIT 1", + ) + return row["id"] if row else None diff --git a/meshchatx/src/backend/docs_manager.py b/meshchatx/src/backend/docs_manager.py new file mode 100644 index 0000000..c604bd3 --- /dev/null +++ b/meshchatx/src/backend/docs_manager.py @@ -0,0 +1,436 @@ +import logging +import os +import re +import shutil +import threading +import zipfile +import io +import html + +import requests +from meshchatx.src.backend.markdown_renderer import MarkdownRenderer + + +class DocsManager: + def __init__(self, config, public_dir, project_root=None, storage_dir=None): + self.config = config + self.public_dir = public_dir + self.project_root = project_root + self.storage_dir = storage_dir + + # Determine docs directories + # If storage_dir is provided, we prefer using it for documentation storage + # to avoid Read-only file system errors in environments like AppImages. + if self.storage_dir: + self.docs_dir = os.path.join(self.storage_dir, "reticulum-docs") + self.meshchatx_docs_dir = os.path.join(self.storage_dir, "meshchatx-docs") + else: + self.docs_dir = os.path.join(self.public_dir, "reticulum-docs") + self.meshchatx_docs_dir = os.path.join(self.public_dir, "meshchatx-docs") + + self.download_status = "idle" + self.download_progress = 0 + self.last_error = None + + # Ensure docs directories exist + try: + if not os.path.exists(self.docs_dir): + os.makedirs(self.docs_dir) + + if not os.path.exists(self.meshchatx_docs_dir): + os.makedirs(self.meshchatx_docs_dir) + except OSError as e: + # If we still fail (e.g. storage_dir was not provided and public_dir is read-only) + # we log it but don't crash the whole app. Emergency mode can still run. 
+ logging.error(f"Failed to create documentation directories: {e}") + self.last_error = str(e) + + # Initial population of MeshChatX docs + if os.path.exists(self.meshchatx_docs_dir) and os.access( + self.meshchatx_docs_dir, os.W_OK + ): + self.populate_meshchatx_docs() + + def populate_meshchatx_docs(self): + """Populates meshchatx-docs from the project's docs folder.""" + # Try to find docs folder in several places + search_paths = [] + if self.project_root: + search_paths.append(os.path.join(self.project_root, "docs")) + + # Also try relative to this file + # This file is in meshchatx/src/backend/docs_manager.py + # Project root is 3 levels up + this_dir = os.path.dirname(os.path.abspath(__file__)) + search_paths.append( + os.path.abspath(os.path.join(this_dir, "..", "..", "..", "docs")) + ) + + src_docs = None + for path in search_paths: + if os.path.exists(path) and os.path.isdir(path): + src_docs = path + break + + if not src_docs: + logging.warning("MeshChatX docs source directory not found.") + return + + try: + for file in os.listdir(src_docs): + if file.endswith(".md") or file.endswith(".txt"): + src_path = os.path.join(src_docs, file) + dest_path = os.path.join(self.meshchatx_docs_dir, file) + shutil.copy2(src_path, dest_path) + + # Also pre-render to HTML for easy sharing/viewing + try: + with open(src_path, "r", encoding="utf-8") as f: + content = f.read() + + html_content = MarkdownRenderer.render(content) + # Basic HTML wrapper for standalone viewing + full_html = f""" + +
+ + +{html.escape(content)}",
+ "type": "text",
+ }
+
+ def export_docs(self):
+ """Creates a zip of all docs and returns the bytes."""
+ buffer = io.BytesIO()
+ with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
+ # Add reticulum docs
+ for root, _, files in os.walk(self.docs_dir):
+ for file in files:
+ file_path = os.path.join(root, file)
+ rel_path = os.path.join(
+ "reticulum-docs", os.path.relpath(file_path, self.docs_dir)
+ )
+ zip_file.write(file_path, rel_path)
+
+ # Add meshchatx docs
+ for root, _, files in os.walk(self.meshchatx_docs_dir):
+ for file in files:
+ file_path = os.path.join(root, file)
+ rel_path = os.path.join(
+ "meshchatx-docs",
+ os.path.relpath(file_path, self.meshchatx_docs_dir),
+ )
+ zip_file.write(file_path, rel_path)
+
+ buffer.seek(0)
+ return buffer.getvalue()
+
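A hypothetical caller, to show the shape of what export_docs() returns: the archive is assembled entirely in memory via io.BytesIO, so the bytes can be written to disk or streamed in a response without temporary files. The constructor arguments and output filename are assumptions for the example, not values this change defines.

from meshchatx.src.backend.docs_manager import DocsManager

# config: the application's existing ConfigManager instance
docs = DocsManager(config, public_dir="./public", storage_dir="./storage")
zip_bytes = docs.export_docs()

with open("meshchatx-docs-export.zip", "wb") as fh:
    fh.write(zip_bytes)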
+ def search(self, query, lang="en"):
+ if not query:
+ return []
+
+ results = []
+ query = query.lower()
+
+ # 1. Search MeshChatX Docs first
+ if os.path.exists(self.meshchatx_docs_dir):
+ for file in os.listdir(self.meshchatx_docs_dir):
+ if file.endswith((".md", ".txt")):
+ file_path = os.path.join(self.meshchatx_docs_dir, file)
+ try:
+ with open(
+ file_path, "r", encoding="utf-8", errors="ignore"
+ ) as f:
+ content = f.read()
+ if query in content.lower():
+ # Simple snippet
+ idx = content.lower().find(query)
+ start = max(0, idx - 80)
+ end = min(len(content), idx + len(query) + 120)
+ snippet = content[start:end]
+ if start > 0:
+ snippet = "..." + snippet
+ if end < len(content):
+ snippet = snippet + "..."
+
+ results.append(
+ {
+ "title": file,
+ "path": f"/meshchatx-docs/{file}",
+ "snippet": snippet,
+ "source": "MeshChatX",
+ }
+ )
+ except Exception as e:
+ logging.error(f"Error searching MeshChatX doc {file}: {e}")
+
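The snippet windowing above (up to 80 characters of leading context, 120 trailing, with ellipses when truncated) can be read as a small standalone helper. This is a restatement for clarity only, not code this change adds:

def make_snippet(content: str, query: str, before: int = 80, after: int = 120) -> str:
    # Assumes the query is already known to be present, as checked above.
    idx = content.lower().find(query)
    start = max(0, idx - before)
    end = min(len(content), idx + len(query) + after)
    snippet = content[start:end]
    if start > 0:
        snippet = "..." + snippet
    if end < len(content):
        snippet = snippet + "..."
    return snippet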
+ # 2. Search Reticulum Docs
+ if self.has_docs():
+ # Known language suffixes in Reticulum docs
+ known_langs = ["de", "es", "jp", "nl", "pl", "pt-br", "tr", "uk", "zh-cn"]
+
+ # Determine files to search
+ target_files = []
+ try:
+ for root, _, files in os.walk(self.docs_dir):
+ for file in files:
+ if file.endswith(".html"):
+ # Basic filtering for language if possible
+ if lang != "en":
+ if f"_{lang}.html" in file:
+ target_files.append(os.path.join(root, file))
+ else:
+ # For English, we want files that DON'T have a language suffix
+ # This is a bit heuristic
+ has_lang_suffix = False
+ for lang_code in known_langs:
+ if f"_{lang_code}.html" in file:
+ has_lang_suffix = True
+ break
+ if not has_lang_suffix:
+ target_files.append(os.path.join(root, file))
+
+ # If we found nothing for a specific language, fall back to English ONLY
+ if not target_files and lang != "en":
+ for root, _, files in os.walk(self.docs_dir):
+ for file in files:
+ if file.endswith(".html"):
+ has_lang_suffix = False
+ for lang_code in known_langs:
+ if f"_{lang_code}.html" in file:
+ has_lang_suffix = True
+ break
+ if not has_lang_suffix:
+ target_files.append(os.path.join(root, file))
+
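The language filtering above reduces to a filename test, assuming (as the comments note) that translated Reticulum pages carry a "_<lang>.html" suffix while English pages carry none. A sketch of that test, not part of this change:

def matches_language(filename: str, lang: str, known_langs: list[str]) -> bool:
    if lang != "en":
        return f"_{lang}.html" in filename
    # English pages are the ones without any known language suffix
    return not any(f"_{code}.html" in filename for code in known_langs)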
+ for file_path in target_files:
+ try:
+ with open(file_path, encoding="utf-8", errors="ignore") as f:
+ content = f.read()
+
+ # Very basic HTML tag removal for searching
+ text_content = re.sub(r"<[^>]+>", " ", content)
+ text_content = " ".join(text_content.split())
+
+ if query in text_content.lower():
+ # Find title
+ title_match = re.search(
+ r"{code}'
+ )
+ return placeholder
+
+ text = re.sub(
+ r"```(\w+)?\n(.*?)\n```", code_block_placeholder, text, flags=re.DOTALL
+ )
+
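The [[CBn]] tokens exist to protect fenced code from every substitution that follows: blocks are swapped out first and swapped back in at the very end. Stripped of the HTML wrapping (an assumption here, since the exact markup the renderer emits is not fully visible), the pattern is a minimal sketch like this:

import re

text = "# Title\n\n```python\nprint('kept verbatim')\n```\n\nSome prose."
code_blocks: list[str] = []

def protect(match):
    code_blocks.append(match.group(0))      # stash the fenced block untouched
    return f"[[CB{len(code_blocks) - 1}]]"  # leave an inert token in its place

text = re.sub(r"```(\w+)?\n(.*?)\n```", protect, text, flags=re.DOTALL)
# ...every other markdown substitution runs here without touching code...
for i, block in enumerate(code_blocks):
    text = text.replace(f"[[CB{i}]]", block)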
+ # Horizontal Rules
+ text = re.sub(
+ r"^---+$",
+            r"<hr>",
+            text,
+            flags=re.MULTILINE,
+ )
+
+ # Task lists
+ text = re.sub(
+ r"^[-*] \[ \] (.*)$",
+ r'\1', + text, + flags=re.MULTILINE, + ) + + # Lists - Simple single level for now to keep it predictable + def unordered_list_repl(match): + items = match.group(0).strip().split("\n") + html_items = "" + for i in items: + # Check if it's already a task list item + if 'type="checkbox"' in i: + html_items += i + else: + content = i[2:].strip() + html_items += f'
+ if part.startswith("[[CB") and part.endswith("]]"): + processed_parts.append(part) + continue + + # If it already starts with a block tag, don't wrap in
+ if re.match(r"^<(h\d|ul|ol|li|blockquote|hr|div)", part):
+ processed_parts.append(part)
+ else:
+                    # Replace single newlines with <br> for line breaks within paragraphs
+                    part = part.replace("\n", "<br>")
+ processed_parts.append(
+ f'
{part}
' + ) + + text = "\n".join(processed_parts) + + # Restore code blocks + for i, code_html in enumerate(code_blocks): + text = text.replace(f"[[CB{i}]]", code_html) + + return text diff --git a/meshchatx/src/backend/meshchat_utils.py b/meshchatx/src/backend/meshchat_utils.py new file mode 100644 index 0000000..26159da --- /dev/null +++ b/meshchatx/src/backend/meshchat_utils.py @@ -0,0 +1,151 @@ +import base64 +import json + +import LXMF +import RNS.vendor.umsgpack as msgpack +from LXMF import LXMRouter + + +def parse_bool_query_param(value: str | None) -> bool: + if value is None: + return False + value = value.lower() + return value in {"1", "true", "yes", "on"} + + +def message_fields_have_attachments(fields_json: str | None): + if not fields_json: + return False + try: + fields = json.loads(fields_json) + except Exception: + return False + if "image" in fields or "audio" in fields: + return True + if "file_attachments" in fields and isinstance( + fields["file_attachments"], + list, + ): + return len(fields["file_attachments"]) > 0 + return False + + +def has_attachments(lxmf_fields: dict) -> bool: + try: + if LXMF.FIELD_FILE_ATTACHMENTS in lxmf_fields: + return len(lxmf_fields[LXMF.FIELD_FILE_ATTACHMENTS]) > 0 + if LXMF.FIELD_IMAGE in lxmf_fields: + return True + if LXMF.FIELD_AUDIO in lxmf_fields: + return True + return False + except Exception: + return False + + +def convert_propagation_node_state_to_string(state): + state_map = { + LXMRouter.PR_IDLE: "idle", + LXMRouter.PR_PATH_REQUESTED: "path_requested", + LXMRouter.PR_LINK_ESTABLISHING: "link_establishing", + LXMRouter.PR_LINK_ESTABLISHED: "link_established", + LXMRouter.PR_REQUEST_SENT: "request_sent", + LXMRouter.PR_RECEIVING: "receiving", + LXMRouter.PR_RESPONSE_RECEIVED: "response_received", + LXMRouter.PR_COMPLETE: "complete", + LXMRouter.PR_NO_PATH: "no_path", + LXMRouter.PR_LINK_FAILED: "link_failed", + LXMRouter.PR_TRANSFER_FAILED: "transfer_failed", + LXMRouter.PR_NO_IDENTITY_RCVD: "no_identity_received", + LXMRouter.PR_NO_ACCESS: "no_access", + LXMRouter.PR_FAILED: "failed", + } + + if state in state_map: + return state_map[state] + return "unknown" + + +def convert_db_favourite_to_dict(favourite): + created_at = str(favourite["created_at"]) + if created_at and "+" not in created_at and "Z" not in created_at: + created_at += "Z" + + updated_at = str(favourite["updated_at"]) + if updated_at and "+" not in updated_at and "Z" not in updated_at: + updated_at += "Z" + + return { + "id": favourite["id"], + "destination_hash": favourite["destination_hash"], + "display_name": favourite["display_name"], + "aspect": favourite["aspect"], + "created_at": created_at, + "updated_at": updated_at, + } + + +def parse_lxmf_display_name( + app_data_base64: str | None, + default_value: str | None = "Anonymous Peer", +): + if app_data_base64 is None: + return default_value + + try: + app_data_bytes = base64.b64decode(app_data_base64) + display_name = LXMF.display_name_from_app_data(app_data_bytes) + if display_name is not None: + return display_name + except Exception as e: + print(f"Failed to parse LXMF display name: {e}") + + return default_value + + +def parse_lxmf_stamp_cost(app_data_base64: str | None): + if app_data_base64 is None: + return None + + try: + app_data_bytes = base64.b64decode(app_data_base64) + return LXMF.stamp_cost_from_app_data(app_data_bytes) + except Exception as e: + print(f"Failed to parse LXMF stamp cost: {e}") + return None + + +def parse_nomadnetwork_node_display_name( + app_data_base64: str | None, + 
default_value: str | None = "Anonymous Node", +): + if app_data_base64 is None: + return default_value + + try: + app_data_bytes = base64.b64decode(app_data_base64) + return app_data_bytes.decode("utf-8") + except Exception as e: + print(f"Failed to parse NomadNetwork display name: {e}") + return default_value + + +def parse_lxmf_propagation_node_app_data(app_data_base64: str | None): + if app_data_base64 is None: + return None + + try: + app_data_bytes = base64.b64decode(app_data_base64) + data = msgpack.unpackb(app_data_bytes) + + if not isinstance(data, list) or len(data) < 4: + return None + + return { + "enabled": bool(data[2]) if data[2] is not None else False, + "timebase": int(data[1]) if data[1] is not None else 0, + "per_transfer_limit": int(data[3]) if data[3] is not None else 0, + } + except Exception as e: + print(f"Failed to parse LXMF propagation node app data: {e}") + return None diff --git a/meshchatx/src/backend/message_handler.py b/meshchatx/src/backend/message_handler.py index 2bdd495..2bdd299 100644 --- a/meshchatx/src/backend/message_handler.py +++ b/meshchatx/src/backend/message_handler.py @@ -16,10 +16,9 @@ class MessageHandler: ): query = """ SELECT * FROM lxmf_messages - WHERE ((source_hash = ? AND destination_hash = ?) - OR (destination_hash = ? AND source_hash = ?)) + WHERE peer_hash = ? """ - params = [local_hash, destination_hash, local_hash, destination_hash] + params = [destination_hash] if after_id: query += " AND id > ?" @@ -34,75 +33,88 @@ class MessageHandler: return self.db.provider.fetchall(query, params) def delete_conversation(self, local_hash, destination_hash): - query = """ - DELETE FROM lxmf_messages - WHERE ((source_hash = ? AND destination_hash = ?) - OR (destination_hash = ? AND source_hash = ?)) - """ - self.db.provider.execute( - query, - [local_hash, destination_hash, local_hash, destination_hash], - ) + query = "DELETE FROM lxmf_messages WHERE peer_hash = ?" + self.db.provider.execute(query, [destination_hash]) def search_messages(self, local_hash, search_term): like_term = f"%{search_term}%" query = """ - SELECT source_hash, destination_hash, MAX(timestamp) as max_ts + SELECT peer_hash, MAX(timestamp) as max_ts FROM lxmf_messages - WHERE (source_hash = ? OR destination_hash = ?) - AND (title LIKE ? OR content LIKE ? OR source_hash LIKE ? OR destination_hash LIKE ?) - GROUP BY source_hash, destination_hash + WHERE title LIKE ? OR content LIKE ? OR peer_hash LIKE ? + GROUP BY peer_hash """ - params = [local_hash, local_hash, like_term, like_term, like_term, like_term] + params = [like_term, like_term, like_term] return self.db.provider.fetchall(query, params) - def get_conversations(self, local_hash, filter_unread=False): - # Implementation using window functions for better performance - # This requires SQLite 3.25+ + def get_conversations( + self, + local_hash, + search=None, + filter_unread=False, + filter_failed=False, + filter_has_attachments=False, + limit=None, + offset=0, + ): + # Optimized using peer_hash column and JOINs to avoid N+1 queries query = """ - WITH RankedMessages AS ( - SELECT *, - CASE WHEN source_hash = ? THEN destination_hash ELSE source_hash END as peer_hash, - ROW_NUMBER() OVER ( - PARTITION BY CASE WHEN source_hash = ? 
THEN destination_hash ELSE source_hash END - ORDER BY timestamp DESC - ) as rn + SELECT + m1.*, + a.app_data as peer_app_data, + c.display_name as custom_display_name, + con.custom_image as contact_image, + i.icon_name, i.foreground_colour, i.background_colour, + r.last_read_at, + (SELECT COUNT(*) FROM lxmf_messages m_failed + WHERE m_failed.peer_hash = m1.peer_hash AND m_failed.state = 'failed') as failed_count + FROM lxmf_messages m1 + INNER JOIN ( + SELECT peer_hash, MAX(timestamp) as max_ts FROM lxmf_messages - WHERE source_hash = ? OR destination_hash = ? - ) - SELECT * FROM RankedMessages WHERE rn = 1 + WHERE peer_hash IS NOT NULL + GROUP BY peer_hash + ) m2 ON m1.peer_hash = m2.peer_hash AND m1.timestamp = m2.max_ts + LEFT JOIN announces a ON a.destination_hash = m1.peer_hash + LEFT JOIN custom_destination_display_names c ON c.destination_hash = m1.peer_hash + LEFT JOIN contacts con ON con.remote_identity_hash = m1.peer_hash + LEFT JOIN lxmf_user_icons i ON i.destination_hash = m1.peer_hash + LEFT JOIN lxmf_conversation_read_state r ON r.destination_hash = m1.peer_hash """ - params = [ - local_hash, - local_hash, - local_hash, - local_hash, - ] + params = [] + where_clauses = [] if filter_unread: - # For filtering unread, we need to check if there are any received messages from that peer - query = """ - WITH RankedMessages AS ( - SELECT *, - CASE WHEN source_hash = ? THEN destination_hash ELSE source_hash END as peer_hash, - ROW_NUMBER() OVER ( - PARTITION BY CASE WHEN source_hash = ? THEN destination_hash ELSE source_hash END - ORDER BY timestamp DESC - ) as rn - FROM lxmf_messages - WHERE source_hash = ? OR destination_hash = ? - ) - SELECT * FROM RankedMessages WHERE rn = 1 - AND EXISTS ( - SELECT 1 FROM lxmf_messages m3 - WHERE m3.source_hash = peer_hash - AND m3.destination_hash = ? - AND m3.state = 'received' - AND m3.is_incoming = 1 - ) - """ - params.append(local_hash) + where_clauses.append( + "(r.last_read_at IS NULL OR m1.timestamp > strftime('%s', r.last_read_at))" + ) - query += " ORDER BY timestamp DESC" + if filter_failed: + where_clauses.append("m1.state = 'failed'") + + if filter_has_attachments: + where_clauses.append( + "(m1.fields IS NOT NULL AND m1.fields != '{}' AND m1.fields != '')" + ) + + if search: + like_term = f"%{search}%" + # Search in latest message info OR search across ALL messages for this peer + where_clauses.append(""" + (m1.title LIKE ? OR m1.content LIKE ? OR m1.peer_hash LIKE ? OR c.display_name LIKE ? + OR m1.peer_hash IN (SELECT peer_hash FROM lxmf_messages WHERE title LIKE ? OR content LIKE ?)) + """) + params.extend( + [like_term, like_term, like_term, like_term, like_term, like_term] + ) + + if where_clauses: + query += " WHERE " + " AND ".join(where_clauses) + + query += " GROUP BY m1.peer_hash ORDER BY m1.timestamp DESC" + + if limit is not None: + query += " LIMIT ? OFFSET ?" 
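# ----------------------------------------------------------------------
# Illustrative sketch (not part of this patch): one way a caller could
# consume the new limit/offset pagination added to get_conversations().
# `handler` is assumed to be an existing MessageHandler instance and
# `local_hash` the local LXMF destination hash; the page size is arbitrary.
def iter_conversation_pages(handler, local_hash, page_size=50):
    offset = 0
    while True:
        rows = handler.get_conversations(local_hash, limit=page_size, offset=offset)
        if not rows:
            break
        yield rows
        if len(rows) < page_size:
            break
        offset += page_size
# ----------------------------------------------------------------------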
+ params.extend([limit, offset]) return self.db.provider.fetchall(query, params) diff --git a/meshchatx/src/backend/nomadnet_downloader.py b/meshchatx/src/backend/nomadnet_downloader.py new file mode 100644 index 0000000..1c2901a --- /dev/null +++ b/meshchatx/src/backend/nomadnet_downloader.py @@ -0,0 +1,268 @@ +import asyncio +import io +import os +import time +from collections.abc import Callable + +import RNS + +# global cache for nomadnet links to avoid re-establishing them for every request +nomadnet_cached_links = {} + + +class NomadnetDownloader: + def __init__( + self, + destination_hash: bytes, + path: str, + data: str | None, + on_download_success: Callable[[RNS.RequestReceipt], None], + on_download_failure: Callable[[str], None], + on_progress_update: Callable[[float], None], + timeout: int | None = None, + ): + self.app_name = "nomadnetwork" + self.aspects = "node" + self.destination_hash = destination_hash + self.path = path + self.data = data + self.timeout = timeout + self._download_success_callback = on_download_success + self._download_failure_callback = on_download_failure + self.on_progress_update = on_progress_update + self.request_receipt = None + self.is_cancelled = False + self.link = None + + # cancel the download + def cancel(self): + self.is_cancelled = True + + # cancel the request if it exists + if self.request_receipt is not None: + try: + self.request_receipt.cancel() + except Exception as e: + print(f"Failed to cancel request: {e}") + + # clean up the link if we created it + if self.link is not None: + try: + self.link.teardown() + except Exception as e: + print(f"Failed to teardown link: {e}") + + # notify that download was cancelled + self._download_failure_callback("cancelled") + + # setup link to destination and request download + async def download( + self, + path_lookup_timeout: int = 15, + link_establishment_timeout: int = 15, + ): + # check if cancelled before starting + if self.is_cancelled: + return + + # use existing established link if it's active + if self.destination_hash in nomadnet_cached_links: + link = nomadnet_cached_links[self.destination_hash] + if link.status is RNS.Link.ACTIVE: + print("[NomadnetDownloader] using existing link for request") + self.link_established(link) + return + + # determine when to timeout + timeout_after_seconds = time.time() + path_lookup_timeout + + # check if we have a path to the destination + if not RNS.Transport.has_path(self.destination_hash): + # we don't have a path, so we need to request it + RNS.Transport.request_path(self.destination_hash) + + # wait until we have a path, or give up after the configured timeout + while ( + not RNS.Transport.has_path(self.destination_hash) + and time.time() < timeout_after_seconds + ): + # check if cancelled during path lookup + if self.is_cancelled: + return + await asyncio.sleep(0.1) + + # if we still don't have a path, we can't establish a link, so bail out + if not RNS.Transport.has_path(self.destination_hash): + self._download_failure_callback("Could not find path to destination.") + return + + # check if cancelled before establishing link + if self.is_cancelled: + return + + # create destination to nomadnet node + identity = RNS.Identity.recall(self.destination_hash) + destination = RNS.Destination( + identity, + RNS.Destination.OUT, + RNS.Destination.SINGLE, + self.app_name, + self.aspects, + ) + + # create link to destination + print("[NomadnetDownloader] establishing new link for request") + link = RNS.Link(destination, 
established_callback=self.link_established) + self.link = link + + # determine when to timeout + timeout_after_seconds = time.time() + link_establishment_timeout + + # wait until we have established a link, or give up after the configured timeout + while ( + link.status is not RNS.Link.ACTIVE and time.time() < timeout_after_seconds + ): + # check if cancelled during link establishment + if self.is_cancelled: + return + await asyncio.sleep(0.1) + + # if we still haven't established a link, bail out + if link.status is not RNS.Link.ACTIVE: + self._download_failure_callback("Could not establish link to destination.") + + # link to destination was established, we should now request the download + def link_established(self, link): + # check if cancelled before requesting + if self.is_cancelled: + return + + # cache link for using in future requests + nomadnet_cached_links[self.destination_hash] = link + + # request download over link + self.request_receipt = link.request( + self.path, + data=self.data, + response_callback=self.on_response, + failed_callback=self.on_failed, + progress_callback=self.on_progress, + timeout=self.timeout, + ) + + # handle successful download + def on_response(self, request_receipt: RNS.RequestReceipt): + self._download_success_callback(request_receipt) + + # handle failure + def on_failed(self, request_receipt=None): + self._download_failure_callback("request_failed") + + # handle download progress + def on_progress(self, request_receipt): + self.on_progress_update(request_receipt.progress) + + +class NomadnetPageDownloader(NomadnetDownloader): + def __init__( + self, + destination_hash: bytes, + page_path: str, + data: str | None, + on_page_download_success: Callable[[str], None], + on_page_download_failure: Callable[[str], None], + on_progress_update: Callable[[float], None], + timeout: int | None = None, + ): + self.on_page_download_success = on_page_download_success + self.on_page_download_failure = on_page_download_failure + super().__init__( + destination_hash, + page_path, + data, + self.on_download_success, + self.on_download_failure, + on_progress_update, + timeout, + ) + + # page download was successful, decode the response and send to provided callback + def on_download_success(self, request_receipt: RNS.RequestReceipt): + micron_markup_response = request_receipt.response.decode("utf-8") + self.on_page_download_success(micron_markup_response) + + # page download failed, send error to provided callback + def on_download_failure(self, failure_reason): + self.on_page_download_failure(failure_reason) + + +class NomadnetFileDownloader(NomadnetDownloader): + def __init__( + self, + destination_hash: bytes, + page_path: str, + on_file_download_success: Callable[[str, bytes], None], + on_file_download_failure: Callable[[str], None], + on_progress_update: Callable[[float], None], + timeout: int | None = None, + ): + self.on_file_download_success = on_file_download_success + self.on_file_download_failure = on_file_download_failure + super().__init__( + destination_hash, + page_path, + None, + self.on_download_success, + self.on_download_failure, + on_progress_update, + timeout, + ) + + # file download was successful, decode the response and send to provided callback + def on_download_success(self, request_receipt: RNS.RequestReceipt): + # get response + response = request_receipt.response + + # handle buffered reader response + if isinstance(response, io.BufferedReader): + # get file name from metadata + file_name = "downloaded_file" + metadata = 
request_receipt.metadata
+            if metadata is not None and "name" in metadata:
+                file_path = metadata["name"].decode("utf-8")
+                file_name = os.path.basename(file_path)
+
+            # get file data
+            file_data: bytes = response.read()
+
+            self.on_file_download_success(file_name, file_data)
+            return
+
+        # check for list response with bytes in position 0, and metadata dict in position 1
+        # e.g.: [file_bytes, {name: "filename.ext"}]
+        if isinstance(response, list) and len(response) >= 2 and isinstance(response[1], dict):
+            file_data: bytes = response[0]
+            metadata: dict = response[1]
+
+            # get file name from metadata
+            file_name = "downloaded_file"
+            if metadata is not None and "name" in metadata:
+                file_path = metadata["name"].decode("utf-8")
+                file_name = os.path.basename(file_path)
+
+            self.on_file_download_success(file_name, file_data)
+            return
+
+        # try using original response format
+        # unsure if this is actually used anymore now that a buffered reader is provided
+        # have left here just in case...
+        try:
+            file_name: str = response[0]
+            file_data: bytes = response[1]
+            self.on_file_download_success(file_name, file_data)
+        except Exception:
+            self.on_download_failure("unsupported_response")
+
+    # file download failed, send error to provided callback
+    def on_download_failure(self, failure_reason):
+        self.on_file_download_failure(failure_reason)
diff --git a/meshchatx/src/backend/nomadnet_utils.py b/meshchatx/src/backend/nomadnet_utils.py
new file mode 100644
index 0000000..8c202cc
--- /dev/null
+++ b/meshchatx/src/backend/nomadnet_utils.py
@@ -0,0 +1,61 @@
+def convert_nomadnet_string_data_to_map(path_data: str | None):
+    data = {}
+    if path_data is not None:
+        for field in path_data.split("|"):
+            if "=" in field:
+                parts = field.split("=", 1)
+                if len(parts) == 2:
+                    variable_name, variable_value = parts
+                    data[f"var_{variable_name}"] = variable_value
+            else:
+                print(f"unhandled field: {field}")
+    return data
+
+
+def convert_nomadnet_field_data_to_map(field_data):
+    data = {}
+    if field_data is not None:
+        try:
+            json_data = field_data
+            if isinstance(json_data, dict):
+                data = {f"field_{key}": value for key, value in json_data.items()}
+            else:
+                return None
+        except Exception as e:
+            print(f"skipping invalid field data: {e}")
+
+    return data
+
+
+class NomadNetworkManager:
+    def __init__(self, config, archiver_manager, database):
+        self.config = config
+        self.archiver_manager = archiver_manager
+        self.database = database
+
+    def archive_page(
+        self,
+        destination_hash: str,
+        page_path: str,
+        content: str,
+        is_manual: bool = False,
+    ):
+        if not is_manual and not self.config.page_archiver_enabled.get():
+            return
+
+        self.archiver_manager.archive_page(
+            destination_hash,
+            page_path,
+            content,
+            max_versions=self.config.page_archiver_max_versions.get(),
+            max_storage_gb=self.config.archives_max_storage_gb.get(),
+        )
+
+    def get_archived_page_versions(self, destination_hash: str, page_path: str):
+        return self.database.misc.get_archived_page_versions(
+            destination_hash,
+            page_path,
+        )
+
+    def flush_all_archived_pages(self):
+        self.database.misc.delete_archived_pages()
diff --git a/meshchatx/src/backend/recovery/__init__.py b/meshchatx/src/backend/recovery/__init__.py
new file mode 100644
index 0000000..5992cca
--- /dev/null
+++ b/meshchatx/src/backend/recovery/__init__.py
@@ -0,0 +1,3 @@
+from .crash_recovery import CrashRecovery
+
+__all__ = ["CrashRecovery"]
diff --git a/meshchatx/src/backend/recovery/crash_recovery.py b/meshchatx/src/backend/recovery/crash_recovery.py
new file mode 100644
index
0000000..9e88b72 --- /dev/null +++ b/meshchatx/src/backend/recovery/crash_recovery.py @@ -0,0 +1,301 @@ +import sys +import os +import traceback +import platform +import shutil +import sqlite3 +import psutil +import RNS + + +class CrashRecovery: + """ + A diagnostic utility that intercepts application crashes and provides + meaningful error reports and system state analysis. + """ + + def __init__( + self, + storage_dir=None, + database_path=None, + public_dir=None, + reticulum_config_dir=None, + ): + self.storage_dir = storage_dir + self.database_path = database_path + self.public_dir = public_dir + self.reticulum_config_dir = reticulum_config_dir + self.enabled = True + + # Check environment variable to allow disabling the recovery system + env_val = os.environ.get("MESHCHAT_NO_CRASH_RECOVERY", "").lower() + if env_val in ("true", "1", "yes", "on"): + self.enabled = False + + def install(self): + """ + Installs the crash recovery exception hook into the system. + """ + if not self.enabled: + return + + sys.excepthook = self.handle_exception + + def disable(self): + """ + Disables the crash recovery system manually. + """ + self.enabled = False + + def update_paths( + self, + storage_dir=None, + database_path=None, + public_dir=None, + reticulum_config_dir=None, + ): + """ + Updates the internal paths used for system diagnosis. + """ + if storage_dir: + self.storage_dir = storage_dir + if database_path: + self.database_path = database_path + if public_dir: + self.public_dir = public_dir + if reticulum_config_dir: + self.reticulum_config_dir = reticulum_config_dir + + def handle_exception(self, exc_type, exc_value, exc_traceback): + """ + Intercepts unhandled exceptions to provide a detailed diagnosis report. + """ + # Let keyboard interrupts pass through normally + if issubclass(exc_type, KeyboardInterrupt): + sys.__excepthook__(exc_type, exc_value, exc_traceback) + return + + # Use stderr for everything to ensure correct ordering in logs and console + out = sys.stderr + + # Print visual separator + out.write("\n" + "=" * 70 + "\n") + out.write("!!! APPLICATION CRASH DETECTED !!!\n") + out.write("=" * 70 + "\n") + + out.write("\nError Summary:\n") + out.write(f" Type: {exc_type.__name__}\n") + out.write(f" Message: {exc_value}\n") + + out.write("\nSystem Environment Diagnosis:\n") + try: + self.run_diagnosis(file=out) + except Exception as e: + out.write(f" [ERROR] Failed to complete diagnosis: {e}\n") + + out.write("\nTechnical Traceback:\n") + traceback.print_exception(exc_type, exc_value, exc_traceback, file=out) + + out.write("\n" + "=" * 70 + "\n") + out.write("Recovery Suggestions:\n") + out.write(" 1. Review the 'System Environment Diagnosis' section above.\n") + out.write( + " 2. Verify that all dependencies are installed (poetry install or pip install -r requirements.txt).\n" + ) + out.write( + " 3. If database corruption is suspected, try starting with --auto-recover.\n" + ) + out.write( + " 4. If the issue persists, report it to Ivan over another LXMF client: 7cc8d66b4f6a0e0e49d34af7f6077b5a\n" + ) + out.write("=" * 70 + "\n\n") + out.flush() + + # Exit with error code + sys.exit(1) + + def run_diagnosis(self, file=sys.stderr): + """ + Performs a series of OS-agnostic checks on the application's environment. 
+ """ + # Basic System Info + file.write( + f"- OS: {platform.system()} {platform.release()} ({platform.machine()})\n" + ) + file.write(f"- Python: {sys.version.split()[0]}\n") + + # Resource Monitoring + try: + mem = psutil.virtual_memory() + file.write( + f"- Memory: {mem.percent}% used ({mem.available / (1024**2):.1f} MB available)\n" + ) + if mem.percent > 95: + file.write(" [CRITICAL] System memory is dangerously low!\n") + except Exception: + pass + + # Filesystem Status + if self.storage_dir: + file.write(f"- Storage Path: {self.storage_dir}\n") + if not os.path.exists(self.storage_dir): + file.write( + " [ERROR] Storage path does not exist. Check MESHCHAT_STORAGE_DIR.\n" + ) + else: + if not os.access(self.storage_dir, os.W_OK): + file.write( + " [ERROR] Storage path is NOT writable. Check filesystem permissions.\n" + ) + + try: + usage = shutil.disk_usage(self.storage_dir) + free_mb = usage.free / (1024**2) + file.write(f" - Disk Space: {free_mb:.1f} MB free\n") + if free_mb < 50: + file.write( + " [CRITICAL] Disk space is critically low (< 50MB)!\n" + ) + except Exception: + pass + + # Database Integrity + if self.database_path: + file.write(f"- Database: {self.database_path}\n") + if os.path.exists(self.database_path): + if os.path.getsize(self.database_path) == 0: + file.write( + " [WARNING] Database file exists but is empty (0 bytes).\n" + ) + else: + try: + # Open in read-only mode for safety during crash handling + conn = sqlite3.connect( + f"file:{self.database_path}?mode=ro", uri=True + ) + cursor = conn.cursor() + cursor.execute("PRAGMA integrity_check") + res = cursor.fetchone()[0] + if res != "ok": + file.write( + f" [ERROR] Database corruption detected: {res}\n" + ) + else: + file.write(" - Integrity: OK\n") + conn.close() + except sqlite3.DatabaseError as e: + file.write( + f" [ERROR] Database is unreadable or not a SQLite file: {e}\n" + ) + except Exception as e: + file.write(f" [ERROR] Database check failed: {e}\n") + else: + file.write(" - Database: File not yet created\n") + + # Frontend Assets + if self.public_dir: + file.write(f"- Frontend Assets: {self.public_dir}\n") + if not os.path.exists(self.public_dir): + file.write( + " [ERROR] Frontend directory is missing. Web interface will fail to load.\n" + ) + else: + index_path = os.path.join(self.public_dir, "index.html") + if not os.path.exists(index_path): + file.write( + " [ERROR] index.html not found in frontend directory!\n" + ) + else: + file.write(" - Frontend Status: Assets verified\n") + + # Reticulum Status + self.run_reticulum_diagnosis(file=file) + + def run_reticulum_diagnosis(self, file=sys.stderr): + """ + Diagnoses the Reticulum Network Stack environment. 
+ """ + file.write("- Reticulum Network Stack:\n") + + # Check config directory + config_dir = self.reticulum_config_dir or RNS.Reticulum.configpath + file.write(f" - Config Directory: {config_dir}\n") + + if not os.path.exists(config_dir): + file.write(" [ERROR] Reticulum config directory does not exist.\n") + return + + config_file = os.path.join(config_dir, "config") + if not os.path.exists(config_file): + file.write(" [ERROR] Reticulum config file is missing.\n") + else: + try: + # Basic config validation + with open(config_file, "r") as f: + content = f.read() + if "[reticulum]" not in content: + file.write( + " [ERROR] Reticulum config file is invalid (missing [reticulum] section).\n" + ) + else: + file.write(" - Config File: OK\n") + except Exception as e: + file.write(f" [ERROR] Could not read Reticulum config: {e}\n") + + # Extract recent RNS log entries if possible + # Check common log file locations + log_paths = [ + os.path.join(config_dir, "logfile"), + os.path.join(config_dir, "rnsd.log"), + "/var/log/rnsd.log", + ] + + found_logs = False + for logfile in log_paths: + if os.path.exists(logfile): + file.write(f" - Recent Log Entries ({logfile}):\n") + try: + with open(logfile, "r") as f: + lines = f.readlines() + if not lines: + file.write(" (Log file is empty)\n") + else: + for line in lines[-15:]: + if "ERROR" in line or "CRITICAL" in line: + file.write(f" > [ALERT] {line.strip()}\n") + else: + file.write(f" > {line.strip()}\n") + found_logs = True + break # Stop at first found log file + except Exception as e: + file.write(f" [ERROR] Could not read logfile: {e}\n") + + if not found_logs: + file.write(" - Logs: No RNS log files found in standard locations.\n") + + # Check for interfaces and transport status + try: + # Try to get more info from RNS if it's already running + if hasattr(RNS.Transport, "interfaces") and RNS.Transport.interfaces: + file.write(f" - Active Interfaces: {len(RNS.Transport.interfaces)}\n") + for iface in RNS.Transport.interfaces: + status = "Active" if iface.online else "Offline" + file.write(f" > {iface} [{status}]\n") + else: + file.write( + " - Active Interfaces: None registered (Reticulum may not be initialized yet)\n" + ) + except Exception: + pass + + # Check for common port conflicts + common_ports = [4242, 8000, 8080] # Reticulum default is often 4242 + for port in common_ports: + try: + for conn in psutil.net_connections(): + if conn.laddr.port == port and conn.status == "LISTEN": + file.write( + f" [ALERT] Port {port} is already in use by PID {conn.pid}. 
Potential conflict.\n" + ) + except Exception: + pass diff --git a/meshchatx/src/backend/ringtone_manager.py b/meshchatx/src/backend/ringtone_manager.py index 8164ce6..7a87b6e 100644 --- a/meshchatx/src/backend/ringtone_manager.py +++ b/meshchatx/src/backend/ringtone_manager.py @@ -38,7 +38,7 @@ class RingtoneManager: filename = f"ringtone_{secrets.token_hex(8)}.opus" opus_path = os.path.join(self.storage_dir, filename) - subprocess.run( + subprocess.run( # noqa: S603 [ self.ffmpeg_path, "-i", diff --git a/meshchatx/src/backend/telephone_manager.py b/meshchatx/src/backend/telephone_manager.py index 9320aff..9b22c17 100644 --- a/meshchatx/src/backend/telephone_manager.py +++ b/meshchatx/src/backend/telephone_manager.py @@ -1,10 +1,34 @@ import asyncio +import os import time import RNS from LXST import Telephone +class Tee: + def __init__(self, sink): + self.sinks = [sink] + + def add_sink(self, sink): + if sink not in self.sinks: + self.sinks.append(sink) + + def remove_sink(self, sink): + if sink in self.sinks: + self.sinks.remove(sink) + + def handle_frame(self, frame, source): + for sink in self.sinks: + try: + sink.handle_frame(frame, source) + except Exception as e: + RNS.log(f"Tee: Error in sink handle_frame: {e}", RNS.LOG_ERROR) + + def can_receive(self, from_source=None): + return any(sink.can_receive(from_source) for sink in self.sinks) + + class TelephoneManager: # LXST Status Constants for reference: # 0: STATUS_BUSY @@ -15,9 +39,20 @@ class TelephoneManager: # 5: STATUS_CONNECTING # 6: STATUS_ESTABLISHED - def __init__(self, identity: RNS.Identity, config_manager=None): + def __init__( + self, identity: RNS.Identity, config_manager=None, storage_dir=None, db=None + ): self.identity = identity self.config_manager = config_manager + self.storage_dir = storage_dir + self.db = db + self.get_name_for_identity_hash = None + self.recordings_dir = ( + os.path.join(storage_dir, "recordings") if storage_dir else None + ) + if self.recordings_dir: + os.makedirs(self.recordings_dir, exist_ok=True) + self.telephone = None self.on_ringing_callback = None self.on_established_callback = None @@ -27,6 +62,19 @@ class TelephoneManager: self.call_status_at_end = None self.call_is_incoming = False + # Manual mute overrides in case LXST internal muting is buggy + self.transmit_muted = False + self.receive_muted = False + + self.initiation_status = None + self.initiation_target_hash = None + self.on_initiation_status_callback = None + + @property + def is_recording(self): + # Disabled for now + return False + def init_telephone(self): if self.telephone is not None: return @@ -34,6 +82,12 @@ class TelephoneManager: self.telephone = Telephone(self.identity) # Disable busy tone played on caller side when remote side rejects, or doesn't answer self.telephone.set_busy_tone_time(0) + + # Set initial profile from config + if self.config_manager: + profile_id = self.config_manager.telephone_audio_profile_id.get() + self.telephone.switch_profile(profile_id) + self.telephone.set_ringing_callback(self.on_telephone_ringing) self.telephone.set_established_callback(self.on_telephone_call_established) self.telephone.set_ended_callback(self.on_telephone_call_ended) @@ -61,6 +115,11 @@ class TelephoneManager: def on_telephone_call_established(self, caller_identity: RNS.Identity): # Update start time to when it was actually established for duration calculation self.call_start_time = time.time() + + # Recording disabled for now due to stability issues with LXST + # if self.config_manager and 
self.config_manager.call_recording_enabled.get(): + # self.start_recording() + if self.on_established_callback: self.on_established_callback(caller_identity) @@ -72,34 +131,170 @@ class TelephoneManager: if self.on_ended_callback: self.on_ended_callback(caller_identity) + def start_recording(self): + # Disabled for now as LXST does not have a Tee to use + pass + + def stop_recording(self): + # Disabled for now + pass + def announce(self, attached_interface=None): if self.telephone: self.telephone.announce(attached_interface=attached_interface) + def _update_initiation_status(self, status, target_hash=None): + self.initiation_status = status + if target_hash is not None or status is None: + self.initiation_target_hash = target_hash + if self.on_initiation_status_callback: + try: + self.on_initiation_status_callback( + self.initiation_status, self.initiation_target_hash + ) + except Exception as e: + RNS.log( + f"TelephoneManager: Error in initiation status callback: {e}", + RNS.LOG_ERROR, + ) + async def initiate(self, destination_hash: bytes, timeout_seconds: int = 15): if self.telephone is None: msg = "Telephone is not initialized" raise RuntimeError(msg) - # Find destination identity - destination_identity = RNS.Identity.recall(destination_hash) - if destination_identity is None: - # If not found by identity hash, try as destination hash - destination_identity = RNS.Identity.recall( - destination_hash, - ) # Identity.recall takes identity hash - - if destination_identity is None: - msg = "Destination identity not found" + if self.telephone.busy or self.initiation_status: + msg = "Telephone is already in use" raise RuntimeError(msg) - # In LXST, we just call the identity. Telephone class handles path requests. - # But we might want to ensure a path exists first for better UX, - # similar to how the old MeshChat did it. + destination_hash_hex = destination_hash.hex() + self._update_initiation_status("Resolving identity...", destination_hash_hex) - # For now, let's just use the telephone.call method which is threaded. - # We need to run it in a thread since it might block. 
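# ----------------------------------------------------------------------
# Illustrative sketch (not part of this patch): wiring the new
# on_initiation_status_callback and awaiting initiate(). `manager` is
# assumed to be a TelephoneManager instance on which init_telephone()
# has already been called; the print() reporting is purely for example.
def report_initiation_status(status, target_hash):
    if status is not None:
        print(f"Call to {target_hash}: {status}")

async def place_call(manager, destination_hash: bytes):
    manager.on_initiation_status_callback = report_initiation_status
    try:
        return await manager.initiate(destination_hash, timeout_seconds=15)
    except RuntimeError as e:
        print(f"Call could not be initiated: {e}")
        return None
# ----------------------------------------------------------------------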
- self.call_start_time = time.time() - self.call_is_incoming = False - await asyncio.to_thread(self.telephone.call, destination_identity) - return self.telephone.active_call + try: + # Find destination identity + destination_identity = RNS.Identity.recall(destination_hash) + + # If identity not found, check if it's a destination hash in our announces + if destination_identity is None and self.db: + announce = self.db.announces.get_announce_by_hash(destination_hash_hex) + if announce: + identity_hash = bytes.fromhex(announce["identity_hash"]) + destination_identity = RNS.Identity.recall(identity_hash) + + if destination_identity is None: + self._update_initiation_status("Discovering path/identity...") + RNS.Transport.request_path(destination_hash) + + # Wait for identity to appear + start_wait = time.time() + while time.time() - start_wait < timeout_seconds: + await asyncio.sleep(0.5) + destination_identity = RNS.Identity.recall(destination_hash) + if destination_identity: + break + + if self.db: + announce = self.db.announces.get_announce_by_hash( + destination_hash_hex + ) + if announce: + identity_hash = bytes.fromhex(announce["identity_hash"]) + destination_identity = RNS.Identity.recall(identity_hash) + if destination_identity: + break + + if destination_identity is None: + self._update_initiation_status(None, None) + msg = "Destination identity not found" + raise RuntimeError(msg) + + if not RNS.Transport.has_path(destination_hash): + self._update_initiation_status("Requesting path...") + RNS.Transport.request_path(destination_hash) + + self._update_initiation_status("Dialing...") + self.call_start_time = time.time() + self.call_is_incoming = False + + # Use a thread for the blocking LXST call + await asyncio.to_thread(self.telephone.call, destination_identity) + return self.telephone.active_call + + except Exception as e: + self._update_initiation_status(f"Failed: {str(e)}") + await asyncio.sleep(3) + raise + finally: + self._update_initiation_status(None, None) + + def mute_transmit(self): + if self.telephone: + # Manual override as LXST internal muting can be buggy + if hasattr(self.telephone, "audio_input") and self.telephone.audio_input: + try: + self.telephone.audio_input.stop() + except Exception as e: + RNS.log(f"Failed to stop audio input for mute: {e}", RNS.LOG_ERROR) + + # Still call the internal method just in case it does something useful + try: + self.telephone.mute_transmit() + except Exception: # noqa: S110 + pass + + self.transmit_muted = True + + def unmute_transmit(self): + if self.telephone: + # Manual override as LXST internal muting can be buggy + if hasattr(self.telephone, "audio_input") and self.telephone.audio_input: + try: + self.telephone.audio_input.start() + except Exception as e: + RNS.log( + f"Failed to start audio input for unmute: {e}", RNS.LOG_ERROR + ) + + # Still call the internal method just in case + try: + self.telephone.unmute_transmit() + except Exception: # noqa: S110 + pass + + self.transmit_muted = False + + def mute_receive(self): + if self.telephone: + # Manual override as LXST internal muting can be buggy + if hasattr(self.telephone, "audio_output") and self.telephone.audio_output: + try: + self.telephone.audio_output.stop() + except Exception as e: + RNS.log(f"Failed to stop audio output for mute: {e}", RNS.LOG_ERROR) + + # Still call the internal method just in case + try: + self.telephone.mute_receive() + except Exception: # noqa: S110 + pass + + self.receive_muted = True + + def unmute_receive(self): + if self.telephone: + # Manual 
override as LXST internal muting can be buggy + if hasattr(self.telephone, "audio_output") and self.telephone.audio_output: + try: + self.telephone.audio_output.start() + except Exception as e: + RNS.log( + f"Failed to start audio output for unmute: {e}", RNS.LOG_ERROR + ) + + # Still call the internal method just in case + try: + self.telephone.unmute_receive() + except Exception: # noqa: S110 + pass + + self.receive_muted = False diff --git a/meshchatx/src/backend/translator_handler.py b/meshchatx/src/backend/translator_handler.py index 3946acc..ac12b3f 100644 --- a/meshchatx/src/backend/translator_handler.py +++ b/meshchatx/src/backend/translator_handler.py @@ -64,7 +64,8 @@ LANGUAGE_CODE_TO_NAME = { class TranslatorHandler: - def __init__(self, libretranslate_url: str | None = None): + def __init__(self, libretranslate_url: str | None = None, enabled: bool = False): + self.enabled = enabled self.libretranslate_url = libretranslate_url or os.getenv( "LIBRETRANSLATE_URL", "http://localhost:5000", @@ -76,6 +77,9 @@ class TranslatorHandler: def get_supported_languages(self, libretranslate_url: str | None = None): languages = [] + if not self.enabled: + return languages + url = libretranslate_url or self.libretranslate_url if self.has_requests: @@ -131,6 +135,10 @@ class TranslatorHandler: use_argos: bool = False, libretranslate_url: str | None = None, ) -> dict[str, Any]: + if not self.enabled: + msg = "Translator is disabled" + raise RuntimeError(msg) + if not text: msg = "Text cannot be empty" raise ValueError(msg) diff --git a/meshchatx/src/backend/voicemail_manager.py b/meshchatx/src/backend/voicemail_manager.py index ebbd1e2..47e4f44 100644 --- a/meshchatx/src/backend/voicemail_manager.py +++ b/meshchatx/src/backend/voicemail_manager.py @@ -141,8 +141,34 @@ class VoicemailManager: wav_path = os.path.join(self.greetings_dir, "greeting.wav") try: - # espeak-ng to WAV - subprocess.run([self.espeak_path, "-w", wav_path, text], check=True) + # espeak-ng to WAV with improved parameters + speed = str(self.config.voicemail_tts_speed.get()) + pitch = str(self.config.voicemail_tts_pitch.get()) + voice = self.config.voicemail_tts_voice.get() + gap = str(self.config.voicemail_tts_word_gap.get()) + + cmd = [ + self.espeak_path, + "-s", + speed, + "-p", + pitch, + "-g", + gap, + "-k", + "10", + "-v", + voice, + "-w", + wav_path, + text, + ] + + RNS.log( + f"Voicemail: Generating greeting with command: {' '.join(cmd)}", + RNS.LOG_DEBUG, + ) + subprocess.run(cmd, check=True) # noqa: S603 # Convert WAV to Opus return self.convert_to_greeting(wav_path) @@ -160,7 +186,7 @@ class VoicemailManager: if os.path.exists(opus_path): os.remove(opus_path) - subprocess.run( + subprocess.run( # noqa: S603 [ self.ffmpeg_path, "-i", @@ -169,6 +195,10 @@ class VoicemailManager: "libopus", "-b:a", "16k", + "-ar", + "48000", + "-ac", + "1", "-vbr", "on", opus_path, @@ -214,11 +244,16 @@ class VoicemailManager: RNS.LOG_DEBUG, ) + active_call_remote_identity = ( + telephone.active_call.get_remote_identity() + if (telephone and telephone.active_call) + else None + ) if ( telephone and telephone.active_call - and telephone.active_call.get_remote_identity().hash - == caller_identity.hash + and active_call_remote_identity + and active_call_remote_identity.hash == caller_identity.hash and telephone.call_status == 4 # Ringing ): RNS.log( @@ -232,10 +267,17 @@ class VoicemailManager: RNS.LOG_DEBUG, ) if telephone.active_call: - RNS.log( - f"Voicemail: Active call remote: 
{RNS.prettyhexrep(telephone.active_call.get_remote_identity().hash)}", - RNS.LOG_DEBUG, - ) + remote_identity = telephone.active_call.get_remote_identity() + if remote_identity: + RNS.log( + f"Voicemail: Active call remote: {RNS.prettyhexrep(remote_identity.hash)}", + RNS.LOG_DEBUG, + ) + else: + RNS.log( + "Voicemail: Active call remote identity not found", + RNS.LOG_DEBUG, + ) threading.Thread(target=voicemail_job, daemon=True).start() @@ -360,6 +402,9 @@ class VoicemailManager: threading.Thread(target=session_job, daemon=True).start() def start_recording(self, caller_identity): + # Disabled for now + return + telephone = self.telephone_manager.telephone if not telephone or not telephone.active_call: return @@ -370,17 +415,12 @@ class VoicemailManager: try: self.recording_sink = OpusFileSink(filepath) - # Ensure samplerate is set to avoid TypeError in LXST Opus codec - # which expects sink to have a valid samplerate attribute self.recording_sink.samplerate = 48000 - # Connect the caller's audio source to our sink - # active_call.audio_source is a LinkSource that feeds into receive_mixer - # We want to record what we receive. self.recording_pipeline = Pipeline( - source=telephone.active_call.audio_source, - codec=Null(), - sink=self.recording_sink, + telephone.active_call.audio_source, + Null(), + self.recording_sink, ) self.recording_pipeline.start() @@ -402,7 +442,13 @@ class VoicemailManager: try: duration = int(time.time() - self.recording_start_time) - self.recording_pipeline.stop() + + if self.recording_pipeline: + self.recording_pipeline.stop() + + if self.recording_sink: + self.recording_sink.stop() + self.recording_sink = None self.recording_pipeline = None @@ -446,6 +492,9 @@ class VoicemailManager: self.is_recording = False def start_greeting_recording(self): + # Disabled for now + return + telephone = self.telephone_manager.telephone if not telephone: return @@ -469,11 +518,12 @@ class VoicemailManager: self.greeting_recording_sink.samplerate = 48000 self.greeting_recording_pipeline = Pipeline( - source=telephone.audio_input, - codec=Null(), - sink=self.greeting_recording_sink, + telephone.audio_input, + Null(), + self.greeting_recording_sink, ) self.greeting_recording_pipeline.start() + self.is_greeting_recording = True RNS.log("Voicemail: Started recording greeting from mic", RNS.LOG_DEBUG) except Exception as e: @@ -487,7 +537,12 @@ class VoicemailManager: return try: - self.greeting_recording_pipeline.stop() + if self.greeting_recording_pipeline: + self.greeting_recording_pipeline.stop() + + if self.greeting_recording_sink: + self.greeting_recording_sink.stop() + self.greeting_recording_sink = None self.greeting_recording_pipeline = None self.is_greeting_recording = False
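
For reference, the greeting generation path above reduces to an espeak-ng render followed by an ffmpeg Opus encode. The standalone sketch below mirrors those two steps outside the application; the executable names, parameter values, and file paths are illustrative defaults, not values read from the MeshChat config.

import subprocess

def render_greeting_preview(text: str, wav_path: str = "greeting.wav", opus_path: str = "greeting.opus"):
    # text to speech: speed (-s), pitch (-p), word gap (-g), capitals emphasis (-k), voice (-v), output wav (-w)
    subprocess.run(
        ["espeak-ng", "-s", "150", "-p", "50", "-g", "8", "-k", "10", "-v", "en", "-w", wav_path, text],
        check=True,
    )
    # encode to mono 48 kHz Opus at 16 kbit/s with variable bitrate, matching the ffmpeg options added in this patch
    subprocess.run(
        ["ffmpeg", "-y", "-i", wav_path, "-c:a", "libopus", "-b:a", "16k", "-ar", "48000", "-ac", "1", "-vbr", "on", opus_path],
        check=True,
    )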