From 0889cd0a049c6706bf41db77434579dee930d13c Mon Sep 17 00:00:00 2001 From: Sudo-Ivan Date: Thu, 1 Jan 2026 17:34:42 -0600 Subject: [PATCH] feat(telemetry): add Sensor and Telemeter classes for telemetry data handling and location packing/unpacking --- meshchatx/src/backend/telemetry_utils.py | 88 ++++++++++++++++++++++++ 1 file changed, 88 insertions(+) create mode 100644 meshchatx/src/backend/telemetry_utils.py diff --git a/meshchatx/src/backend/telemetry_utils.py b/meshchatx/src/backend/telemetry_utils.py new file mode 100644 index 0000000..b7c18a0 --- /dev/null +++ b/meshchatx/src/backend/telemetry_utils.py @@ -0,0 +1,88 @@ +import struct +import time +import RNS.vendor.umsgpack as umsgpack + +class Sensor: + SID_NONE = 0x00 + SID_TIME = 0x01 + SID_LOCATION = 0x02 + SID_PRESSURE = 0x03 + SID_BATTERY = 0x04 + SID_PHYSICAL_LINK = 0x05 + SID_ACCELERATION = 0x06 + SID_TEMPERATURE = 0x07 + SID_HUMIDITY = 0x08 + SID_MAGNETIC_FIELD = 0x09 + SID_AMBIENT_LIGHT = 0x0A + SID_GRAVITY = 0x0B + SID_ANGULAR_VELOCITY = 0x0C + SID_PROXIMITY = 0x0E + SID_INFORMATION = 0x0F + SID_RECEIVED = 0x10 + SID_POWER_CONSUMPTION = 0x11 + SID_POWER_PRODUCTION = 0x12 + SID_PROCESSOR = 0x13 + SID_RAM = 0x14 + SID_NVM = 0x15 + SID_TANK = 0x16 + SID_FUEL = 0x17 + SID_RNS_TRANSPORT = 0x19 + SID_LXMF_PROPAGATION = 0x18 + SID_CONNECTION_MAP = 0x1A + SID_CUSTOM = 0xff + +class Telemeter: + @staticmethod + def unpack_location(packed): + try: + if packed is None: + return None + return { + "latitude": struct.unpack("!i", packed[0])[0]/1e6, + "longitude": struct.unpack("!i", packed[1])[0]/1e6, + "altitude": struct.unpack("!i", packed[2])[0]/1e2, + "speed": struct.unpack("!I", packed[3])[0]/1e2, + "bearing": struct.unpack("!i", packed[4])[0]/1e2, + "accuracy": struct.unpack("!H", packed[5])[0]/1e2, + "last_update": packed[6], + } + except Exception: + return None + + @staticmethod + def pack_location(latitude, longitude, altitude=0, speed=0, bearing=0, accuracy=0, last_update=None): + try: + return [ + struct.pack("!i", int(round(latitude, 6)*1e6)), + struct.pack("!i", int(round(longitude, 6)*1e6)), + struct.pack("!i", int(round(altitude, 2)*1e2)), + struct.pack("!I", int(round(speed, 2)*1e2)), + struct.pack("!i", int(round(bearing, 2)*1e2)), + struct.pack("!H", int(round(accuracy, 2)*1e2)), + int(last_update or time.time()), + ] + except Exception: + return None + + @staticmethod + def from_packed(packed): + try: + p = umsgpack.unpackb(packed) + res = {} + if Sensor.SID_TIME in p: + res["time"] = {"utc": p[Sensor.SID_TIME]} + if Sensor.SID_LOCATION in p: + res["location"] = Telemeter.unpack_location(p[Sensor.SID_LOCATION]) + # Add other sensors as needed + return res + except Exception: + return None + + @staticmethod + def pack(time_utc=None, location=None): + p = {} + p[Sensor.SID_TIME] = int(time_utc or time.time()) + if location: + p[Sensor.SID_LOCATION] = Telemeter.pack_location(**location) + return umsgpack.packb(p) +