feat(audio): implement WebAudioBridge for websocket audio transport and add configuration options

This commit is contained in:
2026-01-04 23:18:55 -06:00
parent 52e5a60724
commit 9d8611bb97
3 changed files with 336 additions and 0 deletions

View File

@@ -85,6 +85,7 @@ from meshchatx.src.backend.recovery import CrashRecovery
from meshchatx.src.backend.rnprobe_handler import RNProbeHandler
from meshchatx.src.backend.sideband_commands import SidebandCommands
from meshchatx.src.backend.telemetry_utils import Telemeter
from meshchatx.src.backend.web_audio_bridge import WebAudioBridge
from meshchatx.src.version import __version__ as app_version
import logging
@@ -233,6 +234,7 @@ class ReticulumMeshChat:
self.current_context: IdentityContext | None = None
self.setup_identity(identity)
self.web_audio_bridge = WebAudioBridge(None, None)
# Proxy properties for backward compatibility
@property
@@ -497,6 +499,10 @@ class ReticulumMeshChat:
self.current_context = self.contexts[identity_hash]
if not self.current_context.running:
self.current_context.setup()
self.web_audio_bridge = WebAudioBridge(
self.current_context.telephone_manager,
self.current_context.config,
)
return
# Initialize Reticulum if not already done
@@ -508,6 +514,9 @@ class ReticulumMeshChat:
self.contexts[identity_hash] = context
self.current_context = context
context.setup()
self.web_audio_bridge = WebAudioBridge(
context.telephone_manager, context.config
)
# Link database to memory log handler
memory_log_handler.set_database(context.database)
@@ -1745,6 +1754,10 @@ class ReticulumMeshChat:
print(
f"on_telephone_call_ended: {caller_identity.hash.hex() if caller_identity else 'Unknown'}",
)
try:
self.web_audio_bridge.on_call_ended()
except Exception:
pass
# Record call history
if caller_identity:
@@ -3142,6 +3155,51 @@ class ReticulumMeshChat:
return websocket_response
@routes.get("/ws/telephone/audio")
async def telephone_audio_ws(request):
websocket_response = web.WebSocketResponse(
max_msg_size=5 * 1024 * 1024,
)
await websocket_response.prepare(request)
# Early guard on config
if not self.web_audio_bridge.config_enabled():
await websocket_response.send_str(
json.dumps(
{"type": "error", "message": "Web audio is disabled in config"},
),
)
else:
await self.web_audio_bridge.send_status(websocket_response)
attached = self.web_audio_bridge.attach_client(websocket_response)
if not attached:
await websocket_response.send_str(
json.dumps(
{"type": "error", "message": "No active call to attach"},
),
)
async for msg in websocket_response:
msg: WSMessage = msg
if msg.type == WSMsgType.BINARY:
self.web_audio_bridge.push_client_frame(msg.data)
elif msg.type == WSMsgType.TEXT:
try:
data = json.loads(msg.data)
if data.get("type") == "attach":
self.web_audio_bridge.attach_client(websocket_response)
elif data.get("type") == "ping":
await websocket_response.send_str(
json.dumps({"type": "pong"})
)
except Exception:
pass
elif msg.type == WSMsgType.ERROR:
print(f"telephone audio ws error {websocket_response.exception()}")
self.web_audio_bridge.detach_client(websocket_response)
return websocket_response
# get app info
@routes.get("/api/v1/app/info")
async def app_info(request):
@@ -3990,6 +4048,26 @@ class ReticulumMeshChat:
"initiation_status": self.telephone_manager.initiation_status,
"initiation_target_hash": initiation_target_hash,
"initiation_target_name": initiation_target_name,
"web_audio": {
"enabled": getattr(
self.config.telephone_web_audio_enabled,
"get",
lambda: False,
)(),
"allow_fallback": getattr(
self.config.telephone_web_audio_allow_fallback,
"get",
lambda: True,
)(),
"has_client": bool(
getattr(self.web_audio_bridge, "clients", []),
),
"frame_ms": getattr(
self.telephone_manager.telephone,
"target_frame_time_ms",
None,
),
},
},
)
@@ -7921,6 +7999,16 @@ class ReticulumMeshChat:
profile_id,
)
if "telephone_web_audio_enabled" in data:
self.config.telephone_web_audio_enabled.set(
self._parse_bool(data["telephone_web_audio_enabled"]),
)
if "telephone_web_audio_allow_fallback" in data:
self.config.telephone_web_audio_allow_fallback.set(
self._parse_bool(data["telephone_web_audio_allow_fallback"]),
)
if "translator_enabled" in data:
value = self._parse_bool(data["translator_enabled"])
self.config.translator_enabled.set(value)
@@ -8728,6 +8816,8 @@ class ReticulumMeshChat:
"do_not_disturb_enabled": ctx.config.do_not_disturb_enabled.get(),
"telephone_allow_calls_from_contacts_only": ctx.config.telephone_allow_calls_from_contacts_only.get(),
"telephone_audio_profile_id": ctx.config.telephone_audio_profile_id.get(),
"telephone_web_audio_enabled": ctx.config.telephone_web_audio_enabled.get(),
"telephone_web_audio_allow_fallback": ctx.config.telephone_web_audio_allow_fallback.get(),
"call_recording_enabled": ctx.config.call_recording_enabled.get(),
"banished_effect_enabled": ctx.config.banished_effect_enabled.get(),
"banished_text": ctx.config.banished_text.get(),

