diff --git a/meshchatx/meshchat.py b/meshchatx/meshchat.py index 4043934..2489666 100644 --- a/meshchatx/meshchat.py +++ b/meshchatx/meshchat.py @@ -79,6 +79,7 @@ from meshchatx.src.backend.nomadnet_utils import ( convert_nomadnet_field_data_to_map, convert_nomadnet_string_data_to_map, ) +from meshchatx.src.backend.persistent_log_handler import PersistentLogHandler from meshchatx.src.backend.recovery import CrashRecovery from meshchatx.src.backend.rnprobe_handler import RNProbeHandler from meshchatx.src.backend.sideband_commands import SidebandCommands @@ -89,31 +90,8 @@ import collections import logging -class MemoryLogHandler(logging.Handler): - def __init__(self, capacity=5000): - super().__init__() - self.logs = collections.deque(maxlen=capacity) - - def emit(self, record): - try: - msg = self.format(record) - self.logs.append( - { - "timestamp": datetime.now(UTC).isoformat(), - "level": record.levelname, - "module": record.module, - "message": msg, - } - ) - except Exception: - self.handleError(record) - - def get_logs(self): - return list(self.logs) - - # Global log handler -memory_log_handler = MemoryLogHandler() +memory_log_handler = PersistentLogHandler() logging.basicConfig( level=logging.INFO, handlers=[memory_log_handler, logging.StreamHandler(sys.stdout)] ) @@ -517,6 +495,9 @@ class ReticulumMeshChat: self.contexts[identity_hash] = context self.current_context = context context.setup() + + # Link database to memory log handler + memory_log_handler.set_database(context.database) def _checkpoint_and_close(self): # delegated to database instance @@ -1989,10 +1970,39 @@ class ReticulumMeshChat: async def call_html_redirect(request): return web.HTTPFound("/#/popout/call") - # serve ping + # serve debug logs @routes.get("/api/v1/debug/logs") async def get_debug_logs(request): - return web.json_response(memory_log_handler.get_logs()) + search = request.query.get("search") + level = request.query.get("level") + module = request.query.get("module") + is_anomaly = parse_bool_query_param(request.query.get("is_anomaly")) + limit = int(request.query.get("limit", 100)) + offset = int(request.query.get("offset", 0)) + + logs = memory_log_handler.get_logs( + limit=limit, + offset=offset, + search=search, + level=level, + module=module, + is_anomaly=is_anomaly, + ) + total = memory_log_handler.get_total_count( + search=search, + level=level, + module=module, + is_anomaly=is_anomaly, + ) + + return web.json_response( + { + "logs": logs, + "total": total, + "limit": limit, + "offset": offset, + } + ) @routes.post("/api/v1/database/snapshot") async def create_db_snapshot(request): @@ -3166,6 +3176,7 @@ class ReticulumMeshChat: "download_stats": { "avg_download_speed_bps": avg_download_speed_bps, }, + "emergency": getattr(self, "emergency", False), "integrity_issues": getattr(self, "integrity_issues", []), "user_guidance": self.build_user_guidance_messages(), "tutorial_seen": self.config.get("tutorial_seen", "false") @@ -3913,6 +3924,12 @@ class ReticulumMeshChat: d = dict(row) remote_identity_hash = d.get("remote_identity_hash") if remote_identity_hash: + # try to resolve name if unknown or missing + if not d.get("remote_identity_name") or d.get("remote_identity_name") == "Unknown": + resolved_name = self.get_name_for_identity_hash(remote_identity_hash) + if resolved_name: + d["remote_identity_name"] = resolved_name + lxmf_hash = self.get_lxmf_destination_hash_for_identity_hash( remote_identity_hash, ) @@ -4307,52 +4324,68 @@ class ReticulumMeshChat: @routes.get("/api/v1/telephone/ringtones/status") async def telephone_ringtone_status(request): - caller_hash = request.query.get("caller_hash") + try: + caller_hash = request.query.get("caller_hash") - ringtone_id = None + ringtone_id = None - # 1. check contact preferred ringtone - if caller_hash: - contact = self.database.contacts.get_contact_by_identity_hash( - caller_hash + # 1. check contact preferred ringtone + if caller_hash: + contact = self.database.contacts.get_contact_by_identity_hash( + caller_hash + ) + if contact and contact.get("preferred_ringtone_id"): + ringtone_id = contact["preferred_ringtone_id"] + + # 2. check global preferred for non-contacts + if ringtone_id is None: + preferred_id = self.config.ringtone_preferred_id.get() + if preferred_id: + ringtone_id = preferred_id + + # 3. fallback to primary + if ringtone_id is None: + primary = self.database.ringtones.get_primary() + if primary: + ringtone_id = primary["id"] + + # 4. handle random if selected (-1) + if ringtone_id == -1: + import random + + ringtones = self.database.ringtones.get_all() + if ringtones: + ringtone_id = random.choice(ringtones)["id"] # noqa: S311 + else: + ringtone_id = None + + has_custom = ringtone_id is not None + ringtone = ( + self.database.ringtones.get_by_id(ringtone_id) + if has_custom + else None ) - if contact and contact.get("preferred_ringtone_id"): - ringtone_id = contact["preferred_ringtone_id"] - # 2. check global preferred for non-contacts - if ringtone_id is None: - preferred_id = self.config.ringtone_preferred_id.get() - if preferred_id: - ringtone_id = preferred_id - - # 3. fallback to primary - if ringtone_id is None: - primary = self.database.ringtones.get_primary() - if primary: - ringtone_id = primary["id"] - - # 4. handle random if selected (-1) - if ringtone_id == -1: - import random - - ringtones = self.database.ringtones.get_all() - if ringtones: - ringtone_id = random.choice(ringtones)["id"] # noqa: S311 - - has_custom = ringtone_id is not None - ringtone = ( - self.database.ringtones.get_by_id(ringtone_id) if has_custom else None - ) - - return web.json_response( - { - "has_custom_ringtone": has_custom, - "enabled": self.config.custom_ringtone_enabled.get(), - "filename": ringtone["filename"] if ringtone else None, - "id": ringtone_id if ringtone_id != -1 else None, - "volume": self.config.ringtone_volume.get() / 100.0, - }, - ) + return web.json_response( + { + "has_custom_ringtone": has_custom and ringtone is not None, + "enabled": self.config.custom_ringtone_enabled.get(), + "filename": ringtone["filename"] if ringtone else None, + "id": ringtone_id if ringtone_id != -1 else None, + "volume": self.config.ringtone_volume.get() / 100.0, + }, + ) + except Exception as e: + logger.error(f"Error in telephone_ringtone_status: {e}") + return web.json_response( + { + "has_custom_ringtone": False, + "enabled": self.config.custom_ringtone_enabled.get(), + "filename": None, + "id": None, + "volume": self.config.ringtone_volume.get() / 100.0, + }, + ) @routes.get("/api/v1/telephone/ringtones/{id}/audio") async def telephone_ringtone_audio(request): @@ -7171,6 +7204,7 @@ class ReticulumMeshChat: # update lxmf user icon name in config if "lxmf_user_icon_name" in data: self.config.lxmf_user_icon_name.set(data["lxmf_user_icon_name"]) + self.database.misc.clear_last_sent_icon_hashes() self.update_identity_metadata_cache() # update lxmf user icon foreground colour in config @@ -7178,6 +7212,7 @@ class ReticulumMeshChat: self.config.lxmf_user_icon_foreground_colour.set( data["lxmf_user_icon_foreground_colour"], ) + self.database.misc.clear_last_sent_icon_hashes() self.update_identity_metadata_cache() # update lxmf user icon background colour in config @@ -7185,6 +7220,7 @@ class ReticulumMeshChat: self.config.lxmf_user_icon_background_colour.set( data["lxmf_user_icon_background_colour"], ) + self.database.misc.clear_last_sent_icon_hashes() self.update_identity_metadata_cache() # update archiver settings diff --git a/meshchatx/src/backend/database/__init__.py b/meshchatx/src/backend/database/__init__.py index 65fe3da..33e7d60 100644 --- a/meshchatx/src/backend/database/__init__.py +++ b/meshchatx/src/backend/database/__init__.py @@ -6,6 +6,7 @@ from datetime import UTC, datetime from .announces import AnnounceDAO from .config import ConfigDAO from .contacts import ContactsDAO +from .debug_logs import DebugLogsDAO from .legacy_migrator import LegacyMigrator from .map_drawings import MapDrawingsDAO from .messages import MessageDAO @@ -32,6 +33,7 @@ class Database: self.ringtones = RingtoneDAO(self.provider) self.contacts = ContactsDAO(self.provider) self.map_drawings = MapDrawingsDAO(self.provider) + self.debug_logs = DebugLogsDAO(self.provider) def initialize(self): self.schema.initialize() diff --git a/meshchatx/src/backend/database/debug_logs.py b/meshchatx/src/backend/database/debug_logs.py new file mode 100644 index 0000000..49cd3fa --- /dev/null +++ b/meshchatx/src/backend/database/debug_logs.py @@ -0,0 +1,87 @@ +from datetime import UTC, datetime +from .provider import DatabaseProvider + + +class DebugLogsDAO: + def __init__(self, provider: DatabaseProvider): + self.provider = provider + + def insert_log(self, level, module, message, is_anomaly=0, anomaly_type=None): + sql = """ + INSERT INTO debug_logs (timestamp, level, module, message, is_anomaly, anomaly_type) + VALUES (?, ?, ?, ?, ?, ?) + """ + self.provider.execute( + sql, + ( + datetime.now(UTC).timestamp(), + level, + module, + message, + is_anomaly, + anomaly_type, + ), + ) + + def get_logs(self, limit=100, offset=0, search=None, level=None, module=None, is_anomaly=None): + sql = "SELECT * FROM debug_logs WHERE 1=1" + params = [] + + if search: + sql += " AND (message LIKE ? OR module LIKE ?)" + params.extend([f"%{search}%", f"%{search}%"]) + + if level: + sql += " AND level = ?" + params.append(level) + + if module: + sql += " AND module = ?" + params.append(module) + + if is_anomaly is not None: + sql += " AND is_anomaly = ?" + params.append(1 if is_anomaly else 0) + + sql += " ORDER BY timestamp DESC LIMIT ? OFFSET ?" + params.extend([limit, offset]) + + return self.provider.fetchall(sql, tuple(params)) + + def get_total_count(self, search=None, level=None, module=None, is_anomaly=None): + sql = "SELECT COUNT(*) as count FROM debug_logs WHERE 1=1" + params = [] + + if search: + sql += " AND (message LIKE ? OR module LIKE ?)" + params.extend([f"%{search}%", f"%{search}%"]) + + if level: + sql += " AND level = ?" + params.append(level) + + if module: + sql += " AND module = ?" + params.append(module) + + if is_anomaly is not None: + sql += " AND is_anomaly = ?" + params.append(1 if is_anomaly else 0) + + row = self.provider.fetchone(sql, tuple(params)) + return row["count"] if row else 0 + + def cleanup_old_logs(self, max_logs=10000): + """Removes old logs keeping only the newest max_logs.""" + count = self.get_total_count() + if count > max_logs: + # Find the timestamp of the N-th newest log + sql = "SELECT timestamp FROM debug_logs ORDER BY timestamp DESC LIMIT 1 OFFSET ?" + row = self.provider.fetchone(sql, (max_logs - 1,)) + if row: + cutoff_ts = row["timestamp"] + self.provider.execute("DELETE FROM debug_logs WHERE timestamp < ?", (cutoff_ts,)) + + def get_anomalies(self, limit=50): + return self.get_logs(limit=limit, is_anomaly=True) + diff --git a/meshchatx/src/backend/database/misc.py b/meshchatx/src/backend/database/misc.py index b7f7bfb..61f4008 100644 --- a/meshchatx/src/backend/database/misc.py +++ b/meshchatx/src/backend/database/misc.py @@ -200,13 +200,14 @@ class MiscDAO: now = datetime.now(UTC) self.provider.execute( """ - INSERT INTO crawl_tasks (destination_hash, page_path, status, retry_count, created_at) - VALUES (?, ?, ?, ?, ?) + INSERT INTO crawl_tasks (destination_hash, page_path, status, retry_count, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT(destination_hash, page_path) DO UPDATE SET status = EXCLUDED.status, - retry_count = EXCLUDED.retry_count + retry_count = EXCLUDED.retry_count, + updated_at = EXCLUDED.updated_at """, - (destination_hash, page_path, status, retry_count, now), + (destination_hash, page_path, status, retry_count, now, now), ) def get_pending_crawl_tasks(self): @@ -220,6 +221,8 @@ class MiscDAO: "page_path", "status", "retry_count", + "last_retry_at", + "next_retry_at", "updated_at", } filtered_kwargs = {k: v for k, v in kwargs.items() if k in allowed_keys} @@ -323,3 +326,6 @@ class MiscDAO: """, (destination_hash, icon_hash, now, now), ) + + def clear_last_sent_icon_hashes(self): + self.provider.execute("DELETE FROM lxmf_last_sent_icon_hashes") diff --git a/meshchatx/src/backend/database/schema.py b/meshchatx/src/backend/database/schema.py index eec2e9d..013ac1f 100644 --- a/meshchatx/src/backend/database/schema.py +++ b/meshchatx/src/backend/database/schema.py @@ -2,7 +2,7 @@ from .provider import DatabaseProvider class DatabaseSchema: - LATEST_VERSION = 32 + LATEST_VERSION = 34 def __init__(self, provider: DatabaseProvider): self.provider = provider @@ -253,6 +253,7 @@ class DatabaseSchema: next_retry_at DATETIME, status TEXT DEFAULT 'pending', created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, UNIQUE(destination_hash, page_path) ) """, @@ -386,6 +387,18 @@ class DatabaseSchema: updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """, + "debug_logs": """ + CREATE TABLE IF NOT EXISTS debug_logs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp REAL, + level TEXT, + module TEXT, + message TEXT, + is_anomaly INTEGER DEFAULT 0, + anomaly_type TEXT, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP + ) + """, } for table_name, create_sql in tables.items(): @@ -446,6 +459,16 @@ class DatabaseSchema: self.provider.execute( "CREATE UNIQUE INDEX IF NOT EXISTS idx_lxmf_telemetry_dest_ts_unique ON lxmf_telemetry(destination_hash, timestamp)", ) + elif table_name == "debug_logs": + self.provider.execute( + "CREATE INDEX IF NOT EXISTS idx_debug_logs_timestamp ON debug_logs(timestamp)", + ) + self.provider.execute( + "CREATE INDEX IF NOT EXISTS idx_debug_logs_level ON debug_logs(level)", + ) + self.provider.execute( + "CREATE INDEX IF NOT EXISTS idx_debug_logs_anomaly ON debug_logs(is_anomaly)", + ) def migrate(self, current_version): if current_version < 7: @@ -861,6 +884,38 @@ class DatabaseSchema: ("changelog_seen_version", "0.0.0"), ) + if current_version < 33: + self.provider.execute(""" + CREATE TABLE IF NOT EXISTS debug_logs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp REAL, + level TEXT, + module TEXT, + message TEXT, + is_anomaly INTEGER DEFAULT 0, + anomaly_type TEXT, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP + ) + """) + self.provider.execute( + "CREATE INDEX IF NOT EXISTS idx_debug_logs_timestamp ON debug_logs(timestamp)", + ) + self.provider.execute( + "CREATE INDEX IF NOT EXISTS idx_debug_logs_level ON debug_logs(level)", + ) + self.provider.execute( + "CREATE INDEX IF NOT EXISTS idx_debug_logs_anomaly ON debug_logs(is_anomaly)", + ) + + if current_version < 34: + # Add updated_at to crawl_tasks + try: + self.provider.execute( + "ALTER TABLE crawl_tasks ADD COLUMN updated_at DATETIME DEFAULT CURRENT_TIMESTAMP", + ) + except Exception: # noqa: S110 + pass + # Update version in config self.provider.execute( """ diff --git a/meshchatx/src/backend/identity_context.py b/meshchatx/src/backend/identity_context.py index 55e5cc0..3deb7e6 100644 --- a/meshchatx/src/backend/identity_context.py +++ b/meshchatx/src/backend/identity_context.py @@ -74,7 +74,9 @@ class IdentityContext: self.community_interfaces_manager = None self.local_lxmf_destination = None self.announce_handlers = [] - self.integrity_manager = IntegrityManager(self.storage_path, self.database_path) + self.integrity_manager = IntegrityManager( + self.storage_path, self.database_path, self.identity_hash + ) self.running = False @@ -134,7 +136,9 @@ class IdentityContext: self.config, self.app.get_public_path(), project_root=os.path.dirname( - os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + os.path.dirname( + os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + ) ), storage_dir=self.storage_path, ) diff --git a/meshchatx/src/backend/integrity_manager.py b/meshchatx/src/backend/integrity_manager.py index e44538d..5229a92 100644 --- a/meshchatx/src/backend/integrity_manager.py +++ b/meshchatx/src/backend/integrity_manager.py @@ -8,9 +8,10 @@ from datetime import UTC, datetime class IntegrityManager: """Manages the integrity of the database and identity files at rest.""" - def __init__(self, storage_dir, database_path): + def __init__(self, storage_dir, database_path, identity_hash=None): self.storage_dir = Path(storage_dir) self.database_path = Path(database_path) + self.identity_hash = identity_hash self.manifest_path = self.storage_dir / "integrity-manifest.json" self.issues = [] @@ -64,6 +65,16 @@ class IntegrityManager: # New files are also a concern for integrity issues.append(f"New file detected: {rel_path}") + if issues: + m_date = manifest.get("date", "Unknown") + m_time = manifest.get("time", "Unknown") + m_id = manifest.get("identity", "Unknown") + issues.insert(0, f"Last integrity snapshot: {m_date} {m_time} (Identity: {m_id})") + + # Check if identity matches + if self.identity_hash and m_id != "Unknown" and self.identity_hash != m_id: + issues.append(f"Identity mismatch! Manifest belongs to: {m_id}") + self.issues = issues return len(issues) == 0, issues except Exception as e: @@ -89,9 +100,13 @@ class IntegrityManager: rel_path = str(full_path.relative_to(self.storage_dir)) files[rel_path] = self._hash_file(full_path) + now = datetime.now(UTC) manifest = { "version": 1, - "timestamp": datetime.now(UTC).timestamp(), + "timestamp": now.timestamp(), + "date": now.strftime("%Y-%m-%d"), + "time": now.strftime("%H:%M:%S"), + "identity": self.identity_hash, "files": files, } diff --git a/meshchatx/src/backend/persistent_log_handler.py b/meshchatx/src/backend/persistent_log_handler.py new file mode 100644 index 0000000..4c310e4 --- /dev/null +++ b/meshchatx/src/backend/persistent_log_handler.py @@ -0,0 +1,165 @@ +import collections +import logging +import threading +import time +from datetime import UTC, datetime + + +class PersistentLogHandler(logging.Handler): + def __init__(self, database=None, capacity=5000, flush_interval=5): + super().__init__() + self.database = database + self.logs_buffer = collections.deque(maxlen=capacity) + self.flush_interval = flush_interval + self.last_flush_time = time.time() + self.lock = threading.RLock() + self.flush_lock = threading.Lock() + + # Anomaly detection state + self.recent_messages = collections.deque(maxlen=100) + self.flooding_threshold = 20 # messages per second + self.repeat_threshold = 5 # identical messages in a row + self.message_counts = collections.defaultdict(int) + self.last_reset_time = time.time() + + def set_database(self, database): + with self.lock: + self.database = database + + def emit(self, record): + try: + msg = self.format(record) + timestamp = datetime.now(UTC).timestamp() + + is_anomaly, anomaly_type = self._detect_anomaly(record, msg, timestamp) + + log_entry = { + "timestamp": timestamp, + "level": record.levelname, + "module": record.module, + "message": msg, + "is_anomaly": 1 if is_anomaly else 0, + "anomaly_type": anomaly_type, + } + + with self.lock: + self.logs_buffer.append(log_entry) + + # Periodically flush to database if available + if self.database and (time.time() - self.last_flush_time > self.flush_interval): + self._flush_to_db() + + except Exception: + self.handleError(record) + + def _detect_anomaly(self, record, message, timestamp): + now = time.time() + + # 1. Detect Log Flooding + if now - self.last_reset_time > 1.0: + self.message_counts.clear() + self.last_reset_time = now + + self.message_counts["total"] += 1 + if self.message_counts["total"] > self.flooding_threshold: + return True, "flooding" + + # 2. Detect Repeats + if len(self.recent_messages) > 0: + repeat_count = 0 + for prev_msg in reversed(self.recent_messages): + if prev_msg == message: + repeat_count += 1 + else: + break + + if repeat_count >= self.repeat_threshold: + return True, "repeat" + + self.recent_messages.append(message) + return False, None + + def _flush_to_db(self): + if not self.database: + return + + # Ensure only one thread flushes at a time + if not self.flush_lock.acquire(blocking=False): + return + + try: + items_to_flush = [] + with self.lock: + while self.logs_buffer: + items_to_flush.append(self.logs_buffer.popleft()) + + if not items_to_flush: + return + + # Batch insert for speed + for entry in items_to_flush: + try: + self.database.debug_logs.insert_log( + level=entry["level"], + module=entry["module"], + message=entry["message"], + is_anomaly=entry["is_anomaly"], + anomaly_type=entry["anomaly_type"] + ) + except Exception as e: + print(f"Error inserting log: {e}") + + # Periodic cleanup of old logs (only every 100 flushes or similar? + # for now let's just keep it here but it should be fast) + try: + self.database.debug_logs.cleanup_old_logs() + except Exception as e: + print(f"Error cleaning up logs: {e}") + + self.last_flush_time = time.time() + except Exception as e: + print(f"Failed to flush logs to database: {e}") + finally: + self.flush_lock.release() + + def get_logs(self, limit=100, offset=0, search=None, level=None, module=None, is_anomaly=None): + if self.database: + # Flush current buffer first to ensure we have latest logs + self._flush_to_db() + + with self.lock: + if self.database: + return self.database.debug_logs.get_logs( + limit=limit, + offset=offset, + search=search, + level=level, + module=module, + is_anomaly=is_anomaly + ) + else: + # Fallback to in-memory buffer if DB not yet available + logs = list(self.logs_buffer) + if search: + logs = [l for l in logs if search.lower() in l["message"].lower() or search.lower() in l["module"].lower()] + if level: + logs = [l for l in logs if l["level"] == level] + if is_anomaly is not None: + logs = [l for l in logs if l["is_anomaly"] == (1 if is_anomaly else 0)] + + # Sort descending + logs.sort(key=lambda x: x["timestamp"], reverse=True) + return logs[offset : offset + limit] + + def get_total_count(self, search=None, level=None, module=None, is_anomaly=None): + with self.lock: + if self.database: + return self.database.debug_logs.get_total_count( + search=search, + level=level, + module=module, + is_anomaly=is_anomaly + ) + else: + return len(self.logs_buffer) +