From 9dbb1ff9e7016f5cf0a290c52ef8bba000b6967e Mon Sep 17 00:00:00 2001 From: liamcottle Date: Mon, 20 May 2024 23:18:17 +1200 Subject: [PATCH] refactor audio calls into manager class and rewrite user interface --- public/call.html | 548 ++++++++++++++++++++------------------ src/audio_call_manager.py | 195 ++++++++++++++ web.py | 163 ++++++++---- 3 files changed, 599 insertions(+), 307 deletions(-) create mode 100644 src/audio_call_manager.py diff --git a/public/call.html b/public/call.html index 98fac1c..a7ccee6 100644 --- a/public/call.html +++ b/public/call.html @@ -1,300 +1,340 @@ - -
+ + + + + + Phone | Reticulum WebChat + + + + + + + + + + + + + +
+ +
+ + +
+
+
+
+ + + +
+
Reticulum Phone
+
+
+ +
+
Call Hash
+
{{ callHash }}
+
+ +
+
TX Bytes
+
{{ formatBytes(txBytes) }}
+
+ +
+
RX Bytes
+
{{ formatBytes(rxBytes) }}
+
+ +
+
Codec2 Mode
+ +
+ +
+
+ + +
+
+
+ + +
+
+
+
+ + + +
+
Reticulum Phone
+
+
+
+ +
+ +
+
+
-
- - - -
-
- Mute Mic -
- -
- -
- -
Encoded Bytes Sent:
-
- - - - + \ No newline at end of file diff --git a/src/audio_call_manager.py b/src/audio_call_manager.py new file mode 100644 index 0000000..8ee4124 --- /dev/null +++ b/src/audio_call_manager.py @@ -0,0 +1,195 @@ +import asyncio +from typing import List + +import RNS + +# todo optionally identity self over link +# todo allowlist/denylist for incoming calls + + +class AudioCall: + + def __init__(self, link: RNS.Link, is_outbound: bool): + self.link = link + self.is_outbound = is_outbound + self.link.set_link_closed_callback(self.on_link_closed) + self.link.set_packet_callback(self.on_packet) + self.audio_packet_listeners = [] + + def register_audio_packet_listener(self, callback): + self.audio_packet_listeners.append(callback) + + def unregister_audio_packet_listener(self, callback): + self.audio_packet_listeners.remove(callback) + + # handle link being closed + def on_link_closed(self): + print("[AudioCall] on_link_closed") + self.hangup() + + # handle packet received over link + def on_packet(self, message, packet): + + # send audio received from call initiator to all audio packet listeners + for audio_packet_listener in self.audio_packet_listeners: + audio_packet_listener(message) + + # send an audio packet over the link + def send_audio_packet(self, data): + + # do nothing if link is not active + if self.is_active() is False: + return + + # drop audio packet if it is too big to send + if len(data) > RNS.Link.MDU: + print("[AudioCall] dropping audio packet " + str(len(data)) + " bytes exceeds the link packet MDU of " + str(RNS.Link.MDU) + " bytes") + return + + # send codec2 audio received from call receiver to call initiator over reticulum link + RNS.Packet(self.link, data).send() + + # gets the identity of the caller, or returns None if they did not identify + def initiator_identity(self): + return self.link.get_remote_identity() + + # determine if this call is still active + def is_active(self): + return self.link.status == RNS.Link.ACTIVE + + # handle hanging up the call + def hangup(self): + print("[AudioCall] hangup") + self.link.teardown() + pass + + +class AudioCallManager: + + def __init__(self, identity: RNS.Identity): + + self.identity = identity + self.on_incoming_call_callback = None + self.on_outgoing_call_callback = None + self.audio_call_receiver = AudioCallReceiver(manager=self) + + # remember audio calls + self.audio_calls: List[AudioCall] = [] + + # announces the audio call destination + def announce(self, app_data=None): + self.audio_call_receiver.destination.announce(app_data) + print("[AudioCallManager] announced destination: " + RNS.prettyhexrep(self.audio_call_receiver.destination.hash)) + + # set the callback for incoming calls + def register_incoming_call_callback(self, callback): + self.on_incoming_call_callback = callback + + # set the callback for outgoing calls + def register_outgoing_call_callback(self, callback): + self.on_outgoing_call_callback = callback + + # handle incoming calls from audio call receiver + def handle_incoming_call(self, audio_call: AudioCall): + + # remember it + self.audio_calls.append(audio_call) + + # fire callback + if self.on_incoming_call_callback is not None: + self.on_incoming_call_callback(audio_call) + + # handle outgoing calls + def handle_outgoing_call(self, audio_call: AudioCall): + + # remember it + self.audio_calls.append(audio_call) + + # fire callback + if self.on_outgoing_call_callback is not None: + self.on_outgoing_call_callback(audio_call) + + # find an existing audio call from the provided link hash + def find_audio_call_by_link_hash(self, link_hash: bytes): + for audio_call in self.audio_calls: + if audio_call.link.hash == link_hash: + return audio_call + return None + + # attempts to initiate a call to the provided destination and returns the link hash on success + # FIXME: implement timeout. at the moment, it loops forever if no path is found + async def initiate(self, destination_hash: bytes) -> bytes: + + # wait until we have a path to the destination + # FIXME: implement timeout instead of looping forever + if not RNS.Transport.has_path(destination_hash): + RNS.Transport.request_path(destination_hash) + while not RNS.Transport.has_path(destination_hash): + await asyncio.sleep(0.1) + + # create outbound destination to initiate audio calls + server_identity = RNS.Identity.recall(destination_hash) + server_destination = RNS.Destination( + server_identity, + RNS.Destination.OUT, + RNS.Destination.SINGLE, + "call", + "audio" + ) + + # create link + link = RNS.Link(server_destination) + + # register link state callbacks + link.set_link_established_callback(self.on_link_established) + + return link.hash + + def on_link_established(self, link: RNS.Link): + + # todo: this can be optional, it's only being sent by default for ui, can be removed + link.identify(self.identity) + + # create audio call + audio_call = AudioCall(link, is_outbound=True) + + # handle new outgoing call + self.handle_outgoing_call(audio_call) + + +class AudioCallReceiver: + + def __init__(self, manager: AudioCallManager): + + self.manager = manager + + # create destination for receiver audio calls + self.destination = RNS.Destination( + self.manager.identity, + RNS.Destination.IN, + RNS.Destination.SINGLE, + "call", + "audio", + ) + + # register link state callbacks + self.destination.set_link_established_callback(self.client_connected) + + # find an existing audio call from the provided link + def find_audio_call_by_link_hash(self, link_hash: bytes): + for audio_call in self.manager.audio_calls: + if audio_call.link.hash == link_hash: + return audio_call + return None + + # client connected to us, set up an audio call instance + def client_connected(self, link: RNS.Link): + + # todo: this can be optional, it's only being sent by default for ui, can be removed + link.identify(self.manager.identity) + + # create audio call + audio_call = AudioCall(link, is_outbound=False) + + # pass to manager + self.manager.handle_incoming_call(audio_call) diff --git a/web.py b/web.py index 912edda..b1dbe8d 100644 --- a/web.py +++ b/web.py @@ -18,6 +18,7 @@ from peewee import SqliteDatabase import database from lxmf_message_fields import LxmfImageField, LxmfFileAttachmentsField, LxmfFileAttachment +from src.audio_call_manager import AudioCall, AudioCallManager class ReticulumWebChat: @@ -84,7 +85,13 @@ class ReticulumWebChat: # remember websocket clients self.websocket_clients: List[web.WebSocketResponse] = [] - self.link_call_audio = None + # register audio call identity + self.audio_call_manager = AudioCallManager(identity=self.identity) + self.audio_call_manager.register_incoming_call_callback(self.on_incoming_audio_call) + + # handle receiving a new audio call + def on_incoming_audio_call(self, audio_call: AudioCall): + print("on_incoming_audio_call: {}".format(audio_call.link.hash.hex())) # web server has shutdown, likely ctrl+c, but if we don't do the following, the script never exits async def shutdown(self, app): @@ -232,69 +239,89 @@ class ReticulumWebChat: return websocket_response - # handle websocket clients for listening for calls - @routes.get("/call/listen") + # get calls + @routes.get("/api/v1/calls") + async def index(request): + + # get audio calls + audio_calls = [] + for audio_call in self.audio_call_manager.audio_calls: + + # get initiator identity hash + initiator_identity_hash = None + initiator_identity = audio_call.initiator_identity() + if initiator_identity is not None: + initiator_identity_hash = initiator_identity.hash.hex() + + audio_calls.append({ + "hash": audio_call.link.hash.hex(), + "initiator_identity_hash": initiator_identity_hash, + "is_active": audio_call.is_active(), + "is_outbound": audio_call.is_outbound, + }) + + return web.json_response({ + "audio_calls": audio_calls, + }) + + # initiate a call to the provided destination + @routes.get("/api/v1/calls/initiate/{destination_hash}") + async def index(request): + + # get path params + destination_hash = request.match_info.get("destination_hash", "") + + # convert destination hash to bytes + destination_hash = bytes.fromhex(destination_hash) + + # initiate audio call + link_hash = await self.audio_call_manager.initiate(destination_hash) + + return web.json_response({ + "hash": link_hash.hex(), + }) + + # handle websocket client for sending and receiving audio packets in a call + @routes.get("/api/v1/calls/{audio_call_link_hash}/audio") async def ws(request): + # get path params + audio_call_link_hash = request.match_info.get("audio_call_link_hash", "") + + # convert hash to bytes + audio_call_link_hash = bytes.fromhex(audio_call_link_hash) + + # find audio call + audio_call = self.audio_call_manager.find_audio_call_by_link_hash(audio_call_link_hash) + if audio_call is None: + # fixme: web browser expects websocket, so this won't be useful + return web.json_response({ + "message": "audio call not found", + }, status=404) + + # send audio received from call initiator to call receiver websocket + def on_audio_packet(data): + if websocket_response.closed is False: + try: + asyncio.run(websocket_response.send_bytes(data)) + except: + # ignore errors sending audio packets to websocket + pass + + # register audio packet listener + audio_call.register_audio_packet_listener(on_audio_packet) + # prepare websocket response websocket_response = web.WebSocketResponse() await websocket_response.prepare(request) - # create destination to allow incoming audio calls - server_identity = self.identity - server_destination = RNS.Destination( - server_identity, - RNS.Destination.IN, - RNS.Destination.SINGLE, - "call", - "audio", - ) - - # client connected to us - def client_connected(link): - print("client connected") - self.link_call_audio = link - link.set_link_closed_callback(client_disconnected) - link.set_packet_callback(server_packet_received) - - # client disconnected from us - def client_disconnected(link): - print("client disconnected") - self.link_call_audio = None - - # client sent us a packet - def server_packet_received(message, packet): - - # send audio received from call initiator to call receiver websocket - asyncio.run(websocket_response.send_bytes(message)) - - # todo send our audio back to call initiator - - # register link state callbacks - server_destination.set_link_established_callback(client_connected) - - # announce our call.audio destination - print("call.audio announced and waiting for connection: "+ RNS.prettyhexrep(server_destination.hash)) - server_destination.announce() - # handle websocket messages until disconnected + # FIXME: we should send a type with the message, so we can send other data as well async for msg in websocket_response: msg: WSMessage = msg if msg.type == WSMsgType.BINARY: try: - - # drop audio packet if it is too big to send - if len(msg.data) > RNS.Link.MDU: - print("dropping packet " + str(len(msg.data)) + " bytes exceeds the link packet MDU of " + str(RNS.Link.MDU) + " bytes") - continue - - # send codec2 audio received from call receiver on websocket, to call initiator over reticulum link - if self.link_call_audio is not None: - print("sending bytes to call initiator: {}".format(len(msg.data))) - RNS.Packet(self.link_call_audio, msg.data).send() - else: - print("link to call initiator not available") - + audio_call.send_audio_packet(msg.data) except Exception as e: # ignore errors while handling message print("failed to process client message") @@ -303,8 +330,35 @@ class ReticulumWebChat: # ignore errors while handling message print('ws connection error %s' % websocket_response.exception()) + # unregister audio packet handler now that the websocket has been closed + audio_call.register_audio_packet_listener(on_audio_packet) + return websocket_response + # hangup calls + @routes.get("/api/v1/calls/{audio_call_link_hash}/hangup") + async def index(request): + + # get path params + audio_call_link_hash = request.match_info.get("audio_call_link_hash", "") + + # convert hash to bytes + audio_call_link_hash = bytes.fromhex(audio_call_link_hash) + + # find audio call + audio_call = self.audio_call_manager.find_audio_call_by_link_hash(audio_call_link_hash) + if audio_call is None: + return web.json_response({ + "message": "audio call not found", + }, status=404) + + # hangup the call + audio_call.hangup() + + return web.json_response({ + "message": "call has been hungup", + }) + # serve announces @routes.get("/api/v1/announces") async def index(request): @@ -575,6 +629,9 @@ class ReticulumWebChat: # send announce for lxmf self.local_lxmf_destination.announce(app_data=self.config.display_name.get().encode("utf-8")) + # send announce for audio call + self.audio_call_manager.announce(app_data=self.config.display_name.get().encode("utf-8")) + # handle downloading a file from a nomadnet node elif _type == "nomadnet.file.download":