View File

@@ -189,6 +189,16 @@ class ConfigManager:
"telephone_audio_profile_id",
2, # Default to Voice (profile 2)
)
self.telephone_web_audio_enabled = self.BoolConfig(
self,
"telephone_web_audio_enabled",
False,
)
self.telephone_web_audio_allow_fallback = self.BoolConfig(
self,
"telephone_web_audio_allow_fallback",
True,
)
self.call_recording_enabled = self.BoolConfig(
self,
"call_recording_enabled",

View File

@@ -0,0 +1,236 @@
import asyncio
import json
import threading
from typing import Optional
import numpy as np
import RNS
from LXST.Codecs import Null, Raw
from LXST.Mixer import Mixer
from LXST.Pipeline import Pipeline
from LXST.Sinks import LocalSink
from LXST.Sources import LocalSource
from .telephone_manager import Tee
def _log_debug(msg: str):
RNS.log(msg, RNS.LOG_DEBUG)
class WebAudioSource(LocalSource):
"""Injects PCM frames (int16 little-endian) received over websocket into the transmit mixer."""
def __init__(self, target_frame_ms: int, sink: Mixer):
self.target_frame_ms = target_frame_ms or 60
self.sink = sink
self.codec = Raw(channels=1, bitdepth=16)
self.channels = 1
self.samplerate = 48000
self.bitdepth = 16
def start(self):
# Nothing to start; frames are pushed from the websocket thread.
pass
def stop(self):
# Nothing to stop; kept for interface compatibility.
pass
def can_receive(self, from_source=None):
return True
def handle_frame(self, frame, source=None):
# Not used; frames are pushed via push_pcm.
pass
def push_pcm(self, pcm_bytes: bytes):
try:
samples = (
np.frombuffer(pcm_bytes, dtype=np.int16).astype(np.float32) / 32768.0
)
if samples.size == 0:
return
samples = samples.reshape(-1, 1)
frame = self.codec.encode(samples)
if self.sink and self.sink.can_receive(from_source=self):
self.sink.handle_frame(frame, self)
except Exception as exc: # noqa: BLE001
RNS.log(f"WebAudioSource: failed to push pcm: {exc}", RNS.LOG_ERROR)
class WebAudioSink(LocalSink):
"""Pushes received PCM frames to websocket clients."""
def __init__(self, loop: asyncio.AbstractEventLoop, send_bytes):
self.loop = loop
self.send_bytes = send_bytes
def can_receive(self, from_source=None):
return True
def handle_frame(self, frame, source):
try:
# frame is expected to be numpy float PCM from receive mixer
if hasattr(frame, "astype"):
samples = np.clip(frame, -1.0, 1.0).astype(np.float32)
pcm = (samples * 32767.0).astype(np.int16).tobytes()
else:
pcm = frame
self.loop.call_soon_threadsafe(asyncio.create_task, self.send_bytes(pcm))
except Exception as exc: # noqa: BLE001
RNS.log(f"WebAudioSink: failed to handle frame: {exc}", RNS.LOG_ERROR)
class WebAudioBridge:
"""Coordinates websocket audio transport with an active LXST telephone call."""
def __init__(self, telephone_manager, config_manager):
self.telephone_manager = telephone_manager
self.config_manager = config_manager
self.clients = set()
self.tx_source: Optional[WebAudioSource] = None
self.rx_sink: Optional[WebAudioSink] = None
self.rx_tee: Optional[Tee] = None
self.loop = asyncio.get_event_loop()
self.lock = threading.Lock()
def _tele(self):
return getattr(self.telephone_manager, "telephone", None)
def config_enabled(self):
return (
self.config_manager
and hasattr(self.config_manager, "telephone_web_audio_enabled")
and self.config_manager.telephone_web_audio_enabled.get()
)
def allow_fallback(self):
return (
self.config_manager
and hasattr(self.config_manager, "telephone_web_audio_allow_fallback")
and self.config_manager.telephone_web_audio_allow_fallback.get()
)
def attach_client(self, client):
with self.lock:
self.clients.add(client)
tele = self._tele()
if not tele or not tele.active_call:
return False
self._ensure_remote_tx(tele)
self._ensure_rx_tee(tele)
return True
def detach_client(self, client):
with self.lock:
if client in self.clients:
self.clients.remove(client)
if not self.clients and self.allow_fallback():
self._restore_host_audio()
async def send_status(self, client):
tele = self._tele()
frame_ms = getattr(tele, "target_frame_time_ms", None) or 60
await client.send_str(
json.dumps(
{
"type": "web_audio.ready",
"frame_ms": frame_ms,
}
)
)
def push_client_frame(self, pcm_bytes: bytes):
with self.lock:
if not self.tx_source:
return
self.tx_source.push_pcm(pcm_bytes)
async def _send_bytes_to_all(self, pcm_bytes: bytes):
stale = []
for ws in list(self.clients):
try:
await ws.send_bytes(pcm_bytes)
except Exception:
stale.append(ws)
for ws in stale:
self.detach_client(ws)
def _ensure_remote_tx(self, tele):
# Rebuild transmit path with websocket-backed source
if self.tx_source:
return
try:
if hasattr(tele, "audio_input") and tele.audio_input:
tele.audio_input.stop()
self.tx_source = WebAudioSource(
target_frame_ms=getattr(tele, "target_frame_time_ms", 60),
sink=tele.transmit_mixer,
)
tele.audio_input = self.tx_source
if tele.transmit_mixer and not tele.transmit_mixer.should_run:
tele.transmit_mixer.start()
except Exception as exc: # noqa: BLE001
RNS.log(
f"WebAudioBridge: failed to swap transmit path: {exc}", RNS.LOG_ERROR
)
def _ensure_rx_tee(self, tele):
if self.rx_sink:
return
try:
send_fn = lambda pcm: self._send_bytes_to_all(pcm) # noqa: E731
self.rx_sink = WebAudioSink(self.loop, send_fn)
# Build tee with existing audio_output as first sink to preserve speaker
base_sink = tele.audio_output
self.rx_tee = Tee(base_sink) if base_sink else Tee(self.rx_sink)
if base_sink:
self.rx_tee.add_sink(self.rx_sink)
tele.audio_output = self.rx_tee
if tele.receive_pipeline:
tele.receive_pipeline.stop()
tele.receive_pipeline = Pipeline(
source=tele.receive_mixer,
codec=Null(),
sink=self.rx_tee,
)
tele.receive_pipeline.start()
except Exception as exc: # noqa: BLE001
RNS.log(f"WebAudioBridge: failed to tee receive path: {exc}", RNS.LOG_ERROR)
def _restore_host_audio(self):
tele = self._tele()
if not tele:
return
try:
if hasattr(tele, "_Telephony__reconfigure_transmit_pipeline"):
tele._Telephony__reconfigure_transmit_pipeline()
except Exception:
pass
try:
if tele.receive_pipeline:
tele.receive_pipeline.stop()
if tele.audio_output and self.rx_tee:
# If tee had original sink as first element, revert
primary = self.rx_tee.sinks[0] if self.rx_tee.sinks else None
if primary is not None:
tele.audio_output = primary
if tele.receive_mixer:
tele.receive_pipeline = Pipeline(
source=tele.receive_mixer,
codec=Null(),
sink=tele.audio_output,
)
tele.receive_pipeline.start()
except Exception:
pass
self.tx_source = None
self.rx_sink = None
self.rx_tee = None
def on_call_ended(self):
with self.lock:
self.tx_source = None
self.rx_sink = None
self.rx_tee = None