From df306cc67b4f5331d5f0ea811e036d929177287f Mon Sep 17 00:00:00 2001 From: Sudo-Ivan Date: Wed, 7 Jan 2026 19:13:08 -0600 Subject: [PATCH] feat(telemetry): implement telemetry tracking and path tracing features - Added telemetry tracking capabilities, allowing users to toggle tracking for specific peers and retrieve tracked peers. - Introduced RNPathTraceHandler for tracing paths to destination hashes. - Enhanced database schema to support telemetry tracking and added related fields in contacts. - Updated configuration management to include telemetry settings. - Implemented API endpoints for downloading database backups and snapshots, as well as for telemetry-related functionalities. - Improved error handling and response messages for telemetry requests and path tracing. --- meshchatx/meshchat.py | 622 +++++++++++++++--- meshchatx/src/backend/config_manager.py | 20 + meshchatx/src/backend/database/contacts.py | 11 +- meshchatx/src/backend/database/schema.py | 24 +- meshchatx/src/backend/database/telemetry.py | 39 ++ meshchatx/src/backend/identity_context.py | 20 + meshchatx/src/backend/integrity_manager.py | 100 ++- meshchatx/src/backend/lxmf_utils.py | 108 ++- meshchatx/src/backend/meshchat_utils.py | 60 +- meshchatx/src/backend/rnpath_trace_handler.py | 100 +++ meshchatx/src/backend/telemetry_utils.py | 22 +- 11 files changed, 991 insertions(+), 135 deletions(-) create mode 100644 meshchatx/src/backend/rnpath_trace_handler.py diff --git a/meshchatx/meshchat.py b/meshchatx/meshchat.py index 867bab3..aa04c0b 100644 --- a/meshchatx/meshchat.py +++ b/meshchatx/meshchat.py @@ -26,6 +26,7 @@ import traceback import webbrowser from datetime import UTC, datetime, timedelta from logging.handlers import RotatingFileHandler +from urllib.parse import urlparse import aiohttp import bcrypt @@ -435,6 +436,17 @@ class ReticulumMeshChat: if self.current_context: self.current_context.rnpath_handler = value + @property + def rnpath_trace_handler(self): + return ( + self.current_context.rnpath_trace_handler if self.current_context else None + ) + + @rnpath_trace_handler.setter + def rnpath_trace_handler(self, value): + if self.current_context: + self.current_context.rnpath_trace_handler = value + @property def rnprobe_handler(self): return self.current_context.rnprobe_handler if self.current_context else None @@ -793,7 +805,6 @@ class ReticulumMeshChat: # Wait another moment for sockets to definitely be released by OS # Also give some time for the RPC listener port to settle print("Waiting for ports to settle...") - # We add a settle time here similar to Sideband's logic await asyncio.sleep(4) # Detect RPC type from reticulum instance if possible, otherwise default to both @@ -2292,6 +2303,64 @@ class ReticulumMeshChat: status=500, ) + @routes.get("/api/v1/database/backups/{filename}/download") + async def download_db_backup(request): + try: + filename = request.match_info.get("filename") + if not filename.endswith(".zip"): + filename += ".zip" + backup_dir = os.path.join(self.storage_dir, "database-backups") + full_path = os.path.join(backup_dir, filename) + + if not os.path.exists(full_path) or not full_path.startswith( + backup_dir, + ): + return web.json_response( + {"status": "error", "message": "Backup not found"}, + status=404, + ) + + return web.FileResponse( + path=full_path, + headers={ + "Content-Disposition": f'attachment; filename="{filename}"', + }, + ) + except Exception as e: + return web.json_response( + {"status": "error", "message": str(e)}, + status=500, + ) + + @routes.get("/api/v1/database/snapshots/{filename}/download") + async def download_db_snapshot(request): + try: + filename = request.match_info.get("filename") + if not filename.endswith(".zip"): + filename += ".zip" + snapshot_dir = os.path.join(self.storage_dir, "snapshots") + full_path = os.path.join(snapshot_dir, filename) + + if not os.path.exists(full_path) or not full_path.startswith( + snapshot_dir, + ): + return web.json_response( + {"status": "error", "message": "Snapshot not found"}, + status=404, + ) + + return web.FileResponse( + path=full_path, + headers={ + "Content-Disposition": f'attachment; filename="{filename}"', + }, + ) + except Exception as e: + return web.json_response( + {"status": "error", "message": str(e)}, + status=500, + ) + @routes.get("/api/v1/status") async def status(request): return web.json_response( @@ -3366,6 +3435,8 @@ class ReticulumMeshChat: # Get total paths total_paths = 0 + is_connected_to_shared_instance = False + shared_instance_address = None if hasattr(self, "reticulum") and self.reticulum: try: path_table = self.reticulum.get_path_table() @@ -3373,6 +3444,70 @@ class ReticulumMeshChat: except Exception: # noqa: S110 pass + is_connected_to_shared_instance = getattr( + self.reticulum, + "is_connected_to_shared_instance", + False, + ) + + if is_connected_to_shared_instance: + # Try to find the shared instance address from active connections + try: + for conn in process.connections(kind="all"): + if conn.status == psutil.CONN_ESTABLISHED and conn.raddr: + # Check for common Reticulum shared instance ports or UNIX sockets + if ( + isinstance(conn.raddr, tuple) + and conn.raddr[1] == 37428 + ): + shared_instance_address = ( + f"{conn.raddr[0]}:{conn.raddr[1]}" + ) + break + if ( + isinstance(conn.raddr, str) + and ( + "rns" in conn.raddr or "reticulum" in conn.raddr + ) + and ".sock" in conn.raddr + ): + shared_instance_address = conn.raddr + break + except Exception: # noqa: S110 + pass + + # Fallback to reading config if not found via connections + if not shared_instance_address: + try: + config_dir = getattr(self, "reticulum_config_dir", None) + if not config_dir: + config_dir = getattr( + RNS.Reticulum, + "configdir", + os.path.expanduser("~/.reticulum"), + ) + + config_path = os.path.join(config_dir, "config") + if os.path.isfile(config_path): + cp = configparser.ConfigParser() + cp.read(config_path) + if cp.has_section("reticulum"): + shared_port = cp.getint( + "reticulum", + "shared_instance_port", + fallback=37428, + ) + shared_bind = cp.get( + "reticulum", + "shared_instance_bind", + fallback="127.0.0.1", + ) + shared_instance_address = ( + f"{shared_bind}:{shared_port}" + ) + except Exception: # noqa: S110 + pass + # Calculate announce rates current_time = time.time() announces_per_second = len( @@ -3453,15 +3588,8 @@ class ReticulumMeshChat: if hasattr(self, "reticulum") and self.reticulum else None ), - "is_connected_to_shared_instance": ( - getattr( - self.reticulum, - "is_connected_to_shared_instance", - False, - ) - if hasattr(self, "reticulum") and self.reticulum - else False - ), + "is_connected_to_shared_instance": is_connected_to_shared_instance, + "shared_instance_address": shared_instance_address, "is_transport_enabled": ( self.reticulum.transport_enabled() if hasattr(self, "reticulum") and self.reticulum @@ -5245,6 +5373,7 @@ class ReticulumMeshChat: lxst_address = data.get("lxst_address") preferred_ringtone_id = data.get("preferred_ringtone_id") custom_image = data.get("custom_image") + is_telemetry_trusted = data.get("is_telemetry_trusted", 0) if not name: return web.json_response( @@ -5278,6 +5407,7 @@ class ReticulumMeshChat: lxst_address=lxst_address, preferred_ringtone_id=preferred_ringtone_id, custom_image=custom_image, + is_telemetry_trusted=is_telemetry_trusted, ) return web.json_response({"message": "Contact added"}) @@ -5292,6 +5422,7 @@ class ReticulumMeshChat: preferred_ringtone_id = data.get("preferred_ringtone_id") custom_image = data.get("custom_image") clear_image = data.get("clear_image", False) + is_telemetry_trusted = data.get("is_telemetry_trusted") self.database.contacts.update_contact( contact_id, @@ -5302,6 +5433,7 @@ class ReticulumMeshChat: preferred_ringtone_id=preferred_ringtone_id, custom_image=custom_image, clear_image=clear_image, + is_telemetry_trusted=is_telemetry_trusted, ) return web.json_response({"message": "Contact updated"}) @@ -6374,6 +6506,31 @@ class ReticulumMeshChat: except Exception as e: return web.json_response({"message": str(e)}, status=500) + @routes.get("/api/v1/rnpath/trace/{destination_hash}") + async def rnpath_trace(request): + destination_hash = request.match_info.get("destination_hash") + if not destination_hash: + return web.json_response( + {"error": "destination_hash is required"}, + status=400, + ) + try: + if not self.rnpath_trace_handler: + return web.json_response( + { + "error": "RNPathTraceHandler not initialized for current context", + }, + status=503, + ) + result = await self.rnpath_trace_handler.trace_path(destination_hash) + return web.json_response(result) + except Exception as e: + import traceback + + error_msg = f"Trace route failed: {e}\n{traceback.format_exc()}" + print(error_msg) + return web.json_response({"error": error_msg}, status=500) + @routes.post("/api/v1/rnprobe") async def rnprobe(request): data = await request.json() @@ -7219,6 +7376,9 @@ class ReticulumMeshChat: "contact_image": contact_image, "destination_hash": other_user_hash, "is_unread": is_unread, + "is_tracking": self.database.telemetry.is_tracking( + other_user_hash, + ), "failed_messages_count": row["failed_count"], "has_attachments": message_fields_have_attachments( row["fields"], @@ -7866,10 +8026,40 @@ class ReticulumMeshChat: if r["physical_link"] else None, "updated_at": r["updated_at"], + "is_tracking": self.database.telemetry.is_tracking( + r["destination_hash"], + ), }, ) return web.json_response({"telemetry": telemetry_list}) + @routes.get("/api/v1/telemetry/trusted-peers") + async def telemetry_trusted_peers_get(request): + # get all contacts that are telemetry trusted + contacts = self.database.provider.fetchall( + "SELECT * FROM contacts WHERE is_telemetry_trusted = 1 ORDER BY name ASC", + ) + return web.json_response({"trusted_peers": [dict(c) for c in contacts]}) + + # toggle telemetry tracking for a destination + @routes.post("/api/v1/telemetry/tracking/{destination_hash}/toggle") + async def toggle_telemetry_tracking(request): + destination_hash = request.match_info["destination_hash"] + data = await request.json() + is_tracking = data.get("is_tracking") + + new_status = self.database.telemetry.toggle_tracking( + destination_hash, + is_tracking, + ) + return web.json_response({"status": "ok", "is_tracking": new_status}) + + # get all tracked peers + @routes.get("/api/v1/telemetry/tracking") + async def get_tracked_peers(request): + results = self.database.telemetry.get_tracked_peers() + return web.json_response({"tracked_peers": results}) + # get telemetry history for a destination @routes.get("/api/v1/telemetry/history/{destination_hash}") async def get_telemetry_history(request): @@ -8088,10 +8278,7 @@ class ReticulumMeshChat: response.headers["X-XSS-Protection"] = "1; mode=block" response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin" - # CSP: allow localhost for development and Electron, websockets, and blob URLs - # Add 'unsafe-inline' and 'unsafe-eval' for some legacy doc scripts if needed, - # and allow framing ourselves for the docs page. - gitea_url = "https://git.quad4.io" + # CSP base configuration connect_sources = [ "'self'", "ws://localhost:*", @@ -8100,13 +8287,47 @@ class ReticulumMeshChat: "https://*.tile.openstreetmap.org", "https://tile.openstreetmap.org", "https://nominatim.openstreetmap.org", + "https://*.cartocdn.com", ] + img_sources = [ + "'self'", + "data:", + "blob:", + "https://*.tile.openstreetmap.org", + "https://tile.openstreetmap.org", + "https://*.cartocdn.com", + ] + + frame_sources = [ + "'self'", + "https://reticulum.network", + ] + + script_sources = ["'self'", "'unsafe-inline'", "'unsafe-eval'"] + style_sources = ["'self'", "'unsafe-inline'"] + if self.current_context and self.current_context.config: + # Helper to add domain from URL + def add_domain_from_url(url, target_list): + if not url: + return None + try: + parsed = urlparse(url) + if parsed.netloc: + domain = f"{parsed.scheme}://{parsed.netloc}" + if domain not in target_list: + target_list.append(domain) + return domain + except Exception: # noqa: S110 + pass + return None + # Add configured Gitea base URL - gitea_url = self.current_context.config.gitea_base_url.get() - if gitea_url not in connect_sources: - connect_sources.append(gitea_url) + add_domain_from_url( + self.current_context.config.gitea_base_url.get(), + connect_sources, + ) # Add configured docs download URLs domains docs_urls_str = self.current_context.config.docs_download_urls.get() @@ -8116,33 +8337,67 @@ class ReticulumMeshChat: if u.strip() ] for url in docs_urls: - try: - from urllib.parse import urlparse + domain = add_domain_from_url(url, connect_sources) + if domain and "github.com" in domain: + content_domain = "https://objects.githubusercontent.com" + if content_domain not in connect_sources: + connect_sources.append(content_domain) - parsed = urlparse(url) - if parsed.netloc: - domain = f"{parsed.scheme}://{parsed.netloc}" - if domain not in connect_sources: - connect_sources.append(domain) + # Add map tile server domain + map_tile_url = self.current_context.config.map_tile_server_url.get() + add_domain_from_url(map_tile_url, img_sources) + add_domain_from_url(map_tile_url, connect_sources) - # If GitHub is used, also allow objects.githubusercontent.com for redirects - if "github.com" in domain: - content_domain = "https://objects.githubusercontent.com" - if content_domain not in connect_sources: - connect_sources.append(content_domain) - except Exception: # noqa: S110 - pass + # Add nominatim API domain + nominatim_url = self.current_context.config.map_nominatim_api_url.get() + add_domain_from_url(nominatim_url, connect_sources) + + # Add custom CSP sources from config + def add_extra_sources(extra_str, target_list): + if not extra_str: + return + sources = [ + s.strip() + for s in extra_str.replace("\n", ",") + .replace(";", ",") + .split(",") + if s.strip() + ] + for s in sources: + if s not in target_list: + target_list.append(s) + + add_extra_sources( + self.current_context.config.csp_extra_connect_src.get(), + connect_sources, + ) + add_extra_sources( + self.current_context.config.csp_extra_img_src.get(), + img_sources, + ) + add_extra_sources( + self.current_context.config.csp_extra_frame_src.get(), + frame_sources, + ) + add_extra_sources( + self.current_context.config.csp_extra_script_src.get(), + script_sources, + ) + add_extra_sources( + self.current_context.config.csp_extra_style_src.get(), + style_sources, + ) csp = ( "default-src 'self'; " - "script-src 'self' 'unsafe-inline' 'unsafe-eval'; " - "style-src 'self' 'unsafe-inline'; " - "img-src 'self' data: blob: https://*.tile.openstreetmap.org https://tile.openstreetmap.org; " + f"script-src {' '.join(script_sources)}; " + f"style-src {' '.join(style_sources)}; " + f"img-src {' '.join(img_sources)}; " "font-src 'self' data:; " f"connect-src {' '.join(connect_sources)}; " "media-src 'self' blob:; " "worker-src 'self' blob:; " - "frame-src 'self' https://reticulum.network; " + f"frame-src {' '.join(frame_sources)}; " "object-src 'none'; " "base-uri 'self';" ) @@ -8329,6 +8584,47 @@ class ReticulumMeshChat: # Sleep for 12 hours await asyncio.sleep(12 * 3600) + async def telemetry_tracking_loop(self, session_id, context=None): + ctx = context or self.current_context + if not ctx: + return + + while self.running and ctx.running and ctx.session_id == session_id: + try: + # Only run if telemetry is enabled globally + if not ctx.config.telemetry_enabled.get(): + await asyncio.sleep(60) + continue + + # Get all tracked peers + tracked_peers = ctx.database.telemetry.get_tracked_peers() + now = time.time() + + for peer in tracked_peers: + dest_hash = peer["destination_hash"] + interval = peer.get("interval_seconds", 60) + last_req = peer.get("last_request_at") + + if last_req is None or now - last_req >= interval: + print(f"Sending telemetry request to tracked peer: {dest_hash}") + # Send telemetry request + await self.send_message( + destination_hash=dest_hash, + content="", + commands=[{SidebandCommands.TELEMETRY_REQUEST: 0}], + delivery_method="opportunistic", + no_display=False, + context=ctx, + ) + # Update last request time + ctx.database.telemetry.update_last_request_at(dest_hash, now) + + except Exception as e: + print(f"Telemetry tracking loop error: {e}") + + # Check every 10 seconds + await asyncio.sleep(10) + # handle announcing async def announce(self, context=None): ctx = context or self.current_context @@ -8590,6 +8886,24 @@ class ReticulumMeshChat: if "map_nominatim_api_url" in data: self.config.map_nominatim_api_url.set(data["map_nominatim_api_url"]) + # update location settings + if "location_source" in data: + self.config.location_source.set(data["location_source"]) + + if "location_manual_lat" in data: + self.config.location_manual_lat.set(str(data["location_manual_lat"])) + + if "location_manual_lon" in data: + self.config.location_manual_lon.set(str(data["location_manual_lon"])) + + if "location_manual_alt" in data: + self.config.location_manual_alt.set(str(data["location_manual_alt"])) + + if "telemetry_enabled" in data: + self.config.telemetry_enabled.set( + self._parse_bool(data["telemetry_enabled"]) + ) + # update banishment settings if "banished_effect_enabled" in data: self.config.banished_effect_enabled.set( @@ -8644,6 +8958,18 @@ class ReticulumMeshChat: value = self._parse_bool(data["blackhole_integration_enabled"]) self.config.blackhole_integration_enabled.set(value) + # update csp extra sources + if "csp_extra_connect_src" in data: + self.config.csp_extra_connect_src.set(data["csp_extra_connect_src"]) + if "csp_extra_img_src" in data: + self.config.csp_extra_img_src.set(data["csp_extra_img_src"]) + if "csp_extra_frame_src" in data: + self.config.csp_extra_frame_src.set(data["csp_extra_frame_src"]) + if "csp_extra_script_src" in data: + self.config.csp_extra_script_src.set(data["csp_extra_script_src"]) + if "csp_extra_style_src" in data: + self.config.csp_extra_style_src.set(data["csp_extra_style_src"]) + # update voicemail settings if "voicemail_enabled" in data: self.config.voicemail_enabled.set( @@ -9604,8 +9930,18 @@ class ReticulumMeshChat: "desktop_open_calls_in_separate_window": ctx.config.desktop_open_calls_in_separate_window.get(), "desktop_hardware_acceleration_enabled": ctx.config.desktop_hardware_acceleration_enabled.get(), "blackhole_integration_enabled": ctx.config.blackhole_integration_enabled.get(), + "csp_extra_connect_src": ctx.config.csp_extra_connect_src.get(), + "csp_extra_img_src": ctx.config.csp_extra_img_src.get(), + "csp_extra_frame_src": ctx.config.csp_extra_frame_src.get(), + "csp_extra_script_src": ctx.config.csp_extra_script_src.get(), + "csp_extra_style_src": ctx.config.csp_extra_style_src.get(), "telephone_tone_generator_enabled": ctx.config.telephone_tone_generator_enabled.get(), "telephone_tone_generator_volume": ctx.config.telephone_tone_generator_volume.get(), + "location_source": ctx.config.location_source.get(), + "location_manual_lat": ctx.config.location_manual_lat.get(), + "location_manual_lon": ctx.config.location_manual_lon.get(), + "location_manual_alt": ctx.config.location_manual_alt.get(), + "telemetry_enabled": ctx.config.telemetry_enabled.get(), } # try and get a name for the provided identity hash @@ -9938,18 +10274,82 @@ class ReticulumMeshChat: print(f"Rejecting LXMF message from blocked source: {source_hash}") return - # check if this lxmf message contains a telemetry request command from sideband is_sideband_telemetry_request = False lxmf_fields = lxmf_message.get_fields() + + # check both standard LXMF.FIELD_COMMANDS (9) and FIELD_COMMANDS (1) + commands = [] if LXMF.FIELD_COMMANDS in lxmf_fields: - for command in lxmf_fields[LXMF.FIELD_COMMANDS]: - if SidebandCommands.TELEMETRY_REQUEST in command: + val = lxmf_fields[LXMF.FIELD_COMMANDS] + if isinstance(val, list): + commands.extend(val) + elif isinstance(val, dict): + commands.append(val) + if 0x01 in lxmf_fields and 0x01 != LXMF.FIELD_COMMANDS: + val = lxmf_fields[0x01] + if isinstance(val, list): + commands.extend(val) + elif isinstance(val, dict): + commands.append(val) + + if commands: + for command in commands: + if ( + ( + isinstance(command, dict) + and ( + SidebandCommands.TELEMETRY_REQUEST in command + or str(SidebandCommands.TELEMETRY_REQUEST) in command + or f"0x{SidebandCommands.TELEMETRY_REQUEST:02x}" + in command + ) + ) + or ( + isinstance(command, (list, tuple)) + and SidebandCommands.TELEMETRY_REQUEST in command + ) + or command == SidebandCommands.TELEMETRY_REQUEST + or str(command) == str(SidebandCommands.TELEMETRY_REQUEST) + ): is_sideband_telemetry_request = True - # respond to telemetry requests from sideband + # respond to telemetry requests if is_sideband_telemetry_request: - print(f"Responding to telemetry request from {source_hash}") - self.handle_telemetry_request(source_hash) + # Check if telemetry is enabled globally + if not ctx.config.telemetry_enabled.get(): + print(f"Telemetry is disabled, ignoring request from {source_hash}") + else: + # Check if peer is trusted + contact = ctx.database.contacts.get_contact_by_identity_hash( + source_hash + ) + if not contact or not contact.get("is_telemetry_trusted"): + print( + f"Telemetry request from untrusted peer {source_hash}, ignoring" + ) + else: + print(f"Responding to telemetry request from {source_hash}") + self.handle_telemetry_request(source_hash) + + self.db_upsert_lxmf_message(lxmf_message, context=ctx) + + # broadcast notification + AsyncUtils.run_async( + self.websocket_broadcast( + json.dumps( + { + "type": "lxmf.delivery", + "remote_identity_name": source_hash[:8], + "lxmf_message": convert_db_lxmf_message_to_dict( + ctx.database.messages.get_lxmf_message_by_hash( + lxmf_message.hash.hex() + ), + include_attachments=False, + ), + }, + ), + ), + ) return # check for spam keywords @@ -9989,47 +10389,39 @@ class ReticulumMeshChat: # handle telemetry try: message_fields = lxmf_message.get_fields() + + # Single telemetry entry if LXMF.FIELD_TELEMETRY in message_fields: - telemetry_data = message_fields[LXMF.FIELD_TELEMETRY] - # unpack to get timestamp - unpacked = Telemeter.from_packed(telemetry_data) - if unpacked and "time" in unpacked: - timestamp = unpacked["time"]["utc"] + self.process_incoming_telemetry( + source_hash, + message_fields[LXMF.FIELD_TELEMETRY], + lxmf_message, + context=ctx, + ) - # physical link info - physical_link = { - "rssi": self.reticulum.get_packet_rssi(lxmf_message.hash) - if hasattr(self, "reticulum") and self.reticulum - else None, - "snr": self.reticulum.get_packet_snr(lxmf_message.hash) - if hasattr(self, "reticulum") and self.reticulum - else None, - "q": self.reticulum.get_packet_q(lxmf_message.hash) - if hasattr(self, "reticulum") and self.reticulum - else None, - } - - ctx.database.telemetry.upsert_telemetry( - destination_hash=source_hash, - timestamp=timestamp, - data=telemetry_data, - received_from=ctx.local_lxmf_destination.hexhash, - physical_link=physical_link, - ) - - # broadcast telemetry update via websocket - AsyncUtils.run_async( - self.websocket_broadcast( - json.dumps( - { - "type": "lxmf.telemetry", - "destination_hash": source_hash, - "timestamp": timestamp, - "telemetry": unpacked, - }, - ), - ), - ) + # Telemetry stream (multiple entries) + if ( + hasattr(LXMF, "FIELD_TELEMETRY_STREAM") + and LXMF.FIELD_TELEMETRY_STREAM in message_fields + ): + stream = message_fields[LXMF.FIELD_TELEMETRY_STREAM] + if isinstance(stream, (list, tuple)): + for entry in stream: + if isinstance(entry, (list, tuple)) and len(entry) >= 3: + entry_source = ( + entry[0].hex() + if isinstance(entry[0], bytes) + else entry[0] + ) + entry_timestamp = entry[1] + entry_data = entry[2] + self.process_incoming_telemetry( + entry_source, + entry_data, + lxmf_message, + timestamp_override=entry_timestamp, + context=ctx, + ) except Exception as e: print(f"Failed to handle telemetry in LXMF message: {e}") @@ -10050,9 +10442,7 @@ class ReticulumMeshChat: source_hash = lxmf_message.source_hash.hex() # ignore our own icon and empty payloads to avoid overwriting peers with our appearance - if source_hash and local_hash and source_hash == local_hash: - pass - elif ( + if (source_hash and local_hash and source_hash == local_hash) or ( not icon_name or not foreground_colour or not background_colour ): pass @@ -10560,6 +10950,66 @@ class ReticulumMeshChat: data = f"{name}|{fg}|{bg}" return hashlib.sha256(data.encode()).hexdigest() + def process_incoming_telemetry( + self, + source_hash, + telemetry_data, + lxmf_message, + timestamp_override=None, + context=None, + ): + ctx = context or self.current_context + if not ctx: + return + + try: + unpacked = Telemeter.from_packed(telemetry_data) + if unpacked: + timestamp = timestamp_override or ( + unpacked["time"]["utc"] if "time" in unpacked else int(time.time()) + ) + + # physical link info + physical_link = { + "rssi": self.reticulum.get_packet_rssi(lxmf_message.hash) + if hasattr(self, "reticulum") and self.reticulum + else None, + "snr": self.reticulum.get_packet_snr(lxmf_message.hash) + if hasattr(self, "reticulum") and self.reticulum + else None, + "q": self.reticulum.get_packet_q(lxmf_message.hash) + if hasattr(self, "reticulum") and self.reticulum + else None, + } + + ctx.database.telemetry.upsert_telemetry( + destination_hash=source_hash, + timestamp=timestamp, + data=telemetry_data, + received_from=ctx.local_lxmf_destination.hexhash, + physical_link=physical_link, + ) + + # broadcast telemetry update via websocket + AsyncUtils.run_async( + self.websocket_broadcast( + json.dumps( + { + "type": "lxmf.telemetry", + "destination_hash": source_hash, + "timestamp": timestamp, + "telemetry": unpacked, + "physical_link": physical_link, + "is_tracking": ctx.database.telemetry.is_tracking( + source_hash, + ), + }, + ), + ), + ) + except Exception as e: + print(f"Error processing incoming telemetry: {e}") + def handle_telemetry_request(self, to_addr_hash: str): # get our location from config lat = self.database.config.get("map_default_lat") @@ -10584,15 +11034,13 @@ class ReticulumMeshChat: telemetry_data = Telemeter.pack(location=location) - # send as an LXMF message with no content, only telemetry field - # use no_display=True to avoid showing in chat UI AsyncUtils.run_async( self.send_message( destination_hash=to_addr_hash, content="", telemetry_data=telemetry_data, delivery_method="opportunistic", - no_display=True, + no_display=False, ), ) except Exception as e: @@ -10672,7 +11120,7 @@ class ReticulumMeshChat: f" ({display_name})" if ( display_name := parse_lxmf_display_name( - base64.b64encode(app_data).decode() if app_data else None, + app_data, None, ) ) diff --git a/meshchatx/src/backend/config_manager.py b/meshchatx/src/backend/config_manager.py index b5803f3..77371ad 100644 --- a/meshchatx/src/backend/config_manager.py +++ b/meshchatx/src/backend/config_manager.py @@ -245,6 +245,9 @@ class ConfigManager: "https://nominatim.openstreetmap.org", ) + # telemetry config + self.telemetry_enabled = self.BoolConfig(self, "telemetry_enabled", False) + # translator config self.translator_enabled = self.BoolConfig(self, "translator_enabled", False) self.libretranslate_url = self.StringConfig( @@ -253,6 +256,12 @@ class ConfigManager: "http://localhost:5000", ) + # location config + self.location_source = self.StringConfig(self, "location_source", "browser") + self.location_manual_lat = self.StringConfig(self, "location_manual_lat", "0.0") + self.location_manual_lon = self.StringConfig(self, "location_manual_lon", "0.0") + self.location_manual_alt = self.StringConfig(self, "location_manual_alt", "0.0") + # banishment config self.banished_effect_enabled = self.BoolConfig( self, @@ -279,6 +288,17 @@ class ConfigManager: True, ) + # csp config so users can set extra CSP sources for local offgrid environments (tile servers, etc.) + self.csp_extra_connect_src = self.StringConfig( + self, + "csp_extra_connect_src", + "", + ) + self.csp_extra_img_src = self.StringConfig(self, "csp_extra_img_src", "") + self.csp_extra_frame_src = self.StringConfig(self, "csp_extra_frame_src", "") + self.csp_extra_script_src = self.StringConfig(self, "csp_extra_script_src", "") + self.csp_extra_style_src = self.StringConfig(self, "csp_extra_style_src", "") + def get(self, key: str, default_value=None) -> str | None: return self.db.config.get(key, default_value) diff --git a/meshchatx/src/backend/database/contacts.py b/meshchatx/src/backend/database/contacts.py index a184388..af616e4 100644 --- a/meshchatx/src/backend/database/contacts.py +++ b/meshchatx/src/backend/database/contacts.py @@ -13,17 +13,19 @@ class ContactsDAO: lxst_address=None, preferred_ringtone_id=None, custom_image=None, + is_telemetry_trusted=0, ): self.provider.execute( """ - INSERT INTO contacts (name, remote_identity_hash, lxmf_address, lxst_address, preferred_ringtone_id, custom_image) - VALUES (?, ?, ?, ?, ?, ?) + INSERT INTO contacts (name, remote_identity_hash, lxmf_address, lxst_address, preferred_ringtone_id, custom_image, is_telemetry_trusted) + VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT(remote_identity_hash) DO UPDATE SET name = EXCLUDED.name, lxmf_address = COALESCE(EXCLUDED.lxmf_address, contacts.lxmf_address), lxst_address = COALESCE(EXCLUDED.lxst_address, contacts.lxst_address), preferred_ringtone_id = EXCLUDED.preferred_ringtone_id, custom_image = EXCLUDED.custom_image, + is_telemetry_trusted = EXCLUDED.is_telemetry_trusted, updated_at = CURRENT_TIMESTAMP """, ( @@ -33,6 +35,7 @@ class ContactsDAO: lxst_address, preferred_ringtone_id, custom_image, + is_telemetry_trusted, ), ) @@ -74,6 +77,7 @@ class ContactsDAO: preferred_ringtone_id=None, custom_image=None, clear_image=False, + is_telemetry_trusted=None, ): updates = [] params = [] @@ -93,6 +97,9 @@ class ContactsDAO: if preferred_ringtone_id is not None: updates.append("preferred_ringtone_id = ?") params.append(preferred_ringtone_id) + if is_telemetry_trusted is not None: + updates.append("is_telemetry_trusted = ?") + params.append(1 if is_telemetry_trusted else 0) if clear_image: updates.append("custom_image = NULL") elif custom_image is not None: diff --git a/meshchatx/src/backend/database/schema.py b/meshchatx/src/backend/database/schema.py index 4a551cd..ddf9bf5 100644 --- a/meshchatx/src/backend/database/schema.py +++ b/meshchatx/src/backend/database/schema.py @@ -2,7 +2,7 @@ from .provider import DatabaseProvider class DatabaseSchema: - LATEST_VERSION = 36 + LATEST_VERSION = 37 def __init__(self, provider: DatabaseProvider): self.provider = provider @@ -348,6 +348,17 @@ class DatabaseSchema: UNIQUE(destination_hash, timestamp) ) """, + "telemetry_tracking": """ + CREATE TABLE IF NOT EXISTS telemetry_tracking ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + destination_hash TEXT UNIQUE, + is_tracking INTEGER DEFAULT 1, + interval_seconds INTEGER DEFAULT 60, + last_request_at REAL, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP + ) + """, "ringtones": """ CREATE TABLE IF NOT EXISTS ringtones ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -977,6 +988,17 @@ class DatabaseSchema: "CREATE INDEX IF NOT EXISTS idx_lxmf_conversation_folders_folder_id ON lxmf_conversation_folders(folder_id)", ) + if current_version < 37: + # Add is_telemetry_trusted to contacts + self._safe_execute( + "ALTER TABLE contacts ADD COLUMN is_telemetry_trusted INTEGER DEFAULT 0", + ) + # Ensure telemetry_enabled exists in config and is false by default + self._safe_execute( + "INSERT OR IGNORE INTO config (key, value) VALUES (?, ?)", + ("telemetry_enabled", "false"), + ) + # Update version in config self._safe_execute( """ diff --git a/meshchatx/src/backend/database/telemetry.py b/meshchatx/src/backend/database/telemetry.py index 711a925..c96fb87 100644 --- a/meshchatx/src/backend/database/telemetry.py +++ b/meshchatx/src/backend/database/telemetry.py @@ -65,3 +65,42 @@ class TelemetryDAO: "DELETE FROM lxmf_telemetry WHERE destination_hash = ?", (destination_hash,), ) + + def is_tracking(self, destination_hash): + row = self.provider.fetchone( + "SELECT is_tracking FROM telemetry_tracking WHERE destination_hash = ?", + (destination_hash,), + ) + return bool(row["is_tracking"]) if row else False + + def toggle_tracking(self, destination_hash, is_tracking=None): + if is_tracking is None: + is_tracking = not self.is_tracking(destination_hash) + + now = datetime.now(UTC).isoformat() + self.provider.execute( + """ + INSERT INTO telemetry_tracking (destination_hash, is_tracking, updated_at) + VALUES (?, ?, ?) + ON CONFLICT(destination_hash) DO UPDATE SET + is_tracking = EXCLUDED.is_tracking, + updated_at = EXCLUDED.updated_at + """, + (destination_hash, int(is_tracking), now), + ) + return is_tracking + + def get_tracked_peers(self): + return self.provider.fetchall( + "SELECT * FROM telemetry_tracking WHERE is_tracking = 1", + ) + + def update_last_request_at(self, destination_hash, timestamp=None): + if timestamp is None: + import time + + timestamp = time.time() + self.provider.execute( + "UPDATE telemetry_tracking SET last_request_at = ? WHERE destination_hash = ?", + (timestamp, destination_hash), + ) diff --git a/meshchatx/src/backend/identity_context.py b/meshchatx/src/backend/identity_context.py index 8d50086..53310d0 100644 --- a/meshchatx/src/backend/identity_context.py +++ b/meshchatx/src/backend/identity_context.py @@ -21,6 +21,7 @@ from meshchatx.src.backend.nomadnet_utils import NomadNetworkManager from meshchatx.src.backend.ringtone_manager import RingtoneManager from meshchatx.src.backend.rncp_handler import RNCPHandler from meshchatx.src.backend.rnpath_handler import RNPathHandler +from meshchatx.src.backend.rnpath_trace_handler import RNPathTraceHandler from meshchatx.src.backend.rnprobe_handler import RNProbeHandler from meshchatx.src.backend.rnstatus_handler import RNStatusHandler from meshchatx.src.backend.telephone_manager import TelephoneManager @@ -72,6 +73,8 @@ class IdentityContext: self.ringtone_manager = None self.rncp_handler = None self.rnstatus_handler = None + self.rnpath_handler = None + self.rnpath_trace_handler = None self.rnprobe_handler = None self.translator_handler = None self.bot_handler = None @@ -228,6 +231,10 @@ class IdentityContext: self.rnpath_handler = RNPathHandler( reticulum_instance=getattr(self.app, "reticulum", None), ) + self.rnpath_trace_handler = RNPathTraceHandler( + reticulum_instance=getattr(self.app, "reticulum", None), + identity=self.identity, + ) self.rnprobe_handler = RNProbeHandler( reticulum_instance=getattr(self.app, "reticulum", None), identity=self.identity, @@ -360,6 +367,14 @@ class IdentityContext: thread.daemon = True thread.start() + # start background thread for telemetry tracking loop + thread = threading.Thread( + target=asyncio.run, + args=(self.app.telemetry_tracking_loop(self.session_id, context=self),), + ) + thread.daemon = True + thread.start() + def register_announce_handlers(self): handlers = [ AnnounceHandler( @@ -468,6 +483,11 @@ class IdentityContext: self.message_router.jobs = lambda: None if hasattr(self.message_router, "exit_handler"): self.message_router.exit_handler() + + # Give LXMF/RNS a moment to finish any final disk writes + import time + + time.sleep(1.0) except Exception as e: print( f"Error while tearing down LXMRouter for {self.identity_hash}: {e}", diff --git a/meshchatx/src/backend/integrity_manager.py b/meshchatx/src/backend/integrity_manager.py index 041446d..a6a7190 100644 --- a/meshchatx/src/backend/integrity_manager.py +++ b/meshchatx/src/backend/integrity_manager.py @@ -1,3 +1,4 @@ +import fnmatch import hashlib import json import os @@ -8,6 +9,21 @@ from pathlib import Path class IntegrityManager: """Manages the integrity of the database and identity files at rest.""" + # Files and directories that are frequently modified by RNS/LXMF or SQLite + # and should be ignored during integrity checks. + IGNORED_PATTERNS = [ + "*-wal", + "*-shm", + "*-journal", + "*.tmp", + "*.lock", + "*.log", + "*~", + ".DS_Store", + "Thumbs.db", + "integrity-manifest.json", + ] + def __init__(self, storage_dir, database_path, identity_hash=None): self.storage_dir = Path(storage_dir) self.database_path = Path(database_path) @@ -15,6 +31,35 @@ class IntegrityManager: self.manifest_path = self.storage_dir / "integrity-manifest.json" self.issues = [] + def _should_ignore(self, rel_path): + """Determine if a file path should be ignored based on name or directory.""" + path = Path(rel_path) + path_parts = path.parts + + # Check for volatile LXMF/RNS directories + # We only ignore these if they are inside the lxmf_router directory + # to avoid accidentally ignoring important files with similar names. + if "lxmf_router" in path_parts: + if any( + part in ["announces", "storage", "identities"] for part in path_parts + ): + return True + + # Check for other generally ignored directories + if any( + part in ["tmp", "recordings", "greetings", "docs", "bots", "ringtones"] + for part in path_parts + ): + return True + + filename = path_parts[-1] + + # Check against IGNORED_PATTERNS + if any(fnmatch.fnmatch(filename, pattern) for pattern in self.IGNORED_PATTERNS): + return True + + return False + def _hash_file(self, file_path): if not os.path.exists(file_path): return None @@ -34,37 +79,47 @@ class IntegrityManager: manifest = json.load(f) issues = [] + manifest_files = manifest.get("files", {}) # Check Database - db_rel = str(self.database_path.relative_to(self.storage_dir)) - actual_db_hash = self._hash_file(self.database_path) - if actual_db_hash and actual_db_hash != manifest.get("files", {}).get( - db_rel, - ): - issues.append(f"Database modified: {db_rel}") + if self.database_path.exists(): + db_rel = str(self.database_path.relative_to(self.storage_dir)) + actual_db_hash = self._hash_file(self.database_path) + if actual_db_hash and actual_db_hash != manifest_files.get(db_rel): + issues.append(f"Database modified: {db_rel}") - # Check Identities and other critical files in storage_dir + # Check other critical files in storage_dir for root, _, files_in_dir in os.walk(self.storage_dir): for file in files_in_dir: full_path = Path(root) / file - # Skip the manifest itself and temporary sqlite files - if ( - file == "integrity-manifest.json" - or file.endswith("-wal") - or file.endswith("-shm") - ): + rel_path = str(full_path.relative_to(self.storage_dir)) + + if self._should_ignore(rel_path): + continue + + # Database already checked separately, skip here to avoid double reporting + if full_path == self.database_path: continue - rel_path = str(full_path.relative_to(self.storage_dir)) actual_hash = self._hash_file(full_path) - if rel_path in manifest.get("files", {}): - if actual_hash != manifest["files"][rel_path]: + if rel_path in manifest_files: + if actual_hash != manifest_files[rel_path]: issues.append(f"File modified: {rel_path}") else: # New files are also a concern for integrity + # but we only report them if they are not in ignored dirs/patterns issues.append(f"New file detected: {rel_path}") + # Check for missing files that were in manifest + for rel_path in manifest_files: + if self._should_ignore(rel_path): + continue + + full_path = self.storage_dir / rel_path + if not full_path.exists(): + issues.append(f"File missing: {rel_path}") + if issues: m_date = manifest.get("date", "Unknown") m_time = manifest.get("time", "Unknown") @@ -85,6 +140,9 @@ class IntegrityManager: self.issues = issues return len(issues) == 0, issues except Exception as e: + import traceback + + traceback.print_exc() return False, [f"Integrity check failed: {e!s}"] def save_manifest(self): @@ -96,15 +154,11 @@ class IntegrityManager: for root, _, files_in_dir in os.walk(self.storage_dir): for file in files_in_dir: full_path = Path(root) / file - # Skip the manifest itself and temporary sqlite files - if ( - file == "integrity-manifest.json" - or file.endswith("-wal") - or file.endswith("-shm") - ): + rel_path = str(full_path.relative_to(self.storage_dir)) + + if self._should_ignore(rel_path): continue - rel_path = str(full_path.relative_to(self.storage_dir)) files[rel_path] = self._hash_file(full_path) now = datetime.now(UTC) diff --git a/meshchatx/src/backend/lxmf_utils.py b/meshchatx/src/backend/lxmf_utils.py index 9153d75..6c2398f 100644 --- a/meshchatx/src/backend/lxmf_utils.py +++ b/meshchatx/src/backend/lxmf_utils.py @@ -23,15 +23,17 @@ def convert_lxmf_message_to_dict( file_attachments = [] for file_attachment in value: file_name = file_attachment[0] + file_data = file_attachment[1] file_bytes = None if include_attachments: - file_bytes = base64.b64encode(file_attachment[1]).decode( + file_bytes = base64.b64encode(file_data).decode( "utf-8", ) file_attachments.append( { "file_name": file_name, + "file_size": len(file_data), "file_bytes": file_bytes, }, ) @@ -42,24 +44,28 @@ def convert_lxmf_message_to_dict( # handle image field if field_type == LXMF.FIELD_IMAGE: image_type = value[0] + image_data = value[1] image_bytes = None if include_attachments: - image_bytes = base64.b64encode(value[1]).decode("utf-8") + image_bytes = base64.b64encode(image_data).decode("utf-8") fields["image"] = { "image_type": image_type, + "image_size": len(image_data), "image_bytes": image_bytes, } # handle audio field if field_type == LXMF.FIELD_AUDIO: audio_mode = value[0] + audio_data = value[1] audio_bytes = None if include_attachments: - audio_bytes = base64.b64encode(value[1]).decode("utf-8") + audio_bytes = base64.b64encode(audio_data).decode("utf-8") fields["audio"] = { "audio_mode": audio_mode, + "audio_size": len(audio_data), "audio_bytes": audio_bytes, } @@ -67,6 +73,59 @@ def convert_lxmf_message_to_dict( if field_type == LXMF.FIELD_TELEMETRY: fields["telemetry"] = Telemeter.from_packed(value) + # handle commands field + if field_type == LXMF.FIELD_COMMANDS or field_type == 0x01: + # value is usually a list of dicts, or a single dict + if isinstance(value, dict): + # convert dict keys back to ints if they look like hex or int strings + new_cmd = {} + for k, v in value.items(): + try: + ki = None + if isinstance(k, int): + ki = k + elif isinstance(k, str): + if k.startswith("0x"): + ki = int(k, 16) + else: + ki = int(k) + + if ki is not None: + new_cmd[f"0x{ki:02x}"] = v + else: + new_cmd[str(k)] = v + except (ValueError, TypeError): + new_cmd[str(k)] = v + fields["commands"] = [new_cmd] + elif isinstance(value, list): + processed_commands = [] + for cmd in value: + if isinstance(cmd, dict): + new_cmd = {} + for k, v in cmd.items(): + try: + ki = None + if isinstance(k, int): + ki = k + elif isinstance(k, str): + if k.startswith("0x"): + ki = int(k, 16) + else: + ki = int(k) + + if ki is not None: + new_cmd[f"0x{ki:02x}"] = v + else: + new_cmd[str(k)] = v + except (ValueError, TypeError): + new_cmd[str(k)] = v + processed_commands.append(new_cmd) + else: + processed_commands.append(cmd) + fields["commands"] = processed_commands + else: + fields["commands"] = value + # convert 0.0-1.0 progress to 0.00-100 percentage progress_percentage = round(lxmf_message.progress * 100, 2) @@ -164,13 +223,44 @@ def convert_db_lxmf_message_to_dict( if not isinstance(fields, dict): fields = {} + # normalize commands if present + if "commands" in fields: + cmds = fields["commands"] + if isinstance(cmds, list): + new_cmds = [] + for cmd in cmds: + if isinstance(cmd, dict): + new_cmd = {} + for k, v in cmd.items(): + # normalize key to 0xXX format if it's a number string + try: + ki = None + if isinstance(k, int): + ki = k + elif isinstance(k, str): + if k.startswith("0x"): + ki = int(k, 16) + else: + ki = int(k) + + if ki is not None: + new_cmd[f"0x{ki:02x}"] = v + else: + new_cmd[str(k)] = v + except (ValueError, TypeError): + new_cmd[str(k)] = v + new_cmds.append(new_cmd) + else: + new_cmds.append(cmd) + fields["commands"] = new_cmds + # strip attachments if requested if not include_attachments: if "image" in fields: # keep type but strip bytes - image_size = 0 + image_size = fields["image"].get("image_size") or 0 b64_bytes = fields["image"].get("image_bytes") - if b64_bytes: + if not image_size and b64_bytes: # Optimized size calculation without full decoding image_size = (len(b64_bytes) * 3) // 4 if b64_bytes.endswith("=="): @@ -184,9 +274,9 @@ def convert_db_lxmf_message_to_dict( } if "audio" in fields: # keep mode but strip bytes - audio_size = 0 + audio_size = fields["audio"].get("audio_size") or 0 b64_bytes = fields["audio"].get("audio_bytes") - if b64_bytes: + if not audio_size and b64_bytes: audio_size = (len(b64_bytes) * 3) // 4 if b64_bytes.endswith("=="): audio_size -= 2 @@ -200,9 +290,9 @@ def convert_db_lxmf_message_to_dict( if "file_attachments" in fields: # keep file names but strip bytes for i in range(len(fields["file_attachments"])): - file_size = 0 + file_size = fields["file_attachments"][i].get("file_size") or 0 b64_bytes = fields["file_attachments"][i].get("file_bytes") - if b64_bytes: + if not file_size and b64_bytes: file_size = (len(b64_bytes) * 3) // 4 if b64_bytes.endswith("=="): file_size -= 2 diff --git a/meshchatx/src/backend/meshchat_utils.py b/meshchatx/src/backend/meshchat_utils.py index bd0e8bd..87d549b 100644 --- a/meshchatx/src/backend/meshchat_utils.py +++ b/meshchatx/src/backend/meshchat_utils.py @@ -116,29 +116,59 @@ def convert_db_favourite_to_dict(favourite): def parse_lxmf_display_name( - app_data_base64: str | None, + app_data_base64: str | bytes | None, default_value: str | None = "Anonymous Peer", ): if app_data_base64 is None: return default_value try: - app_data_bytes = base64.b64decode(app_data_base64) - display_name = LXMF.display_name_from_app_data(app_data_bytes) - if display_name is not None: - return display_name + if isinstance(app_data_base64, bytes): + app_data_bytes = app_data_base64 + else: + app_data_bytes = base64.b64decode(app_data_base64) + + # Try using the library first + try: + display_name = LXMF.display_name_from_app_data(app_data_bytes) + if display_name is not None: + return display_name + except (AttributeError, Exception): + # Handle cases where library might fail or has the 'str' object has no attribute 'decode' bug + pass + + # Fallback manual parsing if library failed or returned None + if len(app_data_bytes) > 0: + # Version 0.5.0+ announce format (msgpack list) + if ( + app_data_bytes[0] >= 0x90 and app_data_bytes[0] <= 0x9F + ) or app_data_bytes[0] == 0xDC: + try: + peer_data = msgpack.unpackb(app_data_bytes) + if isinstance(peer_data, list) and len(peer_data) >= 1: + dn = peer_data[0] + if dn is not None: + if isinstance(dn, bytes): + return dn.decode("utf-8") + return str(dn) + except Exception: + pass except Exception as e: print(f"Failed to parse LXMF display name: {e}") return default_value -def parse_lxmf_stamp_cost(app_data_base64: str | None): +def parse_lxmf_stamp_cost(app_data_base64: str | bytes | None): if app_data_base64 is None: return None try: - app_data_bytes = base64.b64decode(app_data_base64) + if isinstance(app_data_base64, bytes): + app_data_bytes = app_data_base64 + else: + app_data_bytes = base64.b64decode(app_data_base64) + return LXMF.stamp_cost_from_app_data(app_data_bytes) except Exception as e: print(f"Failed to parse LXMF stamp cost: {e}") @@ -146,26 +176,34 @@ def parse_lxmf_stamp_cost(app_data_base64: str | None): def parse_nomadnetwork_node_display_name( - app_data_base64: str | None, + app_data_base64: str | bytes | None, default_value: str | None = "Anonymous Node", ): if app_data_base64 is None: return default_value try: - app_data_bytes = base64.b64decode(app_data_base64) + if isinstance(app_data_base64, bytes): + app_data_bytes = app_data_base64 + else: + app_data_bytes = base64.b64decode(app_data_base64) + return app_data_bytes.decode("utf-8") except Exception as e: print(f"Failed to parse NomadNetwork display name: {e}") return default_value -def parse_lxmf_propagation_node_app_data(app_data_base64: str | None): +def parse_lxmf_propagation_node_app_data(app_data_base64: str | bytes | None): if app_data_base64 is None: return None try: - app_data_bytes = base64.b64decode(app_data_base64) + if isinstance(app_data_base64, bytes): + app_data_bytes = app_data_base64 + else: + app_data_bytes = base64.b64decode(app_data_base64) + data = msgpack.unpackb(app_data_bytes) if not isinstance(data, list) or len(data) < 4: diff --git a/meshchatx/src/backend/rnpath_trace_handler.py b/meshchatx/src/backend/rnpath_trace_handler.py new file mode 100644 index 0000000..4563f15 --- /dev/null +++ b/meshchatx/src/backend/rnpath_trace_handler.py @@ -0,0 +1,100 @@ +import asyncio +import time +import traceback + +import RNS + + +class RNPathTraceHandler: + def __init__(self, reticulum_instance, identity): + self.reticulum = reticulum_instance + self.identity = identity + + async def trace_path(self, destination_hash_str): + try: + try: + destination_hash = bytes.fromhex(destination_hash_str) + except Exception: + return {"error": "Invalid destination hash"} + + # Request path if we don't have it + if not RNS.Transport.has_path(destination_hash): + RNS.Transport.request_path(destination_hash) + timeout = 10 + start_time = time.time() + while ( + not RNS.Transport.has_path(destination_hash) + and time.time() - start_time < timeout + ): + await asyncio.sleep(0.2) + + if not RNS.Transport.has_path(destination_hash): + return {"error": "Path not found after timeout"} + + hops = RNS.Transport.hops_to(destination_hash) + + next_hop_bytes = None + next_hop_interface = None + if self.reticulum: + try: + next_hop_bytes = self.reticulum.get_next_hop(destination_hash) + next_hop_interface = self.reticulum.get_next_hop_if_name( + destination_hash, + ) + except Exception as e: + print(f"Error calling reticulum methods: {e}") + + path = [] + # Me + local_hash = "unknown" + if self.identity and hasattr(self.identity, "hash"): + local_hash = self.identity.hash.hex() + elif ( + self.reticulum + and hasattr(self.reticulum, "identity") + and self.reticulum.identity + ): + local_hash = self.reticulum.identity.hash.hex() + + path.append({"type": "local", "hash": local_hash, "name": "Local Node"}) + + if hops == 1: + # Direct + path.append( + { + "type": "destination", + "hash": destination_hash_str, + "hops": 1, + "interface": next_hop_interface, + }, + ) + elif hops > 1: + # Next hop + path.append( + { + "type": "hop", + "hash": next_hop_bytes.hex() if next_hop_bytes else None, + "name": "Next Hop", + "interface": next_hop_interface, + "hop_number": 1, + }, + ) + + # Intermediate unknown hops + if hops > 2: + path.append({"type": "unknown", "count": hops - 2}) + + # Destination + path.append( + {"type": "destination", "hash": destination_hash_str, "hops": hops}, + ) + + return { + "destination": destination_hash_str, + "hops": hops, + "path": path, + "interface": next_hop_interface, + "next_hop": next_hop_bytes.hex() if next_hop_bytes else None, + } + except Exception as e: + return {"error": f"Trace failed: {e}\n{traceback.format_exc()}"} diff --git a/meshchatx/src/backend/telemetry_utils.py b/meshchatx/src/backend/telemetry_utils.py index 2a71288..8a8c2c0 100644 --- a/meshchatx/src/backend/telemetry_utils.py +++ b/meshchatx/src/backend/telemetry_utils.py @@ -70,7 +70,7 @@ class Telemeter: struct.pack("!I", int(round(speed, 2) * 1e2)), struct.pack("!i", int(round(bearing, 2) * 1e2)), struct.pack("!H", int(round(accuracy, 2) * 1e2)), - int(last_update or time.time()), + int(last_update) if last_update is not None else int(time.time()), ] except Exception: return None @@ -84,15 +84,33 @@ class Telemeter: res["time"] = {"utc": p[Sensor.SID_TIME]} if Sensor.SID_LOCATION in p: res["location"] = Telemeter.unpack_location(p[Sensor.SID_LOCATION]) + if Sensor.SID_PHYSICAL_LINK in p: + pl = p[Sensor.SID_PHYSICAL_LINK] + if isinstance(pl, (list, tuple)) and len(pl) >= 3: + res["physical_link"] = {"rssi": pl[0], "snr": pl[1], "q": pl[2]} + if Sensor.SID_BATTERY in p: + b = p[Sensor.SID_BATTERY] + if isinstance(b, (list, tuple)) and len(b) >= 2: + res["battery"] = {"charge_percent": b[0], "charging": b[1]} # Add other sensors as needed return res except Exception: return None @staticmethod - def pack(time_utc=None, location=None): + def pack(time_utc=None, location=None, battery=None, physical_link=None): p = {} p[Sensor.SID_TIME] = int(time_utc or time.time()) if location: p[Sensor.SID_LOCATION] = Telemeter.pack_location(**location) + if battery: + # battery should be [charge_percent, charging] + p[Sensor.SID_BATTERY] = [battery["charge_percent"], battery["charging"]] + if physical_link: + # physical_link should be [rssi, snr, q] + p[Sensor.SID_PHYSICAL_LINK] = [ + physical_link["rssi"], + physical_link["snr"], + physical_link["q"], + ] return umsgpack.packb(p)