feat(telemetry): implement telemetry handling and greeting management in ReticulumMeshChat

This commit is contained in:
2026-01-01 17:34:34 -06:00
parent 3f9a22e79e
commit bdbf7461a9

View File

@@ -55,6 +55,7 @@ from meshchatx.src.backend.lxmf_message_fields import (
LxmfFileAttachmentsField,
LxmfImageField,
)
from meshchatx.src.backend.telemetry_utils import Telemeter
from meshchatx.src.backend.map_manager import MapManager
from meshchatx.src.backend.message_handler import MessageHandler
from meshchatx.src.backend.rncp_handler import RNCPHandler
@@ -265,7 +266,9 @@ class ReticulumMeshChat:
)
# load and register all forwarding alias identities
self.forwarding_manager = ForwardingManager(self.database, self.message_router)
self.forwarding_manager = ForwardingManager(
self.database, lxmf_router_path, self.on_lxmf_delivery
)
self.forwarding_manager.load_aliases()
# set a callback for when an lxmf message is received
@@ -333,6 +336,7 @@ class ReticulumMeshChat:
# init Voicemail Manager
self.voicemail_manager = VoicemailManager(
db=self.database,
config=self.config,
telephone_manager=self.telephone_manager,
storage_dir=self.storage_path,
)
@@ -2740,11 +2744,15 @@ class ReticulumMeshChat:
# voicemail status
@routes.get("/api/v1/telephone/voicemail/status")
async def telephone_voicemail_status(request):
greeting_path = os.path.join(
self.voicemail_manager.greetings_dir, "greeting.opus"
)
return web.json_response(
{
"has_espeak": self.voicemail_manager.has_espeak,
"has_ffmpeg": self.voicemail_manager.has_ffmpeg,
"is_recording": self.voicemail_manager.is_recording,
"has_greeting": os.path.exists(greeting_path),
},
)
@@ -2822,6 +2830,55 @@ class ReticulumMeshChat:
except Exception as e:
return web.json_response({"message": str(e)}, status=500)
# upload greeting
@routes.post("/api/v1/telephone/voicemail/greeting/upload")
async def telephone_voicemail_greeting_upload(request):
try:
reader = await request.multipart()
field = await reader.next()
if field.name != "file":
return web.json_response({"message": "File field required"}, status=400)
filename = field.filename
extension = os.path.splitext(filename)[1].lower()
if extension not in [".mp3", ".ogg", ".wav", ".m4a", ".flac"]:
return web.json_response(
{"message": f"Unsupported file type: {extension}"}, status=400
)
# Save temp file
with tempfile.NamedTemporaryFile(suffix=extension, delete=False) as f:
temp_path = f.name
while True:
chunk = await field.read_chunk()
if not chunk:
break
f.write(chunk)
try:
# Convert to greeting
path = await asyncio.to_thread(
self.voicemail_manager.convert_to_greeting, temp_path
)
return web.json_response(
{"message": "Greeting uploaded and converted", "path": path}
)
finally:
if os.path.exists(temp_path):
os.remove(temp_path)
except Exception as e:
return web.json_response({"message": str(e)}, status=500)
# delete greeting
@routes.delete("/api/v1/telephone/voicemail/greeting")
async def telephone_voicemail_greeting_delete(request):
try:
self.voicemail_manager.remove_greeting()
return web.json_response({"message": "Greeting deleted"})
except Exception as e:
return web.json_response({"message": str(e)}, status=500)
# announce
@routes.get("/api/v1/announce")
async def announce_trigger(request):
@@ -3342,10 +3399,10 @@ class ReticulumMeshChat:
@routes.get("/api/v1/ping/{destination_hash}/lxmf.delivery")
async def ping_lxmf_delivery(request):
# get path params
destination_hash = request.match_info.get("destination_hash", "")
destination_hash_str = request.match_info.get("destination_hash", "")
# convert destination hash to bytes
destination_hash = bytes.fromhex(destination_hash)
destination_hash = bytes.fromhex(destination_hash_str)
# determine how long until we should time out
timeout_seconds = int(request.query.get("timeout", 15))
@@ -3362,8 +3419,8 @@ class ReticulumMeshChat:
):
await asyncio.sleep(0.1)
# find destination identity
destination_identity = self.recall_identity(destination_hash)
# find destination identity (pass string hash, not bytes)
destination_identity = self.recall_identity(destination_hash_str)
if destination_identity is None:
return web.json_response(
{
@@ -3910,6 +3967,34 @@ class ReticulumMeshChat:
file_attachments_field = LxmfFileAttachmentsField(file_attachments)
# parse telemetry field
telemetry_data = None
if "telemetry" in fields:
telemetry_val = fields["telemetry"]
if isinstance(telemetry_val, dict):
# Frontend sent raw dict, pack it here
telemetry_data = Telemeter.pack(location=telemetry_val)
elif isinstance(telemetry_val, str):
# Frontend sent base64 packed data
telemetry_data = base64.b64decode(telemetry_val)
# parse commands field
commands = None
if "commands" in fields:
# convert dict keys back to ints if they look like hex or int strings
commands = []
for cmd in fields["commands"]:
new_cmd = {}
for k, v in cmd.items():
try:
if k.startswith("0x"):
new_cmd[int(k, 16)] = v
else:
new_cmd[int(k)] = v
except:
new_cmd[k] = v
commands.append(new_cmd)
try:
# send lxmf message to destination
lxmf_message = await self.send_message(
@@ -3918,6 +4003,8 @@ class ReticulumMeshChat:
image_field=image_field,
audio_field=audio_field,
file_attachments_field=file_attachments_field,
telemetry_data=telemetry_data,
commands=commands,
delivery_method=delivery_method,
)
@@ -4496,6 +4583,59 @@ class ReticulumMeshChat:
)
return web.json_response({"error": "File not found"}, status=404)
# get latest telemetry for all peers
@routes.get("/api/v1/telemetry/peers")
async def get_all_latest_telemetry(request):
results = self.database.telemetry.get_all_latest_telemetry()
telemetry_list = []
for r in results:
unpacked = Telemeter.from_packed(r["data"])
telemetry_list.append({
"destination_hash": r["destination_hash"],
"timestamp": r["timestamp"],
"telemetry": unpacked,
"physical_link": json.loads(r["physical_link"]) if r["physical_link"] else None,
"updated_at": r["updated_at"],
})
return web.json_response({"telemetry": telemetry_list})
# get telemetry history for a destination
@routes.get("/api/v1/telemetry/history/{destination_hash}")
async def get_telemetry_history(request):
destination_hash = request.match_info.get("destination_hash")
limit = int(request.query.get("limit", 100))
offset = int(request.query.get("offset", 0))
results = self.database.telemetry.get_telemetry_history(destination_hash, limit, offset)
telemetry_list = []
for r in results:
unpacked = Telemeter.from_packed(r["data"])
telemetry_list.append({
"destination_hash": r["destination_hash"],
"timestamp": r["timestamp"],
"telemetry": unpacked,
"physical_link": json.loads(r["physical_link"]) if r["physical_link"] else None,
"updated_at": r["updated_at"],
})
return web.json_response({"telemetry": telemetry_list})
# get latest telemetry for a destination
@routes.get("/api/v1/telemetry/latest/{destination_hash}")
async def get_latest_telemetry(request):
destination_hash = request.match_info.get("destination_hash")
r = self.database.telemetry.get_latest_telemetry(destination_hash)
if not r:
return web.json_response({"error": "No telemetry found"}, status=404)
unpacked = Telemeter.from_packed(r["data"])
return web.json_response({
"destination_hash": r["destination_hash"],
"timestamp": r["timestamp"],
"telemetry": unpacked,
"physical_link": json.loads(r["physical_link"]) if r["physical_link"] else None,
"updated_at": r["updated_at"],
})
# upload offline map
@routes.post("/api/v1/map/offline")
async def upload_map_offline(request):
@@ -5515,6 +5655,10 @@ class ReticulumMeshChat:
"crawler_retry_delay_seconds": self.config.crawler_retry_delay_seconds.get(),
"crawler_max_concurrent": self.config.crawler_max_concurrent.get(),
"auth_enabled": self.auth_enabled,
"voicemail_enabled": self.config.voicemail_enabled.get(),
"voicemail_greeting": self.config.voicemail_greeting.get(),
"voicemail_auto_answer_delay_seconds": self.config.voicemail_auto_answer_delay_seconds.get(),
"voicemail_max_recording_seconds": self.config.voicemail_max_recording_seconds.get(),
}
# try and get a name for the provided identity hash
@@ -5671,6 +5815,10 @@ class ReticulumMeshChat:
"audio_bytes": audio_bytes,
}
# handle telemetry field
if field_type == LXMF.FIELD_TELEMETRY:
fields["telemetry"] = Telemeter.from_packed(value)
# convert 0.0-1.0 progress to 0.00-100 percentage
progress_percentage = round(lxmf_message.progress * 100, 2)
@@ -6014,11 +6162,10 @@ class ReticulumMeshChat:
if SidebandCommands.TELEMETRY_REQUEST in command:
is_sideband_telemetry_request = True
# ignore telemetry requests from sideband
# respond to telemetry requests from sideband
if is_sideband_telemetry_request:
print(
"Ignoring received LXMF message as it is a telemetry request command",
)
print(f"Responding to telemetry request from {source_hash}")
self.handle_telemetry_request(source_hash)
return
# check for spam keywords
@@ -6055,6 +6202,47 @@ class ReticulumMeshChat:
# handle forwarding
self.handle_forwarding(lxmf_message)
# handle telemetry
try:
message_fields = lxmf_message.get_fields()
if LXMF.FIELD_TELEMETRY in message_fields:
telemetry_data = message_fields[LXMF.FIELD_TELEMETRY]
# unpack to get timestamp
unpacked = Telemeter.from_packed(telemetry_data)
if unpacked and "time" in unpacked:
timestamp = unpacked["time"]["utc"]
# physical link info
physical_link = {
"rssi": self.reticulum.get_packet_rssi(lxmf_message.hash),
"snr": self.reticulum.get_packet_snr(lxmf_message.hash),
"q": self.reticulum.get_packet_q(lxmf_message.hash),
}
self.database.telemetry.upsert_telemetry(
destination_hash=source_hash,
timestamp=timestamp,
data=telemetry_data,
received_from=self.local_lxmf_destination.hexhash,
physical_link=physical_link,
)
# broadcast telemetry update via websocket
AsyncUtils.run_async(
self.websocket_broadcast(
json.dumps(
{
"type": "lxmf.telemetry",
"destination_hash": source_hash,
"timestamp": timestamp,
"telemetry": unpacked,
},
),
),
)
except Exception as e:
print(f"Failed to handle telemetry in LXMF message: {e}")
# update lxmf user icon if icon appearance field is available
try:
message_fields = lxmf_message.get_fields()
@@ -6210,7 +6398,14 @@ class ReticulumMeshChat:
lxmf_message.try_propagation_on_fail = False
# resend message
self.message_router.handle_outbound(lxmf_message)
source_hash = lxmf_message.source_hash.hex()
router = self.message_router
if (
self.forwarding_manager
and source_hash in self.forwarding_manager.forwarding_routers
):
router = self.forwarding_manager.forwarding_routers[source_hash]
router.handle_outbound(lxmf_message)
# upserts the provided lxmf message to the database
def db_upsert_lxmf_message(
@@ -6232,9 +6427,12 @@ class ReticulumMeshChat:
image_field: LxmfImageField = None,
audio_field: LxmfAudioField = None,
file_attachments_field: LxmfFileAttachmentsField = None,
telemetry_data: bytes = None,
commands: list = None,
delivery_method: str = None,
title: str = "",
sender_identity_hash: str = None,
no_display: bool = False,
) -> LXMF.LXMessage:
# convert destination hash to bytes
destination_hash_bytes = bytes.fromhex(destination_hash)
@@ -6296,8 +6494,13 @@ class ReticulumMeshChat:
# determine which identity to send from
source_destination = self.local_lxmf_destination
if sender_identity_hash is not None:
if sender_identity_hash in self.forwarding_destinations:
source_destination = self.forwarding_destinations[sender_identity_hash]
if (
self.forwarding_manager
and sender_identity_hash in self.forwarding_manager.forwarding_destinations
):
source_destination = self.forwarding_manager.forwarding_destinations[
sender_identity_hash
]
else:
print(
f"Warning: requested sender identity {sender_identity_hash} not found, using default.",
@@ -6342,6 +6545,14 @@ class ReticulumMeshChat:
audio_field.audio_bytes,
]
# add telemetry field
if telemetry_data is not None:
lxmf_message.fields[LXMF.FIELD_TELEMETRY] = telemetry_data
# add commands field
if commands is not None:
lxmf_message.fields[LXMF.FIELD_COMMANDS] = commands
# add icon appearance if configured
# fixme: we could save a tiny amount of bandwidth here, but this requires more effort...
# we could keep track of when the icon appearance was last sent to this destination, and when it last changed
@@ -6368,32 +6579,79 @@ class ReticulumMeshChat:
lxmf_message.register_delivery_callback(self.on_lxmf_sending_state_updated)
lxmf_message.register_failed_callback(self.on_lxmf_sending_failed)
# determine which router to use
router = self.message_router
if (
sender_identity_hash is not None
and sender_identity_hash in self.forwarding_manager.forwarding_routers
):
router = self.forwarding_manager.forwarding_routers[sender_identity_hash]
# send lxmf message to be routed to destination
self.message_router.handle_outbound(lxmf_message)
router.handle_outbound(lxmf_message)
# upsert lxmf message to database
self.db_upsert_lxmf_message(lxmf_message)
if not no_display:
self.db_upsert_lxmf_message(lxmf_message)
# tell all websocket clients that old failed message was deleted so it can remove from ui
await self.websocket_broadcast(
json.dumps(
{
"type": "lxmf_message_created",
"lxmf_message": self.convert_lxmf_message_to_dict(
lxmf_message,
include_attachments=False,
),
},
),
)
if not no_display:
await self.websocket_broadcast(
json.dumps(
{
"type": "lxmf_message_created",
"lxmf_message": self.convert_lxmf_message_to_dict(
lxmf_message,
include_attachments=False,
),
},
),
)
# handle lxmf message progress loop without blocking or awaiting
# otherwise other incoming websocket packets will not be processed until sending is complete
# which results in the next message not showing up until the first message is finished
AsyncUtils.run_async(self.handle_lxmf_message_progress(lxmf_message))
if not no_display:
AsyncUtils.run_async(self.handle_lxmf_message_progress(lxmf_message))
return lxmf_message
def handle_telemetry_request(self, to_addr_hash: str):
# get our location from config
lat = self.database.config.get("map_default_lat")
lon = self.database.config.get("map_default_lon")
if lat is None or lon is None:
print(f"Cannot respond to telemetry request from {to_addr_hash}: No location set")
return
try:
location = {
"latitude": float(lat),
"longitude": float(lon),
"altitude": 0,
"speed": 0,
"bearing": 0,
"accuracy": 0,
"last_update": int(time.time()),
}
telemetry_data = Telemeter.pack(location=location)
# send as an LXMF message with no content, only telemetry field
# use no_display=True to avoid showing in chat UI
AsyncUtils.run_async(
self.send_message(
destination_hash=to_addr_hash,
content="",
telemetry_data=telemetry_data,
delivery_method="opportunistic",
no_display=True,
)
)
except Exception as e:
print(f"Failed to respond to telemetry request: {e}")
# updates lxmf message in database and broadcasts to websocket until it's delivered, or it fails
async def handle_lxmf_message_progress(self, lxmf_message):
# FIXME: there's no register_progress_callback on the lxmf message, so manually send progress until delivered, propagated or failed