feat(telemetry): implement telemetry tracking and path tracing features

- Added telemetry tracking capabilities, allowing users to toggle tracking for specific peers and retrieve tracked peers.
- Introduced RNPathTraceHandler for tracing paths to destination hashes.
- Enhanced database schema to support telemetry tracking and added related fields in contacts.
- Updated configuration management to include telemetry settings.
- Implemented API endpoints for downloading database backups and snapshots, as well as for telemetry-related functionalities.
- Improved error handling and response messages for telemetry requests and path tracing.
This commit is contained in:
2026-01-07 19:13:08 -06:00
parent ce568c2965
commit df306cc67b
11 changed files with 991 additions and 135 deletions

View File

@@ -26,6 +26,7 @@ import traceback
import webbrowser
from datetime import UTC, datetime, timedelta
from logging.handlers import RotatingFileHandler
from urllib.parse import urlparse
import aiohttp
import bcrypt
@@ -435,6 +436,17 @@ class ReticulumMeshChat:
if self.current_context:
self.current_context.rnpath_handler = value
@property
def rnpath_trace_handler(self):
return (
self.current_context.rnpath_trace_handler if self.current_context else None
)
@rnpath_trace_handler.setter
def rnpath_trace_handler(self, value):
if self.current_context:
self.current_context.rnpath_trace_handler = value
@property
def rnprobe_handler(self):
return self.current_context.rnprobe_handler if self.current_context else None
@@ -793,7 +805,6 @@ class ReticulumMeshChat:
# Wait another moment for sockets to definitely be released by OS
# Also give some time for the RPC listener port to settle
print("Waiting for ports to settle...")
# We add a settle time here similar to Sideband's logic
await asyncio.sleep(4)
# Detect RPC type from reticulum instance if possible, otherwise default to both
@@ -2292,6 +2303,64 @@ class ReticulumMeshChat:
status=500,
)
@routes.get("/api/v1/database/backups/{filename}/download")
async def download_db_backup(request):
try:
filename = request.match_info.get("filename")
if not filename.endswith(".zip"):
filename += ".zip"
backup_dir = os.path.join(self.storage_dir, "database-backups")
full_path = os.path.join(backup_dir, filename)
if not os.path.exists(full_path) or not full_path.startswith(
backup_dir,
):
return web.json_response(
{"status": "error", "message": "Backup not found"},
status=404,
)
return web.FileResponse(
path=full_path,
headers={
"Content-Disposition": f'attachment; filename="{filename}"',
},
)
except Exception as e:
return web.json_response(
{"status": "error", "message": str(e)},
status=500,
)
@routes.get("/api/v1/database/snapshots/{filename}/download")
async def download_db_snapshot(request):
try:
filename = request.match_info.get("filename")
if not filename.endswith(".zip"):
filename += ".zip"
snapshot_dir = os.path.join(self.storage_dir, "snapshots")
full_path = os.path.join(snapshot_dir, filename)
if not os.path.exists(full_path) or not full_path.startswith(
snapshot_dir,
):
return web.json_response(
{"status": "error", "message": "Snapshot not found"},
status=404,
)
return web.FileResponse(
path=full_path,
headers={
"Content-Disposition": f'attachment; filename="{filename}"',
},
)
except Exception as e:
return web.json_response(
{"status": "error", "message": str(e)},
status=500,
)
@routes.get("/api/v1/status")
async def status(request):
return web.json_response(
@@ -3366,6 +3435,8 @@ class ReticulumMeshChat:
# Get total paths
total_paths = 0
is_connected_to_shared_instance = False
shared_instance_address = None
if hasattr(self, "reticulum") and self.reticulum:
try:
path_table = self.reticulum.get_path_table()
@@ -3373,6 +3444,70 @@ class ReticulumMeshChat:
except Exception: # noqa: S110
pass
is_connected_to_shared_instance = getattr(
self.reticulum,
"is_connected_to_shared_instance",
False,
)
if is_connected_to_shared_instance:
# Try to find the shared instance address from active connections
try:
for conn in process.connections(kind="all"):
if conn.status == psutil.CONN_ESTABLISHED and conn.raddr:
# Check for common Reticulum shared instance ports or UNIX sockets
if (
isinstance(conn.raddr, tuple)
and conn.raddr[1] == 37428
):
shared_instance_address = (
f"{conn.raddr[0]}:{conn.raddr[1]}"
)
break
if (
isinstance(conn.raddr, str)
and (
"rns" in conn.raddr or "reticulum" in conn.raddr
)
and ".sock" in conn.raddr
):
shared_instance_address = conn.raddr
break
except Exception: # noqa: S110
pass
# Fallback to reading config if not found via connections
if not shared_instance_address:
try:
config_dir = getattr(self, "reticulum_config_dir", None)
if not config_dir:
config_dir = getattr(
RNS.Reticulum,
"configdir",
os.path.expanduser("~/.reticulum"),
)
config_path = os.path.join(config_dir, "config")
if os.path.isfile(config_path):
cp = configparser.ConfigParser()
cp.read(config_path)
if cp.has_section("reticulum"):
shared_port = cp.getint(
"reticulum",
"shared_instance_port",
fallback=37428,
)
shared_bind = cp.get(
"reticulum",
"shared_instance_bind",
fallback="127.0.0.1",
)
shared_instance_address = (
f"{shared_bind}:{shared_port}"
)
except Exception: # noqa: S110
pass
# Calculate announce rates
current_time = time.time()
announces_per_second = len(
@@ -3453,15 +3588,8 @@ class ReticulumMeshChat:
if hasattr(self, "reticulum") and self.reticulum
else None
),
"is_connected_to_shared_instance": (
getattr(
self.reticulum,
"is_connected_to_shared_instance",
False,
)
if hasattr(self, "reticulum") and self.reticulum
else False
),
"is_connected_to_shared_instance": is_connected_to_shared_instance,
"shared_instance_address": shared_instance_address,
"is_transport_enabled": (
self.reticulum.transport_enabled()
if hasattr(self, "reticulum") and self.reticulum
@@ -5245,6 +5373,7 @@ class ReticulumMeshChat:
lxst_address = data.get("lxst_address")
preferred_ringtone_id = data.get("preferred_ringtone_id")
custom_image = data.get("custom_image")
is_telemetry_trusted = data.get("is_telemetry_trusted", 0)
if not name:
return web.json_response(
@@ -5278,6 +5407,7 @@ class ReticulumMeshChat:
lxst_address=lxst_address,
preferred_ringtone_id=preferred_ringtone_id,
custom_image=custom_image,
is_telemetry_trusted=is_telemetry_trusted,
)
return web.json_response({"message": "Contact added"})
@@ -5292,6 +5422,7 @@ class ReticulumMeshChat:
preferred_ringtone_id = data.get("preferred_ringtone_id")
custom_image = data.get("custom_image")
clear_image = data.get("clear_image", False)
is_telemetry_trusted = data.get("is_telemetry_trusted")
self.database.contacts.update_contact(
contact_id,
@@ -5302,6 +5433,7 @@ class ReticulumMeshChat:
preferred_ringtone_id=preferred_ringtone_id,
custom_image=custom_image,
clear_image=clear_image,
is_telemetry_trusted=is_telemetry_trusted,
)
return web.json_response({"message": "Contact updated"})
@@ -6374,6 +6506,31 @@ class ReticulumMeshChat:
except Exception as e:
return web.json_response({"message": str(e)}, status=500)
@routes.get("/api/v1/rnpath/trace/{destination_hash}")
async def rnpath_trace(request):
destination_hash = request.match_info.get("destination_hash")
if not destination_hash:
return web.json_response(
{"error": "destination_hash is required"},
status=400,
)
try:
if not self.rnpath_trace_handler:
return web.json_response(
{
"error": "RNPathTraceHandler not initialized for current context",
},
status=503,
)
result = await self.rnpath_trace_handler.trace_path(destination_hash)
return web.json_response(result)
except Exception as e:
import traceback
error_msg = f"Trace route failed: {e}\n{traceback.format_exc()}"
print(error_msg)
return web.json_response({"error": error_msg}, status=500)
@routes.post("/api/v1/rnprobe")
async def rnprobe(request):
data = await request.json()
@@ -7219,6 +7376,9 @@ class ReticulumMeshChat:
"contact_image": contact_image,
"destination_hash": other_user_hash,
"is_unread": is_unread,
"is_tracking": self.database.telemetry.is_tracking(
other_user_hash,
),
"failed_messages_count": row["failed_count"],
"has_attachments": message_fields_have_attachments(
row["fields"],
@@ -7866,10 +8026,40 @@ class ReticulumMeshChat:
if r["physical_link"]
else None,
"updated_at": r["updated_at"],
"is_tracking": self.database.telemetry.is_tracking(
r["destination_hash"],
),
},
)
return web.json_response({"telemetry": telemetry_list})
@routes.get("/api/v1/telemetry/trusted-peers")
async def telemetry_trusted_peers_get(request):
# get all contacts that are telemetry trusted
contacts = self.database.provider.fetchall(
"SELECT * FROM contacts WHERE is_telemetry_trusted = 1 ORDER BY name ASC",
)
return web.json_response({"trusted_peers": [dict(c) for c in contacts]})
# toggle telemetry tracking for a destination
@routes.post("/api/v1/telemetry/tracking/{destination_hash}/toggle")
async def toggle_telemetry_tracking(request):
destination_hash = request.match_info["destination_hash"]
data = await request.json()
is_tracking = data.get("is_tracking")
new_status = self.database.telemetry.toggle_tracking(
destination_hash,
is_tracking,
)
return web.json_response({"status": "ok", "is_tracking": new_status})
# get all tracked peers
@routes.get("/api/v1/telemetry/tracking")
async def get_tracked_peers(request):
results = self.database.telemetry.get_tracked_peers()
return web.json_response({"tracked_peers": results})
# get telemetry history for a destination
@routes.get("/api/v1/telemetry/history/{destination_hash}")
async def get_telemetry_history(request):
@@ -8088,10 +8278,7 @@ class ReticulumMeshChat:
response.headers["X-XSS-Protection"] = "1; mode=block"
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
# CSP: allow localhost for development and Electron, websockets, and blob URLs
# Add 'unsafe-inline' and 'unsafe-eval' for some legacy doc scripts if needed,
# and allow framing ourselves for the docs page.
gitea_url = "https://git.quad4.io"
# CSP base configuration
connect_sources = [
"'self'",
"ws://localhost:*",
@@ -8100,13 +8287,47 @@ class ReticulumMeshChat:
"https://*.tile.openstreetmap.org",
"https://tile.openstreetmap.org",
"https://nominatim.openstreetmap.org",
"https://*.cartocdn.com",
]
img_sources = [
"'self'",
"data:",
"blob:",
"https://*.tile.openstreetmap.org",
"https://tile.openstreetmap.org",
"https://*.cartocdn.com",
]
frame_sources = [
"'self'",
"https://reticulum.network",
]
script_sources = ["'self'", "'unsafe-inline'", "'unsafe-eval'"]
style_sources = ["'self'", "'unsafe-inline'"]
if self.current_context and self.current_context.config:
# Helper to add domain from URL
def add_domain_from_url(url, target_list):
if not url:
return None
try:
parsed = urlparse(url)
if parsed.netloc:
domain = f"{parsed.scheme}://{parsed.netloc}"
if domain not in target_list:
target_list.append(domain)
return domain
except Exception: # noqa: S110
pass
return None
# Add configured Gitea base URL
gitea_url = self.current_context.config.gitea_base_url.get()
if gitea_url not in connect_sources:
connect_sources.append(gitea_url)
add_domain_from_url(
self.current_context.config.gitea_base_url.get(),
connect_sources,
)
# Add configured docs download URLs domains
docs_urls_str = self.current_context.config.docs_download_urls.get()
@@ -8116,33 +8337,67 @@ class ReticulumMeshChat:
if u.strip()
]
for url in docs_urls:
try:
from urllib.parse import urlparse
domain = add_domain_from_url(url, connect_sources)
if domain and "github.com" in domain:
content_domain = "https://objects.githubusercontent.com"
if content_domain not in connect_sources:
connect_sources.append(content_domain)
parsed = urlparse(url)
if parsed.netloc:
domain = f"{parsed.scheme}://{parsed.netloc}"
if domain not in connect_sources:
connect_sources.append(domain)
# Add map tile server domain
map_tile_url = self.current_context.config.map_tile_server_url.get()
add_domain_from_url(map_tile_url, img_sources)
add_domain_from_url(map_tile_url, connect_sources)
# If GitHub is used, also allow objects.githubusercontent.com for redirects
if "github.com" in domain:
content_domain = "https://objects.githubusercontent.com"
if content_domain not in connect_sources:
connect_sources.append(content_domain)
except Exception: # noqa: S110
pass
# Add nominatim API domain
nominatim_url = self.current_context.config.map_nominatim_api_url.get()
add_domain_from_url(nominatim_url, connect_sources)
# Add custom CSP sources from config
def add_extra_sources(extra_str, target_list):
if not extra_str:
return
sources = [
s.strip()
for s in extra_str.replace("\n", ",")
.replace(";", ",")
.split(",")
if s.strip()
]
for s in sources:
if s not in target_list:
target_list.append(s)
add_extra_sources(
self.current_context.config.csp_extra_connect_src.get(),
connect_sources,
)
add_extra_sources(
self.current_context.config.csp_extra_img_src.get(),
img_sources,
)
add_extra_sources(
self.current_context.config.csp_extra_frame_src.get(),
frame_sources,
)
add_extra_sources(
self.current_context.config.csp_extra_script_src.get(),
script_sources,
)
add_extra_sources(
self.current_context.config.csp_extra_style_src.get(),
style_sources,
)
csp = (
"default-src 'self'; "
"script-src 'self' 'unsafe-inline' 'unsafe-eval'; "
"style-src 'self' 'unsafe-inline'; "
"img-src 'self' data: blob: https://*.tile.openstreetmap.org https://tile.openstreetmap.org; "
f"script-src {' '.join(script_sources)}; "
f"style-src {' '.join(style_sources)}; "
f"img-src {' '.join(img_sources)}; "
"font-src 'self' data:; "
f"connect-src {' '.join(connect_sources)}; "
"media-src 'self' blob:; "
"worker-src 'self' blob:; "
"frame-src 'self' https://reticulum.network; "
f"frame-src {' '.join(frame_sources)}; "
"object-src 'none'; "
"base-uri 'self';"
)
@@ -8329,6 +8584,47 @@ class ReticulumMeshChat:
# Sleep for 12 hours
await asyncio.sleep(12 * 3600)
async def telemetry_tracking_loop(self, session_id, context=None):
ctx = context or self.current_context
if not ctx:
return
while self.running and ctx.running and ctx.session_id == session_id:
try:
# Only run if telemetry is enabled globally
if not ctx.config.telemetry_enabled.get():
await asyncio.sleep(60)
continue
# Get all tracked peers
tracked_peers = ctx.database.telemetry.get_tracked_peers()
now = time.time()
for peer in tracked_peers:
dest_hash = peer["destination_hash"]
interval = peer.get("interval_seconds", 60)
last_req = peer.get("last_request_at")
if last_req is None or now - last_req >= interval:
print(f"Sending telemetry request to tracked peer: {dest_hash}")
# Send telemetry request
await self.send_message(
destination_hash=dest_hash,
content="",
commands=[{SidebandCommands.TELEMETRY_REQUEST: 0}],
delivery_method="opportunistic",
no_display=False,
context=ctx,
)
# Update last request time
ctx.database.telemetry.update_last_request_at(dest_hash, now)
except Exception as e:
print(f"Telemetry tracking loop error: {e}")
# Check every 10 seconds
await asyncio.sleep(10)
# handle announcing
async def announce(self, context=None):
ctx = context or self.current_context
@@ -8590,6 +8886,24 @@ class ReticulumMeshChat:
if "map_nominatim_api_url" in data:
self.config.map_nominatim_api_url.set(data["map_nominatim_api_url"])
# update location settings
if "location_source" in data:
self.config.location_source.set(data["location_source"])
if "location_manual_lat" in data:
self.config.location_manual_lat.set(str(data["location_manual_lat"]))
if "location_manual_lon" in data:
self.config.location_manual_lon.set(str(data["location_manual_lon"]))
if "location_manual_alt" in data:
self.config.location_manual_alt.set(str(data["location_manual_alt"]))
if "telemetry_enabled" in data:
self.config.telemetry_enabled.set(
self._parse_bool(data["telemetry_enabled"])
)
# update banishment settings
if "banished_effect_enabled" in data:
self.config.banished_effect_enabled.set(
@@ -8644,6 +8958,18 @@ class ReticulumMeshChat:
value = self._parse_bool(data["blackhole_integration_enabled"])
self.config.blackhole_integration_enabled.set(value)
# update csp extra sources
if "csp_extra_connect_src" in data:
self.config.csp_extra_connect_src.set(data["csp_extra_connect_src"])
if "csp_extra_img_src" in data:
self.config.csp_extra_img_src.set(data["csp_extra_img_src"])
if "csp_extra_frame_src" in data:
self.config.csp_extra_frame_src.set(data["csp_extra_frame_src"])
if "csp_extra_script_src" in data:
self.config.csp_extra_script_src.set(data["csp_extra_script_src"])
if "csp_extra_style_src" in data:
self.config.csp_extra_style_src.set(data["csp_extra_style_src"])
# update voicemail settings
if "voicemail_enabled" in data:
self.config.voicemail_enabled.set(
@@ -9604,8 +9930,18 @@ class ReticulumMeshChat:
"desktop_open_calls_in_separate_window": ctx.config.desktop_open_calls_in_separate_window.get(),
"desktop_hardware_acceleration_enabled": ctx.config.desktop_hardware_acceleration_enabled.get(),
"blackhole_integration_enabled": ctx.config.blackhole_integration_enabled.get(),
"csp_extra_connect_src": ctx.config.csp_extra_connect_src.get(),
"csp_extra_img_src": ctx.config.csp_extra_img_src.get(),
"csp_extra_frame_src": ctx.config.csp_extra_frame_src.get(),
"csp_extra_script_src": ctx.config.csp_extra_script_src.get(),
"csp_extra_style_src": ctx.config.csp_extra_style_src.get(),
"telephone_tone_generator_enabled": ctx.config.telephone_tone_generator_enabled.get(),
"telephone_tone_generator_volume": ctx.config.telephone_tone_generator_volume.get(),
"location_source": ctx.config.location_source.get(),
"location_manual_lat": ctx.config.location_manual_lat.get(),
"location_manual_lon": ctx.config.location_manual_lon.get(),
"location_manual_alt": ctx.config.location_manual_alt.get(),
"telemetry_enabled": ctx.config.telemetry_enabled.get(),
}
# try and get a name for the provided identity hash
@@ -9938,18 +10274,82 @@ class ReticulumMeshChat:
print(f"Rejecting LXMF message from blocked source: {source_hash}")
return
# check if this lxmf message contains a telemetry request command from sideband
is_sideband_telemetry_request = False
lxmf_fields = lxmf_message.get_fields()
# check both standard LXMF.FIELD_COMMANDS (9) and FIELD_COMMANDS (1)
commands = []
if LXMF.FIELD_COMMANDS in lxmf_fields:
for command in lxmf_fields[LXMF.FIELD_COMMANDS]:
if SidebandCommands.TELEMETRY_REQUEST in command:
val = lxmf_fields[LXMF.FIELD_COMMANDS]
if isinstance(val, list):
commands.extend(val)
elif isinstance(val, dict):
commands.append(val)
if 0x01 in lxmf_fields and 0x01 != LXMF.FIELD_COMMANDS:
val = lxmf_fields[0x01]
if isinstance(val, list):
commands.extend(val)
elif isinstance(val, dict):
commands.append(val)
if commands:
for command in commands:
if (
(
isinstance(command, dict)
and (
SidebandCommands.TELEMETRY_REQUEST in command
or str(SidebandCommands.TELEMETRY_REQUEST) in command
or f"0x{SidebandCommands.TELEMETRY_REQUEST:02x}"
in command
)
)
or (
isinstance(command, (list, tuple))
and SidebandCommands.TELEMETRY_REQUEST in command
)
or command == SidebandCommands.TELEMETRY_REQUEST
or str(command) == str(SidebandCommands.TELEMETRY_REQUEST)
):
is_sideband_telemetry_request = True
# respond to telemetry requests from sideband
# respond to telemetry requests
if is_sideband_telemetry_request:
print(f"Responding to telemetry request from {source_hash}")
self.handle_telemetry_request(source_hash)
# Check if telemetry is enabled globally
if not ctx.config.telemetry_enabled.get():
print(f"Telemetry is disabled, ignoring request from {source_hash}")
else:
# Check if peer is trusted
contact = ctx.database.contacts.get_contact_by_identity_hash(
source_hash
)
if not contact or not contact.get("is_telemetry_trusted"):
print(
f"Telemetry request from untrusted peer {source_hash}, ignoring"
)
else:
print(f"Responding to telemetry request from {source_hash}")
self.handle_telemetry_request(source_hash)
self.db_upsert_lxmf_message(lxmf_message, context=ctx)
# broadcast notification
AsyncUtils.run_async(
self.websocket_broadcast(
json.dumps(
{
"type": "lxmf.delivery",
"remote_identity_name": source_hash[:8],
"lxmf_message": convert_db_lxmf_message_to_dict(
ctx.database.messages.get_lxmf_message_by_hash(
lxmf_message.hash.hex()
),
include_attachments=False,
),
},
),
),
)
return
# check for spam keywords
@@ -9989,47 +10389,39 @@ class ReticulumMeshChat:
# handle telemetry
try:
message_fields = lxmf_message.get_fields()
# Single telemetry entry
if LXMF.FIELD_TELEMETRY in message_fields:
telemetry_data = message_fields[LXMF.FIELD_TELEMETRY]
# unpack to get timestamp
unpacked = Telemeter.from_packed(telemetry_data)
if unpacked and "time" in unpacked:
timestamp = unpacked["time"]["utc"]
self.process_incoming_telemetry(
source_hash,
message_fields[LXMF.FIELD_TELEMETRY],
lxmf_message,
context=ctx,
)
# physical link info
physical_link = {
"rssi": self.reticulum.get_packet_rssi(lxmf_message.hash)
if hasattr(self, "reticulum") and self.reticulum
else None,
"snr": self.reticulum.get_packet_snr(lxmf_message.hash)
if hasattr(self, "reticulum") and self.reticulum
else None,
"q": self.reticulum.get_packet_q(lxmf_message.hash)
if hasattr(self, "reticulum") and self.reticulum
else None,
}
ctx.database.telemetry.upsert_telemetry(
destination_hash=source_hash,
timestamp=timestamp,
data=telemetry_data,
received_from=ctx.local_lxmf_destination.hexhash,
physical_link=physical_link,
)
# broadcast telemetry update via websocket
AsyncUtils.run_async(
self.websocket_broadcast(
json.dumps(
{
"type": "lxmf.telemetry",
"destination_hash": source_hash,
"timestamp": timestamp,
"telemetry": unpacked,
},
),
),
)
# Telemetry stream (multiple entries)
if (
hasattr(LXMF, "FIELD_TELEMETRY_STREAM")
and LXMF.FIELD_TELEMETRY_STREAM in message_fields
):
stream = message_fields[LXMF.FIELD_TELEMETRY_STREAM]
if isinstance(stream, (list, tuple)):
for entry in stream:
if isinstance(entry, (list, tuple)) and len(entry) >= 3:
entry_source = (
entry[0].hex()
if isinstance(entry[0], bytes)
else entry[0]
)
entry_timestamp = entry[1]
entry_data = entry[2]
self.process_incoming_telemetry(
entry_source,
entry_data,
lxmf_message,
timestamp_override=entry_timestamp,
context=ctx,
)
except Exception as e:
print(f"Failed to handle telemetry in LXMF message: {e}")
@@ -10050,9 +10442,7 @@ class ReticulumMeshChat:
source_hash = lxmf_message.source_hash.hex()
# ignore our own icon and empty payloads to avoid overwriting peers with our appearance
if source_hash and local_hash and source_hash == local_hash:
pass
elif (
if (source_hash and local_hash and source_hash == local_hash) or (
not icon_name or not foreground_colour or not background_colour
):
pass
@@ -10560,6 +10950,66 @@ class ReticulumMeshChat:
data = f"{name}|{fg}|{bg}"
return hashlib.sha256(data.encode()).hexdigest()
def process_incoming_telemetry(
self,
source_hash,
telemetry_data,
lxmf_message,
timestamp_override=None,
context=None,
):
ctx = context or self.current_context
if not ctx:
return
try:
unpacked = Telemeter.from_packed(telemetry_data)
if unpacked:
timestamp = timestamp_override or (
unpacked["time"]["utc"] if "time" in unpacked else int(time.time())
)
# physical link info
physical_link = {
"rssi": self.reticulum.get_packet_rssi(lxmf_message.hash)
if hasattr(self, "reticulum") and self.reticulum
else None,
"snr": self.reticulum.get_packet_snr(lxmf_message.hash)
if hasattr(self, "reticulum") and self.reticulum
else None,
"q": self.reticulum.get_packet_q(lxmf_message.hash)
if hasattr(self, "reticulum") and self.reticulum
else None,
}
ctx.database.telemetry.upsert_telemetry(
destination_hash=source_hash,
timestamp=timestamp,
data=telemetry_data,
received_from=ctx.local_lxmf_destination.hexhash,
physical_link=physical_link,
)
# broadcast telemetry update via websocket
AsyncUtils.run_async(
self.websocket_broadcast(
json.dumps(
{
"type": "lxmf.telemetry",
"destination_hash": source_hash,
"timestamp": timestamp,
"telemetry": unpacked,
"physical_link": physical_link,
"is_tracking": ctx.database.telemetry.is_tracking(
source_hash,
),
},
),
),
)
except Exception as e:
print(f"Error processing incoming telemetry: {e}")
def handle_telemetry_request(self, to_addr_hash: str):
# get our location from config
lat = self.database.config.get("map_default_lat")
@@ -10584,15 +11034,13 @@ class ReticulumMeshChat:
telemetry_data = Telemeter.pack(location=location)
# send as an LXMF message with no content, only telemetry field
# use no_display=True to avoid showing in chat UI
AsyncUtils.run_async(
self.send_message(
destination_hash=to_addr_hash,
content="",
telemetry_data=telemetry_data,
delivery_method="opportunistic",
no_display=True,
no_display=False,
),
)
except Exception as e:
@@ -10672,7 +11120,7 @@ class ReticulumMeshChat:
f" ({display_name})"
if (
display_name := parse_lxmf_display_name(
base64.b64encode(app_data).decode() if app_data else None,
app_data,
None,
)
)

View File

@@ -245,6 +245,9 @@ class ConfigManager:
"https://nominatim.openstreetmap.org",
)
# telemetry config
self.telemetry_enabled = self.BoolConfig(self, "telemetry_enabled", False)
# translator config
self.translator_enabled = self.BoolConfig(self, "translator_enabled", False)
self.libretranslate_url = self.StringConfig(
@@ -253,6 +256,12 @@ class ConfigManager:
"http://localhost:5000",
)
# location config
self.location_source = self.StringConfig(self, "location_source", "browser")
self.location_manual_lat = self.StringConfig(self, "location_manual_lat", "0.0")
self.location_manual_lon = self.StringConfig(self, "location_manual_lon", "0.0")
self.location_manual_alt = self.StringConfig(self, "location_manual_alt", "0.0")
# banishment config
self.banished_effect_enabled = self.BoolConfig(
self,
@@ -279,6 +288,17 @@ class ConfigManager:
True,
)
# csp config so users can set extra CSP sources for local offgrid environments (tile servers, etc.)
self.csp_extra_connect_src = self.StringConfig(
self,
"csp_extra_connect_src",
"",
)
self.csp_extra_img_src = self.StringConfig(self, "csp_extra_img_src", "")
self.csp_extra_frame_src = self.StringConfig(self, "csp_extra_frame_src", "")
self.csp_extra_script_src = self.StringConfig(self, "csp_extra_script_src", "")
self.csp_extra_style_src = self.StringConfig(self, "csp_extra_style_src", "")
def get(self, key: str, default_value=None) -> str | None:
return self.db.config.get(key, default_value)

View File

@@ -13,17 +13,19 @@ class ContactsDAO:
lxst_address=None,
preferred_ringtone_id=None,
custom_image=None,
is_telemetry_trusted=0,
):
self.provider.execute(
"""
INSERT INTO contacts (name, remote_identity_hash, lxmf_address, lxst_address, preferred_ringtone_id, custom_image)
VALUES (?, ?, ?, ?, ?, ?)
INSERT INTO contacts (name, remote_identity_hash, lxmf_address, lxst_address, preferred_ringtone_id, custom_image, is_telemetry_trusted)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(remote_identity_hash) DO UPDATE SET
name = EXCLUDED.name,
lxmf_address = COALESCE(EXCLUDED.lxmf_address, contacts.lxmf_address),
lxst_address = COALESCE(EXCLUDED.lxst_address, contacts.lxst_address),
preferred_ringtone_id = EXCLUDED.preferred_ringtone_id,
custom_image = EXCLUDED.custom_image,
is_telemetry_trusted = EXCLUDED.is_telemetry_trusted,
updated_at = CURRENT_TIMESTAMP
""",
(
@@ -33,6 +35,7 @@ class ContactsDAO:
lxst_address,
preferred_ringtone_id,
custom_image,
is_telemetry_trusted,
),
)
@@ -74,6 +77,7 @@ class ContactsDAO:
preferred_ringtone_id=None,
custom_image=None,
clear_image=False,
is_telemetry_trusted=None,
):
updates = []
params = []
@@ -93,6 +97,9 @@ class ContactsDAO:
if preferred_ringtone_id is not None:
updates.append("preferred_ringtone_id = ?")
params.append(preferred_ringtone_id)
if is_telemetry_trusted is not None:
updates.append("is_telemetry_trusted = ?")
params.append(1 if is_telemetry_trusted else 0)
if clear_image:
updates.append("custom_image = NULL")
elif custom_image is not None:

