From 9d8611bb97df55bcb1fb38bc6de17e9f7846ffa2 Mon Sep 17 00:00:00 2001
From: Sudo-Ivan
Date: Sun, 4 Jan 2026 23:18:55 -0600
Subject: [PATCH] feat(audio): implement WebAudioBridge for websocket audio transport and add configuration options

---
 meshchatx/meshchat.py                     |  90 +++++++++
 meshchatx/src/backend/config_manager.py   |  10 +
 meshchatx/src/backend/web_audio_bridge.py | 236 ++++++++++++++++++++++
 3 files changed, 336 insertions(+)
 create mode 100644 meshchatx/src/backend/web_audio_bridge.py

diff --git a/meshchatx/meshchat.py b/meshchatx/meshchat.py
index 2aa6270..99eff2a 100644
--- a/meshchatx/meshchat.py
+++ b/meshchatx/meshchat.py
@@ -85,6 +85,7 @@ from meshchatx.src.backend.recovery import CrashRecovery
 from meshchatx.src.backend.rnprobe_handler import RNProbeHandler
 from meshchatx.src.backend.sideband_commands import SidebandCommands
 from meshchatx.src.backend.telemetry_utils import Telemeter
+from meshchatx.src.backend.web_audio_bridge import WebAudioBridge
 from meshchatx.src.version import __version__ as app_version
 
 import logging
@@ -233,6 +234,7 @@ class ReticulumMeshChat:
         self.current_context: IdentityContext | None = None
 
         self.setup_identity(identity)
+        self.web_audio_bridge = WebAudioBridge(None, None)
 
     # Proxy properties for backward compatibility
     @property
@@ -497,6 +499,10 @@ class ReticulumMeshChat:
             self.current_context = self.contexts[identity_hash]
             if not self.current_context.running:
                 self.current_context.setup()
+            self.web_audio_bridge = WebAudioBridge(
+                self.current_context.telephone_manager,
+                self.current_context.config,
+            )
             return
 
         # Initialize Reticulum if not already done
@@ -508,6 +514,9 @@ class ReticulumMeshChat:
         self.contexts[identity_hash] = context
         self.current_context = context
         context.setup()
+        self.web_audio_bridge = WebAudioBridge(
+            context.telephone_manager, context.config
+        )
 
         # Link database to memory log handler
         memory_log_handler.set_database(context.database)
@@ -1745,6 +1754,10 @@ class ReticulumMeshChat:
         print(
             f"on_telephone_call_ended: {caller_identity.hash.hex() if caller_identity else 'Unknown'}",
         )
+        try:
+            self.web_audio_bridge.on_call_ended()
+        except Exception:
+            pass
 
         # Record call history
         if caller_identity:
@@ -3142,6 +3155,51 @@ class ReticulumMeshChat:
 
             return websocket_response
 
+        @routes.get("/ws/telephone/audio")
+        async def telephone_audio_ws(request):
+            websocket_response = web.WebSocketResponse(
+                max_msg_size=5 * 1024 * 1024,
+            )
+            await websocket_response.prepare(request)
+
+            # Early guard on config
+            if not self.web_audio_bridge.config_enabled():
+                await websocket_response.send_str(
+                    json.dumps(
+                        {"type": "error", "message": "Web audio is disabled in config"},
+                    ),
+                )
+            else:
+                await self.web_audio_bridge.send_status(websocket_response)
+                attached = self.web_audio_bridge.attach_client(websocket_response)
+                if not attached:
+                    await websocket_response.send_str(
+                        json.dumps(
+                            {"type": "error", "message": "No active call to attach"},
+                        ),
+                    )
+
+            async for msg in websocket_response:
+                msg: WSMessage = msg
+                if msg.type == WSMsgType.BINARY:
+                    self.web_audio_bridge.push_client_frame(msg.data)
+                elif msg.type == WSMsgType.TEXT:
+                    try:
+                        data = json.loads(msg.data)
+                        if data.get("type") == "attach":
+                            self.web_audio_bridge.attach_client(websocket_response)
+                        elif data.get("type") == "ping":
+                            await websocket_response.send_str(
+                                json.dumps({"type": "pong"})
+                            )
+                    except Exception:
+                        pass
+                elif msg.type == WSMsgType.ERROR:
+                    print(f"telephone audio ws error {websocket_response.exception()}")
+
+            self.web_audio_bridge.detach_client(websocket_response)
+            return websocket_response
+
         # get app info
         @routes.get("/api/v1/app/info")
         async def app_info(request):
@@ -3990,6 +4048,26 @@ class ReticulumMeshChat:
                     "initiation_status": self.telephone_manager.initiation_status,
                     "initiation_target_hash": initiation_target_hash,
                     "initiation_target_name": initiation_target_name,
+                    "web_audio": {
+                        "enabled": getattr(
+                            self.config.telephone_web_audio_enabled,
+                            "get",
+                            lambda: False,
+                        )(),
+                        "allow_fallback": getattr(
+                            self.config.telephone_web_audio_allow_fallback,
+                            "get",
+                            lambda: True,
+                        )(),
+                        "has_client": bool(
+                            getattr(self.web_audio_bridge, "clients", []),
+                        ),
+                        "frame_ms": getattr(
+                            self.telephone_manager.telephone,
+                            "target_frame_time_ms",
+                            None,
+                        ),
+                    },
                 },
             )
 
@@ -7921,6 +7999,16 @@ class ReticulumMeshChat:
                 profile_id,
             )
 
+            if "telephone_web_audio_enabled" in data:
+                self.config.telephone_web_audio_enabled.set(
+                    self._parse_bool(data["telephone_web_audio_enabled"]),
+                )
+
+            if "telephone_web_audio_allow_fallback" in data:
+                self.config.telephone_web_audio_allow_fallback.set(
+                    self._parse_bool(data["telephone_web_audio_allow_fallback"]),
+                )
+
             if "translator_enabled" in data:
                 value = self._parse_bool(data["translator_enabled"])
                 self.config.translator_enabled.set(value)
@@ -8728,6 +8816,8 @@ class ReticulumMeshChat:
            "do_not_disturb_enabled": ctx.config.do_not_disturb_enabled.get(),
            "telephone_allow_calls_from_contacts_only": ctx.config.telephone_allow_calls_from_contacts_only.get(),
            "telephone_audio_profile_id": ctx.config.telephone_audio_profile_id.get(),
+           "telephone_web_audio_enabled": ctx.config.telephone_web_audio_enabled.get(),
+           "telephone_web_audio_allow_fallback": ctx.config.telephone_web_audio_allow_fallback.get(),
            "call_recording_enabled": ctx.config.call_recording_enabled.get(),
            "banished_effect_enabled": ctx.config.banished_effect_enabled.get(),
            "banished_text": ctx.config.banished_text.get(),
diff --git a/meshchatx/src/backend/config_manager.py b/meshchatx/src/backend/config_manager.py
index c86d249..a623e57 100644
--- a/meshchatx/src/backend/config_manager.py
+++ b/meshchatx/src/backend/config_manager.py
@@ -189,6 +189,16 @@ class ConfigManager:
             "telephone_audio_profile_id",
             2,  # Default to Voice (profile 2)
         )
+        self.telephone_web_audio_enabled = self.BoolConfig(
+            self,
+            "telephone_web_audio_enabled",
+            False,
+        )
+        self.telephone_web_audio_allow_fallback = self.BoolConfig(
+            self,
+            "telephone_web_audio_allow_fallback",
+            True,
+        )
         self.call_recording_enabled = self.BoolConfig(
             self,
             "call_recording_enabled",
diff --git a/meshchatx/src/backend/web_audio_bridge.py b/meshchatx/src/backend/web_audio_bridge.py
new file mode 100644
index 0000000..0aa5a19
--- /dev/null
+++ b/meshchatx/src/backend/web_audio_bridge.py
@@ -0,0 +1,236 @@
+import asyncio
+import json
+import threading
+from typing import Optional
+
+import numpy as np
+import RNS
+from LXST.Codecs import Null, Raw
+from LXST.Mixer import Mixer
+from LXST.Pipeline import Pipeline
+from LXST.Sinks import LocalSink
+from LXST.Sources import LocalSource
+
+from .telephone_manager import Tee
+
+
+def _log_debug(msg: str):
+    RNS.log(msg, RNS.LOG_DEBUG)
+
+
+class WebAudioSource(LocalSource):
+    """Injects PCM frames (int16 little-endian) received over websocket into the transmit mixer."""
+
+    def __init__(self, target_frame_ms: int, sink: Mixer):
+        self.target_frame_ms = target_frame_ms or 60
+        self.sink = sink
+        self.codec = Raw(channels=1, bitdepth=16)
+        self.channels = 1
+        self.samplerate = 48000
+        self.bitdepth = 16
+
+    def start(self):
+        # Nothing to start; frames are pushed from the websocket thread.
+        pass
+
+    def stop(self):
+        # Nothing to stop; kept for interface compatibility.
+        pass
+
+    def can_receive(self, from_source=None):
+        return True
+
+    def handle_frame(self, frame, source=None):
+        # Not used; frames are pushed via push_pcm.
+        pass
+
+    def push_pcm(self, pcm_bytes: bytes):
+        try:
+            samples = (
+                np.frombuffer(pcm_bytes, dtype=np.int16).astype(np.float32) / 32768.0
+            )
+            if samples.size == 0:
+                return
+            samples = samples.reshape(-1, 1)
+            frame = self.codec.encode(samples)
+            if self.sink and self.sink.can_receive(from_source=self):
+                self.sink.handle_frame(frame, self)
+        except Exception as exc:  # noqa: BLE001
+            RNS.log(f"WebAudioSource: failed to push pcm: {exc}", RNS.LOG_ERROR)
+
+
+class WebAudioSink(LocalSink):
+    """Pushes received PCM frames to websocket clients."""
+
+    def __init__(self, loop: asyncio.AbstractEventLoop, send_bytes):
+        self.loop = loop
+        self.send_bytes = send_bytes
+
+    def can_receive(self, from_source=None):
+        return True
+
+    def handle_frame(self, frame, source):
+        try:
+            # frame is expected to be numpy float PCM from receive mixer
+            if hasattr(frame, "astype"):
+                samples = np.clip(frame, -1.0, 1.0).astype(np.float32)
+                pcm = (samples * 32767.0).astype(np.int16).tobytes()
+            else:
+                pcm = frame
+            self.loop.call_soon_threadsafe(asyncio.create_task, self.send_bytes(pcm))
+        except Exception as exc:  # noqa: BLE001
+            RNS.log(f"WebAudioSink: failed to handle frame: {exc}", RNS.LOG_ERROR)
+
+
+class WebAudioBridge:
+    """Coordinates websocket audio transport with an active LXST telephone call."""
+
+    def __init__(self, telephone_manager, config_manager):
+        self.telephone_manager = telephone_manager
+        self.config_manager = config_manager
+        self.clients = set()
+        self.tx_source: Optional[WebAudioSource] = None
+        self.rx_sink: Optional[WebAudioSink] = None
+        self.rx_tee: Optional[Tee] = None
+        self.loop = asyncio.get_event_loop()
+        self.lock = threading.Lock()
+
+    def _tele(self):
+        return getattr(self.telephone_manager, "telephone", None)
+
+    def config_enabled(self):
+        return (
+            self.config_manager
+            and hasattr(self.config_manager, "telephone_web_audio_enabled")
+            and self.config_manager.telephone_web_audio_enabled.get()
+        )
+
+    def allow_fallback(self):
+        return (
+            self.config_manager
+            and hasattr(self.config_manager, "telephone_web_audio_allow_fallback")
+            and self.config_manager.telephone_web_audio_allow_fallback.get()
+        )
+
+    def attach_client(self, client):
+        with self.lock:
+            self.clients.add(client)
+            tele = self._tele()
+            if not tele or not tele.active_call:
+                return False
+            self._ensure_remote_tx(tele)
+            self._ensure_rx_tee(tele)
+            return True
+
+    def detach_client(self, client):
+        with self.lock:
+            if client in self.clients:
+                self.clients.remove(client)
+            if not self.clients and self.allow_fallback():
+                self._restore_host_audio()
+
+    async def send_status(self, client):
+        tele = self._tele()
+        frame_ms = getattr(tele, "target_frame_time_ms", None) or 60
+        await client.send_str(
+            json.dumps(
+                {
+                    "type": "web_audio.ready",
+                    "frame_ms": frame_ms,
+                }
+            )
+        )
+
+    def push_client_frame(self, pcm_bytes: bytes):
+        with self.lock:
+            if not self.tx_source:
+                return
+            self.tx_source.push_pcm(pcm_bytes)
+
+    async def _send_bytes_to_all(self, pcm_bytes: bytes):
+        stale = []
+        for ws in list(self.clients):
+            try:
+                await ws.send_bytes(pcm_bytes)
+            except Exception:
+                stale.append(ws)
+        for ws in stale:
+            self.detach_client(ws)
+
+    def _ensure_remote_tx(self, tele):
+        # Rebuild transmit path with websocket-backed source
+        if self.tx_source:
+            return
+        try:
+            if hasattr(tele, "audio_input") and tele.audio_input:
+                tele.audio_input.stop()
+            self.tx_source = WebAudioSource(
+                target_frame_ms=getattr(tele, "target_frame_time_ms", 60),
+                sink=tele.transmit_mixer,
+            )
+            tele.audio_input = self.tx_source
+            if tele.transmit_mixer and not tele.transmit_mixer.should_run:
+                tele.transmit_mixer.start()
+        except Exception as exc:  # noqa: BLE001
+            RNS.log(
+                f"WebAudioBridge: failed to swap transmit path: {exc}", RNS.LOG_ERROR
+            )
+
+    def _ensure_rx_tee(self, tele):
+        if self.rx_sink:
+            return
+        try:
+            send_fn = lambda pcm: self._send_bytes_to_all(pcm)  # noqa: E731
+            self.rx_sink = WebAudioSink(self.loop, send_fn)
+            # Build tee with existing audio_output as first sink to preserve speaker
+            base_sink = tele.audio_output
+            self.rx_tee = Tee(base_sink) if base_sink else Tee(self.rx_sink)
+            if base_sink:
+                self.rx_tee.add_sink(self.rx_sink)
+            tele.audio_output = self.rx_tee
+            if tele.receive_pipeline:
+                tele.receive_pipeline.stop()
+            tele.receive_pipeline = Pipeline(
+                source=tele.receive_mixer,
+                codec=Null(),
+                sink=self.rx_tee,
+            )
+            tele.receive_pipeline.start()
+        except Exception as exc:  # noqa: BLE001
+            RNS.log(f"WebAudioBridge: failed to tee receive path: {exc}", RNS.LOG_ERROR)
+
+    def _restore_host_audio(self):
+        tele = self._tele()
+        if not tele:
+            return
+        try:
+            if hasattr(tele, "_Telephony__reconfigure_transmit_pipeline"):
+                tele._Telephony__reconfigure_transmit_pipeline()
+        except Exception:
+            pass
+        try:
+            if tele.receive_pipeline:
+                tele.receive_pipeline.stop()
+            if tele.audio_output and self.rx_tee:
+                # If tee had original sink as first element, revert
+                primary = self.rx_tee.sinks[0] if self.rx_tee.sinks else None
+                if primary is not None:
+                    tele.audio_output = primary
+            if tele.receive_mixer:
+                tele.receive_pipeline = Pipeline(
+                    source=tele.receive_mixer,
+                    codec=Null(),
+                    sink=tele.audio_output,
+                )
+                tele.receive_pipeline.start()
+        except Exception:
+            pass
+        self.tx_source = None
+        self.rx_sink = None
+        self.rx_tee = None
+
+    def on_call_ended(self):
+        with self.lock:
+            self.tx_source = None
+            self.rx_sink = None
+            self.rx_tee = None
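
Reviewer note (not part of the patch): the new endpoint takes JSON control
messages as text frames ("attach", "ping") and raw audio as binary frames of
int16 little-endian mono PCM, and it streams received call audio back the same
way. Below is a minimal sketch for exercising it from Python; the base URL and
port are assumptions for illustration, and the 60 ms / 48 kHz framing follows
the defaults declared by WebAudioSource and send_status() rather than anything
negotiated on the wire.

    import asyncio

    import aiohttp
    import numpy as np


    async def main():
        base = "http://localhost:8000"  # assumed host/port of the MeshChat web UI
        async with aiohttp.ClientSession() as session:
            async with session.ws_connect(f"{base}/ws/telephone/audio") as ws:
                # Ask the bridge to attach (only succeeds while a call is active
                # and telephone_web_audio_enabled is set).
                await ws.send_json({"type": "attach"})

                # Send one 60 ms frame of silence: int16 mono samples, matching
                # what WebAudioSource.push_pcm() decodes at its 48 kHz rate.
                frame = np.zeros(48000 * 60 // 1000, dtype=np.int16)
                await ws.send_bytes(frame.tobytes())

                # Read one reply: JSON status/errors arrive as text, received
                # call audio arrives as raw PCM in binary messages.
                msg = await ws.receive()
                if msg.type == aiohttp.WSMsgType.TEXT:
                    print("status:", msg.json())
                elif msg.type == aiohttp.WSMsgType.BINARY:
                    print("received", len(msg.data), "bytes of PCM")


    asyncio.run(main())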