From bdbf7461a9cb0c182a380bd36c09829ea5fb7ff0 Mon Sep 17 00:00:00 2001 From: Sudo-Ivan Date: Thu, 1 Jan 2026 17:34:34 -0600 Subject: [PATCH] feat(telemetry): implement telemetry handling and greeting management in ReticulumMeshChat --- meshchatx/meshchat.py | 310 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 284 insertions(+), 26 deletions(-) diff --git a/meshchatx/meshchat.py b/meshchatx/meshchat.py index ec4a59c..c12ceae 100644 --- a/meshchatx/meshchat.py +++ b/meshchatx/meshchat.py @@ -55,6 +55,7 @@ from meshchatx.src.backend.lxmf_message_fields import ( LxmfFileAttachmentsField, LxmfImageField, ) +from meshchatx.src.backend.telemetry_utils import Telemeter from meshchatx.src.backend.map_manager import MapManager from meshchatx.src.backend.message_handler import MessageHandler from meshchatx.src.backend.rncp_handler import RNCPHandler @@ -265,7 +266,9 @@ class ReticulumMeshChat: ) # load and register all forwarding alias identities - self.forwarding_manager = ForwardingManager(self.database, self.message_router) + self.forwarding_manager = ForwardingManager( + self.database, lxmf_router_path, self.on_lxmf_delivery + ) self.forwarding_manager.load_aliases() # set a callback for when an lxmf message is received @@ -333,6 +336,7 @@ class ReticulumMeshChat: # init Voicemail Manager self.voicemail_manager = VoicemailManager( db=self.database, + config=self.config, telephone_manager=self.telephone_manager, storage_dir=self.storage_path, ) @@ -2740,11 +2744,15 @@ class ReticulumMeshChat: # voicemail status @routes.get("/api/v1/telephone/voicemail/status") async def telephone_voicemail_status(request): + greeting_path = os.path.join( + self.voicemail_manager.greetings_dir, "greeting.opus" + ) return web.json_response( { "has_espeak": self.voicemail_manager.has_espeak, "has_ffmpeg": self.voicemail_manager.has_ffmpeg, "is_recording": self.voicemail_manager.is_recording, + "has_greeting": os.path.exists(greeting_path), }, ) @@ -2822,6 +2830,55 @@ class ReticulumMeshChat: except Exception as e: return web.json_response({"message": str(e)}, status=500) + # upload greeting + @routes.post("/api/v1/telephone/voicemail/greeting/upload") + async def telephone_voicemail_greeting_upload(request): + try: + reader = await request.multipart() + field = await reader.next() + if field.name != "file": + return web.json_response({"message": "File field required"}, status=400) + + filename = field.filename + extension = os.path.splitext(filename)[1].lower() + if extension not in [".mp3", ".ogg", ".wav", ".m4a", ".flac"]: + return web.json_response( + {"message": f"Unsupported file type: {extension}"}, status=400 + ) + + # Save temp file + with tempfile.NamedTemporaryFile(suffix=extension, delete=False) as f: + temp_path = f.name + while True: + chunk = await field.read_chunk() + if not chunk: + break + f.write(chunk) + + try: + # Convert to greeting + path = await asyncio.to_thread( + self.voicemail_manager.convert_to_greeting, temp_path + ) + return web.json_response( + {"message": "Greeting uploaded and converted", "path": path} + ) + finally: + if os.path.exists(temp_path): + os.remove(temp_path) + + except Exception as e: + return web.json_response({"message": str(e)}, status=500) + + # delete greeting + @routes.delete("/api/v1/telephone/voicemail/greeting") + async def telephone_voicemail_greeting_delete(request): + try: + self.voicemail_manager.remove_greeting() + return web.json_response({"message": "Greeting deleted"}) + except Exception as e: + return web.json_response({"message": str(e)}, status=500) + # announce @routes.get("/api/v1/announce") async def announce_trigger(request): @@ -3342,10 +3399,10 @@ class ReticulumMeshChat: @routes.get("/api/v1/ping/{destination_hash}/lxmf.delivery") async def ping_lxmf_delivery(request): # get path params - destination_hash = request.match_info.get("destination_hash", "") + destination_hash_str = request.match_info.get("destination_hash", "") # convert destination hash to bytes - destination_hash = bytes.fromhex(destination_hash) + destination_hash = bytes.fromhex(destination_hash_str) # determine how long until we should time out timeout_seconds = int(request.query.get("timeout", 15)) @@ -3362,8 +3419,8 @@ class ReticulumMeshChat: ): await asyncio.sleep(0.1) - # find destination identity - destination_identity = self.recall_identity(destination_hash) + # find destination identity (pass string hash, not bytes) + destination_identity = self.recall_identity(destination_hash_str) if destination_identity is None: return web.json_response( { @@ -3910,6 +3967,34 @@ class ReticulumMeshChat: file_attachments_field = LxmfFileAttachmentsField(file_attachments) + # parse telemetry field + telemetry_data = None + if "telemetry" in fields: + telemetry_val = fields["telemetry"] + if isinstance(telemetry_val, dict): + # Frontend sent raw dict, pack it here + telemetry_data = Telemeter.pack(location=telemetry_val) + elif isinstance(telemetry_val, str): + # Frontend sent base64 packed data + telemetry_data = base64.b64decode(telemetry_val) + + # parse commands field + commands = None + if "commands" in fields: + # convert dict keys back to ints if they look like hex or int strings + commands = [] + for cmd in fields["commands"]: + new_cmd = {} + for k, v in cmd.items(): + try: + if k.startswith("0x"): + new_cmd[int(k, 16)] = v + else: + new_cmd[int(k)] = v + except: + new_cmd[k] = v + commands.append(new_cmd) + try: # send lxmf message to destination lxmf_message = await self.send_message( @@ -3918,6 +4003,8 @@ class ReticulumMeshChat: image_field=image_field, audio_field=audio_field, file_attachments_field=file_attachments_field, + telemetry_data=telemetry_data, + commands=commands, delivery_method=delivery_method, ) @@ -4496,6 +4583,59 @@ class ReticulumMeshChat: ) return web.json_response({"error": "File not found"}, status=404) + # get latest telemetry for all peers + @routes.get("/api/v1/telemetry/peers") + async def get_all_latest_telemetry(request): + results = self.database.telemetry.get_all_latest_telemetry() + telemetry_list = [] + for r in results: + unpacked = Telemeter.from_packed(r["data"]) + telemetry_list.append({ + "destination_hash": r["destination_hash"], + "timestamp": r["timestamp"], + "telemetry": unpacked, + "physical_link": json.loads(r["physical_link"]) if r["physical_link"] else None, + "updated_at": r["updated_at"], + }) + return web.json_response({"telemetry": telemetry_list}) + + # get telemetry history for a destination + @routes.get("/api/v1/telemetry/history/{destination_hash}") + async def get_telemetry_history(request): + destination_hash = request.match_info.get("destination_hash") + limit = int(request.query.get("limit", 100)) + offset = int(request.query.get("offset", 0)) + + results = self.database.telemetry.get_telemetry_history(destination_hash, limit, offset) + telemetry_list = [] + for r in results: + unpacked = Telemeter.from_packed(r["data"]) + telemetry_list.append({ + "destination_hash": r["destination_hash"], + "timestamp": r["timestamp"], + "telemetry": unpacked, + "physical_link": json.loads(r["physical_link"]) if r["physical_link"] else None, + "updated_at": r["updated_at"], + }) + return web.json_response({"telemetry": telemetry_list}) + + # get latest telemetry for a destination + @routes.get("/api/v1/telemetry/latest/{destination_hash}") + async def get_latest_telemetry(request): + destination_hash = request.match_info.get("destination_hash") + r = self.database.telemetry.get_latest_telemetry(destination_hash) + if not r: + return web.json_response({"error": "No telemetry found"}, status=404) + + unpacked = Telemeter.from_packed(r["data"]) + return web.json_response({ + "destination_hash": r["destination_hash"], + "timestamp": r["timestamp"], + "telemetry": unpacked, + "physical_link": json.loads(r["physical_link"]) if r["physical_link"] else None, + "updated_at": r["updated_at"], + }) + # upload offline map @routes.post("/api/v1/map/offline") async def upload_map_offline(request): @@ -5515,6 +5655,10 @@ class ReticulumMeshChat: "crawler_retry_delay_seconds": self.config.crawler_retry_delay_seconds.get(), "crawler_max_concurrent": self.config.crawler_max_concurrent.get(), "auth_enabled": self.auth_enabled, + "voicemail_enabled": self.config.voicemail_enabled.get(), + "voicemail_greeting": self.config.voicemail_greeting.get(), + "voicemail_auto_answer_delay_seconds": self.config.voicemail_auto_answer_delay_seconds.get(), + "voicemail_max_recording_seconds": self.config.voicemail_max_recording_seconds.get(), } # try and get a name for the provided identity hash @@ -5671,6 +5815,10 @@ class ReticulumMeshChat: "audio_bytes": audio_bytes, } + # handle telemetry field + if field_type == LXMF.FIELD_TELEMETRY: + fields["telemetry"] = Telemeter.from_packed(value) + # convert 0.0-1.0 progress to 0.00-100 percentage progress_percentage = round(lxmf_message.progress * 100, 2) @@ -6014,11 +6162,10 @@ class ReticulumMeshChat: if SidebandCommands.TELEMETRY_REQUEST in command: is_sideband_telemetry_request = True - # ignore telemetry requests from sideband + # respond to telemetry requests from sideband if is_sideband_telemetry_request: - print( - "Ignoring received LXMF message as it is a telemetry request command", - ) + print(f"Responding to telemetry request from {source_hash}") + self.handle_telemetry_request(source_hash) return # check for spam keywords @@ -6055,6 +6202,47 @@ class ReticulumMeshChat: # handle forwarding self.handle_forwarding(lxmf_message) + # handle telemetry + try: + message_fields = lxmf_message.get_fields() + if LXMF.FIELD_TELEMETRY in message_fields: + telemetry_data = message_fields[LXMF.FIELD_TELEMETRY] + # unpack to get timestamp + unpacked = Telemeter.from_packed(telemetry_data) + if unpacked and "time" in unpacked: + timestamp = unpacked["time"]["utc"] + + # physical link info + physical_link = { + "rssi": self.reticulum.get_packet_rssi(lxmf_message.hash), + "snr": self.reticulum.get_packet_snr(lxmf_message.hash), + "q": self.reticulum.get_packet_q(lxmf_message.hash), + } + + self.database.telemetry.upsert_telemetry( + destination_hash=source_hash, + timestamp=timestamp, + data=telemetry_data, + received_from=self.local_lxmf_destination.hexhash, + physical_link=physical_link, + ) + + # broadcast telemetry update via websocket + AsyncUtils.run_async( + self.websocket_broadcast( + json.dumps( + { + "type": "lxmf.telemetry", + "destination_hash": source_hash, + "timestamp": timestamp, + "telemetry": unpacked, + }, + ), + ), + ) + except Exception as e: + print(f"Failed to handle telemetry in LXMF message: {e}") + # update lxmf user icon if icon appearance field is available try: message_fields = lxmf_message.get_fields() @@ -6210,7 +6398,14 @@ class ReticulumMeshChat: lxmf_message.try_propagation_on_fail = False # resend message - self.message_router.handle_outbound(lxmf_message) + source_hash = lxmf_message.source_hash.hex() + router = self.message_router + if ( + self.forwarding_manager + and source_hash in self.forwarding_manager.forwarding_routers + ): + router = self.forwarding_manager.forwarding_routers[source_hash] + router.handle_outbound(lxmf_message) # upserts the provided lxmf message to the database def db_upsert_lxmf_message( @@ -6232,9 +6427,12 @@ class ReticulumMeshChat: image_field: LxmfImageField = None, audio_field: LxmfAudioField = None, file_attachments_field: LxmfFileAttachmentsField = None, + telemetry_data: bytes = None, + commands: list = None, delivery_method: str = None, title: str = "", sender_identity_hash: str = None, + no_display: bool = False, ) -> LXMF.LXMessage: # convert destination hash to bytes destination_hash_bytes = bytes.fromhex(destination_hash) @@ -6296,8 +6494,13 @@ class ReticulumMeshChat: # determine which identity to send from source_destination = self.local_lxmf_destination if sender_identity_hash is not None: - if sender_identity_hash in self.forwarding_destinations: - source_destination = self.forwarding_destinations[sender_identity_hash] + if ( + self.forwarding_manager + and sender_identity_hash in self.forwarding_manager.forwarding_destinations + ): + source_destination = self.forwarding_manager.forwarding_destinations[ + sender_identity_hash + ] else: print( f"Warning: requested sender identity {sender_identity_hash} not found, using default.", @@ -6342,6 +6545,14 @@ class ReticulumMeshChat: audio_field.audio_bytes, ] + # add telemetry field + if telemetry_data is not None: + lxmf_message.fields[LXMF.FIELD_TELEMETRY] = telemetry_data + + # add commands field + if commands is not None: + lxmf_message.fields[LXMF.FIELD_COMMANDS] = commands + # add icon appearance if configured # fixme: we could save a tiny amount of bandwidth here, but this requires more effort... # we could keep track of when the icon appearance was last sent to this destination, and when it last changed @@ -6368,32 +6579,79 @@ class ReticulumMeshChat: lxmf_message.register_delivery_callback(self.on_lxmf_sending_state_updated) lxmf_message.register_failed_callback(self.on_lxmf_sending_failed) + # determine which router to use + router = self.message_router + if ( + sender_identity_hash is not None + and sender_identity_hash in self.forwarding_manager.forwarding_routers + ): + router = self.forwarding_manager.forwarding_routers[sender_identity_hash] + # send lxmf message to be routed to destination - self.message_router.handle_outbound(lxmf_message) + router.handle_outbound(lxmf_message) # upsert lxmf message to database - self.db_upsert_lxmf_message(lxmf_message) + if not no_display: + self.db_upsert_lxmf_message(lxmf_message) # tell all websocket clients that old failed message was deleted so it can remove from ui - await self.websocket_broadcast( - json.dumps( - { - "type": "lxmf_message_created", - "lxmf_message": self.convert_lxmf_message_to_dict( - lxmf_message, - include_attachments=False, - ), - }, - ), - ) + if not no_display: + await self.websocket_broadcast( + json.dumps( + { + "type": "lxmf_message_created", + "lxmf_message": self.convert_lxmf_message_to_dict( + lxmf_message, + include_attachments=False, + ), + }, + ), + ) # handle lxmf message progress loop without blocking or awaiting # otherwise other incoming websocket packets will not be processed until sending is complete # which results in the next message not showing up until the first message is finished - AsyncUtils.run_async(self.handle_lxmf_message_progress(lxmf_message)) + if not no_display: + AsyncUtils.run_async(self.handle_lxmf_message_progress(lxmf_message)) return lxmf_message + def handle_telemetry_request(self, to_addr_hash: str): + # get our location from config + lat = self.database.config.get("map_default_lat") + lon = self.database.config.get("map_default_lon") + + if lat is None or lon is None: + print(f"Cannot respond to telemetry request from {to_addr_hash}: No location set") + return + + try: + location = { + "latitude": float(lat), + "longitude": float(lon), + "altitude": 0, + "speed": 0, + "bearing": 0, + "accuracy": 0, + "last_update": int(time.time()), + } + + telemetry_data = Telemeter.pack(location=location) + + # send as an LXMF message with no content, only telemetry field + # use no_display=True to avoid showing in chat UI + AsyncUtils.run_async( + self.send_message( + destination_hash=to_addr_hash, + content="", + telemetry_data=telemetry_data, + delivery_method="opportunistic", + no_display=True, + ) + ) + except Exception as e: + print(f"Failed to respond to telemetry request: {e}") + # updates lxmf message in database and broadcasts to websocket until it's delivered, or it fails async def handle_lxmf_message_progress(self, lxmf_message): # FIXME: there's no register_progress_callback on the lxmf message, so manually send progress until delivered, propagated or failed