View File

@@ -2,7 +2,7 @@ from .provider import DatabaseProvider
class DatabaseSchema:
LATEST_VERSION = 36
LATEST_VERSION = 37
def __init__(self, provider: DatabaseProvider):
self.provider = provider
@@ -348,6 +348,17 @@ class DatabaseSchema:
UNIQUE(destination_hash, timestamp)
)
""",
"telemetry_tracking": """
CREATE TABLE IF NOT EXISTS telemetry_tracking (
id INTEGER PRIMARY KEY AUTOINCREMENT,
destination_hash TEXT UNIQUE,
is_tracking INTEGER DEFAULT 1,
interval_seconds INTEGER DEFAULT 60,
last_request_at REAL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""",
"ringtones": """
CREATE TABLE IF NOT EXISTS ringtones (
id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -977,6 +988,17 @@ class DatabaseSchema:
"CREATE INDEX IF NOT EXISTS idx_lxmf_conversation_folders_folder_id ON lxmf_conversation_folders(folder_id)",
)
if current_version < 37:
# Add is_telemetry_trusted to contacts
self._safe_execute(
"ALTER TABLE contacts ADD COLUMN is_telemetry_trusted INTEGER DEFAULT 0",
)
# Ensure telemetry_enabled exists in config and is false by default
self._safe_execute(
"INSERT OR IGNORE INTO config (key, value) VALUES (?, ?)",
("telemetry_enabled", "false"),
)
# Update version in config
self._safe_execute(
"""

View File

@@ -65,3 +65,42 @@ class TelemetryDAO:
"DELETE FROM lxmf_telemetry WHERE destination_hash = ?",
(destination_hash,),
)
def is_tracking(self, destination_hash):
row = self.provider.fetchone(
"SELECT is_tracking FROM telemetry_tracking WHERE destination_hash = ?",
(destination_hash,),
)
return bool(row["is_tracking"]) if row else False
def toggle_tracking(self, destination_hash, is_tracking=None):
if is_tracking is None:
is_tracking = not self.is_tracking(destination_hash)
now = datetime.now(UTC).isoformat()
self.provider.execute(
"""
INSERT INTO telemetry_tracking (destination_hash, is_tracking, updated_at)
VALUES (?, ?, ?)
ON CONFLICT(destination_hash) DO UPDATE SET
is_tracking = EXCLUDED.is_tracking,
updated_at = EXCLUDED.updated_at
""",
(destination_hash, int(is_tracking), now),
)
return is_tracking
def get_tracked_peers(self):
return self.provider.fetchall(
"SELECT * FROM telemetry_tracking WHERE is_tracking = 1",
)
def update_last_request_at(self, destination_hash, timestamp=None):
if timestamp is None:
import time
timestamp = time.time()
self.provider.execute(
"UPDATE telemetry_tracking SET last_request_at = ? WHERE destination_hash = ?",
(timestamp, destination_hash),
)

View File

@@ -21,6 +21,7 @@ from meshchatx.src.backend.nomadnet_utils import NomadNetworkManager
from meshchatx.src.backend.ringtone_manager import RingtoneManager
from meshchatx.src.backend.rncp_handler import RNCPHandler
from meshchatx.src.backend.rnpath_handler import RNPathHandler
from meshchatx.src.backend.rnpath_trace_handler import RNPathTraceHandler
from meshchatx.src.backend.rnprobe_handler import RNProbeHandler
from meshchatx.src.backend.rnstatus_handler import RNStatusHandler
from meshchatx.src.backend.telephone_manager import TelephoneManager
@@ -72,6 +73,8 @@ class IdentityContext:
self.ringtone_manager = None
self.rncp_handler = None
self.rnstatus_handler = None
self.rnpath_handler = None
self.rnpath_trace_handler = None
self.rnprobe_handler = None
self.translator_handler = None
self.bot_handler = None
@@ -228,6 +231,10 @@ class IdentityContext:
self.rnpath_handler = RNPathHandler(
reticulum_instance=getattr(self.app, "reticulum", None),
)
self.rnpath_trace_handler = RNPathTraceHandler(
reticulum_instance=getattr(self.app, "reticulum", None),
identity=self.identity,
)
self.rnprobe_handler = RNProbeHandler(
reticulum_instance=getattr(self.app, "reticulum", None),
identity=self.identity,
@@ -360,6 +367,14 @@ class IdentityContext:
thread.daemon = True
thread.start()
# start background thread for telemetry tracking loop
thread = threading.Thread(
target=asyncio.run,
args=(self.app.telemetry_tracking_loop(self.session_id, context=self),),
)
thread.daemon = True
thread.start()
def register_announce_handlers(self):
handlers = [
AnnounceHandler(
@@ -468,6 +483,11 @@ class IdentityContext:
self.message_router.jobs = lambda: None
if hasattr(self.message_router, "exit_handler"):
self.message_router.exit_handler()
# Give LXMF/RNS a moment to finish any final disk writes
import time
time.sleep(1.0)
except Exception as e:
print(
f"Error while tearing down LXMRouter for {self.identity_hash}: {e}",

View File

@@ -1,3 +1,4 @@
import fnmatch
import hashlib
import json
import os
@@ -8,6 +9,21 @@ from pathlib import Path
class IntegrityManager:
"""Manages the integrity of the database and identity files at rest."""
# Files and directories that are frequently modified by RNS/LXMF or SQLite
# and should be ignored during integrity checks.
IGNORED_PATTERNS = [
"*-wal",
"*-shm",
"*-journal",
"*.tmp",
"*.lock",
"*.log",
"*~",
".DS_Store",
"Thumbs.db",
"integrity-manifest.json",
]
def __init__(self, storage_dir, database_path, identity_hash=None):
self.storage_dir = Path(storage_dir)
self.database_path = Path(database_path)
@@ -15,6 +31,35 @@ class IntegrityManager:
self.manifest_path = self.storage_dir / "integrity-manifest.json"
self.issues = []
def _should_ignore(self, rel_path):
"""Determine if a file path should be ignored based on name or directory."""
path = Path(rel_path)
path_parts = path.parts
# Check for volatile LXMF/RNS directories
# We only ignore these if they are inside the lxmf_router directory
# to avoid accidentally ignoring important files with similar names.
if "lxmf_router" in path_parts:
if any(
part in ["announces", "storage", "identities"] for part in path_parts
):
return True
# Check for other generally ignored directories
if any(
part in ["tmp", "recordings", "greetings", "docs", "bots", "ringtones"]
for part in path_parts
):
return True
filename = path_parts[-1]
# Check against IGNORED_PATTERNS
if any(fnmatch.fnmatch(filename, pattern) for pattern in self.IGNORED_PATTERNS):
return True
return False
def _hash_file(self, file_path):
if not os.path.exists(file_path):
return None
@@ -34,37 +79,47 @@ class IntegrityManager:
manifest = json.load(f)
issues = []
manifest_files = manifest.get("files", {})
# Check Database
db_rel = str(self.database_path.relative_to(self.storage_dir))
actual_db_hash = self._hash_file(self.database_path)
if actual_db_hash and actual_db_hash != manifest.get("files", {}).get(
db_rel,
):
issues.append(f"Database modified: {db_rel}")
if self.database_path.exists():
db_rel = str(self.database_path.relative_to(self.storage_dir))
actual_db_hash = self._hash_file(self.database_path)
if actual_db_hash and actual_db_hash != manifest_files.get(db_rel):
issues.append(f"Database modified: {db_rel}")
# Check Identities and other critical files in storage_dir
# Check other critical files in storage_dir
for root, _, files_in_dir in os.walk(self.storage_dir):
for file in files_in_dir:
full_path = Path(root) / file
# Skip the manifest itself and temporary sqlite files
if (
file == "integrity-manifest.json"
or file.endswith("-wal")
or file.endswith("-shm")
):
rel_path = str(full_path.relative_to(self.storage_dir))
if self._should_ignore(rel_path):
continue
# Database already checked separately, skip here to avoid double reporting
if full_path == self.database_path:
continue
rel_path = str(full_path.relative_to(self.storage_dir))
actual_hash = self._hash_file(full_path)
if rel_path in manifest.get("files", {}):
if actual_hash != manifest["files"][rel_path]:
if rel_path in manifest_files:
if actual_hash != manifest_files[rel_path]:
issues.append(f"File modified: {rel_path}")
else:
# New files are also a concern for integrity
# but we only report them if they are not in ignored dirs/patterns
issues.append(f"New file detected: {rel_path}")
# Check for missing files that were in manifest
for rel_path in manifest_files:
if self._should_ignore(rel_path):
continue
full_path = self.storage_dir / rel_path
if not full_path.exists():
issues.append(f"File missing: {rel_path}")
if issues:
m_date = manifest.get("date", "Unknown")
m_time = manifest.get("time", "Unknown")
@@ -85,6 +140,9 @@ class IntegrityManager:
self.issues = issues
return len(issues) == 0, issues
except Exception as e:
import traceback
traceback.print_exc()
return False, [f"Integrity check failed: {e!s}"]
def save_manifest(self):
@@ -96,15 +154,11 @@ class IntegrityManager:
for root, _, files_in_dir in os.walk(self.storage_dir):
for file in files_in_dir:
full_path = Path(root) / file
# Skip the manifest itself and temporary sqlite files
if (
file == "integrity-manifest.json"
or file.endswith("-wal")
or file.endswith("-shm")
):
rel_path = str(full_path.relative_to(self.storage_dir))
if self._should_ignore(rel_path):
continue
rel_path = str(full_path.relative_to(self.storage_dir))
files[rel_path] = self._hash_file(full_path)
now = datetime.now(UTC)

View File

@@ -23,15 +23,17 @@ def convert_lxmf_message_to_dict(
file_attachments = []
for file_attachment in value:
file_name = file_attachment[0]
file_data = file_attachment[1]
file_bytes = None
if include_attachments:
file_bytes = base64.b64encode(file_attachment[1]).decode(
file_bytes = base64.b64encode(file_data).decode(
"utf-8",
)
file_attachments.append(
{
"file_name": file_name,
"file_size": len(file_data),
"file_bytes": file_bytes,
},
)
@@ -42,24 +44,28 @@ def convert_lxmf_message_to_dict(
# handle image field
if field_type == LXMF.FIELD_IMAGE:
image_type = value[0]
image_data = value[1]
image_bytes = None
if include_attachments:
image_bytes = base64.b64encode(value[1]).decode("utf-8")
image_bytes = base64.b64encode(image_data).decode("utf-8")
fields["image"] = {
"image_type": image_type,
"image_size": len(image_data),
"image_bytes": image_bytes,
}
# handle audio field
if field_type == LXMF.FIELD_AUDIO:
audio_mode = value[0]
audio_data = value[1]
audio_bytes = None
if include_attachments:
audio_bytes = base64.b64encode(value[1]).decode("utf-8")
audio_bytes = base64.b64encode(audio_data).decode("utf-8")
fields["audio"] = {
"audio_mode": audio_mode,
"audio_size": len(audio_data),
"audio_bytes": audio_bytes,
}
@@ -67,6 +73,59 @@ def convert_lxmf_message_to_dict(
if field_type == LXMF.FIELD_TELEMETRY:
fields["telemetry"] = Telemeter.from_packed(value)
# handle commands field
if field_type == LXMF.FIELD_COMMANDS or field_type == 0x01:
# value is usually a list of dicts, or a single dict
if isinstance(value, dict):
# convert dict keys back to ints if they look like hex or int strings
new_cmd = {}
for k, v in value.items():
try:
ki = None
if isinstance(k, int):
ki = k
elif isinstance(k, str):
if k.startswith("0x"):
ki = int(k, 16)
else:
ki = int(k)
if ki is not None:
new_cmd[f"0x{ki:02x}"] = v
else:
new_cmd[str(k)] = v
except (ValueError, TypeError):
new_cmd[str(k)] = v
fields["commands"] = [new_cmd]
elif isinstance(value, list):
processed_commands = []
for cmd in value:
if isinstance(cmd, dict):
new_cmd = {}
for k, v in cmd.items():
try:
ki = None
if isinstance(k, int):
ki = k
elif isinstance(k, str):
if k.startswith("0x"):
ki = int(k, 16)
else:
ki = int(k)
if ki is not None:
new_cmd[f"0x{ki:02x}"] = v
else:
new_cmd[str(k)] = v
except (ValueError, TypeError):
new_cmd[str(k)] = v
processed_commands.append(new_cmd)
else:
processed_commands.append(cmd)
fields["commands"] = processed_commands
else:
fields["commands"] = value
# convert 0.0-1.0 progress to 0.00-100 percentage
progress_percentage = round(lxmf_message.progress * 100, 2)
@@ -164,13 +223,44 @@ def convert_db_lxmf_message_to_dict(
if not isinstance(fields, dict):
fields = {}
# normalize commands if present
if "commands" in fields:
cmds = fields["commands"]
if isinstance(cmds, list):
new_cmds = []
for cmd in cmds:
if isinstance(cmd, dict):
new_cmd = {}
for k, v in cmd.items():
# normalize key to 0xXX format if it's a number string
try:
ki = None
if isinstance(k, int):
ki = k
elif isinstance(k, str):
if k.startswith("0x"):
ki = int(k, 16)
else:
ki = int(k)
if ki is not None:
new_cmd[f"0x{ki:02x}"] = v
else:
new_cmd[str(k)] = v
except (ValueError, TypeError):
new_cmd[str(k)] = v
new_cmds.append(new_cmd)
else:
new_cmds.append(cmd)
fields["commands"] = new_cmds
# strip attachments if requested
if not include_attachments:
if "image" in fields:
# keep type but strip bytes
image_size = 0
image_size = fields["image"].get("image_size") or 0
b64_bytes = fields["image"].get("image_bytes")
if b64_bytes:
if not image_size and b64_bytes:
# Optimized size calculation without full decoding
image_size = (len(b64_bytes) * 3) // 4
if b64_bytes.endswith("=="):
@@ -184,9 +274,9 @@ def convert_db_lxmf_message_to_dict(
}
if "audio" in fields:
# keep mode but strip bytes
audio_size = 0
audio_size = fields["audio"].get("audio_size") or 0
b64_bytes = fields["audio"].get("audio_bytes")
if b64_bytes:
if not audio_size and b64_bytes:
audio_size = (len(b64_bytes) * 3) // 4
if b64_bytes.endswith("=="):
audio_size -= 2
@@ -200,9 +290,9 @@ def convert_db_lxmf_message_to_dict(
if "file_attachments" in fields:
# keep file names but strip bytes
for i in range(len(fields["file_attachments"])):
file_size = 0
file_size = fields["file_attachments"][i].get("file_size") or 0
b64_bytes = fields["file_attachments"][i].get("file_bytes")
if b64_bytes:
if not file_size and b64_bytes:
file_size = (len(b64_bytes) * 3) // 4
if b64_bytes.endswith("=="):
file_size -= 2

View File

@@ -116,29 +116,59 @@ def convert_db_favourite_to_dict(favourite):
def parse_lxmf_display_name(
app_data_base64: str | None,
app_data_base64: str | bytes | None,
default_value: str | None = "Anonymous Peer",
):
if app_data_base64 is None:
return default_value
try:
app_data_bytes = base64.b64decode(app_data_base64)
display_name = LXMF.display_name_from_app_data(app_data_bytes)
if display_name is not None:
return display_name
if isinstance(app_data_base64, bytes):
app_data_bytes = app_data_base64
else:
app_data_bytes = base64.b64decode(app_data_base64)
# Try using the library first
try:
display_name = LXMF.display_name_from_app_data(app_data_bytes)
if display_name is not None:
return display_name
except (AttributeError, Exception):
# Handle cases where library might fail or has the 'str' object has no attribute 'decode' bug
pass
# Fallback manual parsing if library failed or returned None
if len(app_data_bytes) > 0:
# Version 0.5.0+ announce format (msgpack list)
if (
app_data_bytes[0] >= 0x90 and app_data_bytes[0] <= 0x9F
) or app_data_bytes[0] == 0xDC:
try:
peer_data = msgpack.unpackb(app_data_bytes)
if isinstance(peer_data, list) and len(peer_data) >= 1:
dn = peer_data[0]
if dn is not None:
if isinstance(dn, bytes):
return dn.decode("utf-8")
return str(dn)
except Exception:
pass
except Exception as e:
print(f"Failed to parse LXMF display name: {e}")
return default_value
def parse_lxmf_stamp_cost(app_data_base64: str | None):
def parse_lxmf_stamp_cost(app_data_base64: str | bytes | None):
if app_data_base64 is None:
return None
try:
app_data_bytes = base64.b64decode(app_data_base64)
if isinstance(app_data_base64, bytes):
app_data_bytes = app_data_base64
else:
app_data_bytes = base64.b64decode(app_data_base64)
return LXMF.stamp_cost_from_app_data(app_data_bytes)
except Exception as e:
print(f"Failed to parse LXMF stamp cost: {e}")
@@ -146,26 +176,34 @@ def parse_lxmf_stamp_cost(app_data_base64: str | None):
def parse_nomadnetwork_node_display_name(
app_data_base64: str | None,
app_data_base64: str | bytes | None,
default_value: str | None = "Anonymous Node",
):
if app_data_base64 is None:
return default_value
try:
app_data_bytes = base64.b64decode(app_data_base64)
if isinstance(app_data_base64, bytes):
app_data_bytes = app_data_base64
else:
app_data_bytes = base64.b64decode(app_data_base64)
return app_data_bytes.decode("utf-8")
except Exception as e:
print(f"Failed to parse NomadNetwork display name: {e}")
return default_value
def parse_lxmf_propagation_node_app_data(app_data_base64: str | None):
def parse_lxmf_propagation_node_app_data(app_data_base64: str | bytes | None):
if app_data_base64 is None:
return None
try:
app_data_bytes = base64.b64decode(app_data_base64)
if isinstance(app_data_base64, bytes):
app_data_bytes = app_data_base64
else:
app_data_bytes = base64.b64decode(app_data_base64)
data = msgpack.unpackb(app_data_bytes)
if not isinstance(data, list) or len(data) < 4:

View File

@@ -0,0 +1,100 @@
import asyncio
import time
import traceback
import RNS
class RNPathTraceHandler:
def __init__(self, reticulum_instance, identity):
self.reticulum = reticulum_instance
self.identity = identity
async def trace_path(self, destination_hash_str):
try:
try:
destination_hash = bytes.fromhex(destination_hash_str)
except Exception:
return {"error": "Invalid destination hash"}
# Request path if we don't have it
if not RNS.Transport.has_path(destination_hash):
RNS.Transport.request_path(destination_hash)
timeout = 10
start_time = time.time()
while (
not RNS.Transport.has_path(destination_hash)
and time.time() - start_time < timeout
):
await asyncio.sleep(0.2)
if not RNS.Transport.has_path(destination_hash):
return {"error": "Path not found after timeout"}
hops = RNS.Transport.hops_to(destination_hash)
next_hop_bytes = None
next_hop_interface = None
if self.reticulum:
try:
next_hop_bytes = self.reticulum.get_next_hop(destination_hash)
next_hop_interface = self.reticulum.get_next_hop_if_name(
destination_hash,
)
except Exception as e:
print(f"Error calling reticulum methods: {e}")
path = []
# Me
local_hash = "unknown"
if self.identity and hasattr(self.identity, "hash"):
local_hash = self.identity.hash.hex()
elif (
self.reticulum
and hasattr(self.reticulum, "identity")
and self.reticulum.identity
):
local_hash = self.reticulum.identity.hash.hex()
path.append({"type": "local", "hash": local_hash, "name": "Local Node"})
if hops == 1:
# Direct
path.append(
{
"type": "destination",
"hash": destination_hash_str,
"hops": 1,
"interface": next_hop_interface,
},
)
elif hops > 1:
# Next hop
path.append(
{
"type": "hop",
"hash": next_hop_bytes.hex() if next_hop_bytes else None,
"name": "Next Hop",
"interface": next_hop_interface,
"hop_number": 1,
},
)
# Intermediate unknown hops
if hops > 2:
path.append({"type": "unknown", "count": hops - 2})
# Destination
path.append(
{"type": "destination", "hash": destination_hash_str, "hops": hops},
)
return {
"destination": destination_hash_str,
"hops": hops,
"path": path,
"interface": next_hop_interface,
"next_hop": next_hop_bytes.hex() if next_hop_bytes else None,
}
except Exception as e:
return {"error": f"Trace failed: {e}\n{traceback.format_exc()}"}

View File

@@ -70,7 +70,7 @@ class Telemeter:
struct.pack("!I", int(round(speed, 2) * 1e2)),
struct.pack("!i", int(round(bearing, 2) * 1e2)),
struct.pack("!H", int(round(accuracy, 2) * 1e2)),
int(last_update or time.time()),
int(last_update) if last_update is not None else int(time.time()),
]
except Exception:
return None
@@ -84,15 +84,33 @@ class Telemeter:
res["time"] = {"utc": p[Sensor.SID_TIME]}
if Sensor.SID_LOCATION in p:
res["location"] = Telemeter.unpack_location(p[Sensor.SID_LOCATION])
if Sensor.SID_PHYSICAL_LINK in p:
pl = p[Sensor.SID_PHYSICAL_LINK]
if isinstance(pl, (list, tuple)) and len(pl) >= 3:
res["physical_link"] = {"rssi": pl[0], "snr": pl[1], "q": pl[2]}
if Sensor.SID_BATTERY in p:
b = p[Sensor.SID_BATTERY]
if isinstance(b, (list, tuple)) and len(b) >= 2:
res["battery"] = {"charge_percent": b[0], "charging": b[1]}
# Add other sensors as needed
return res
except Exception:
return None
@staticmethod
def pack(time_utc=None, location=None):
def pack(time_utc=None, location=None, battery=None, physical_link=None):
p = {}
p[Sensor.SID_TIME] = int(time_utc or time.time())
if location:
p[Sensor.SID_LOCATION] = Telemeter.pack_location(**location)
if battery:
# battery should be [charge_percent, charging]
p[Sensor.SID_BATTERY] = [battery["charge_percent"], battery["charging"]]
if physical_link:
# physical_link should be [rssi, snr, q]
p[Sensor.SID_PHYSICAL_LINK] = [
physical_link["rssi"],
physical_link["snr"],
physical_link["q"],
]
return umsgpack.packb(p)