refactor websocket interfaces to use threading and implement detach
This commit is contained in:
@@ -1,12 +1,10 @@
|
||||
import asyncio
|
||||
import threading
|
||||
import time
|
||||
|
||||
import RNS
|
||||
import websockets
|
||||
from RNS.Interfaces.Interface import Interface
|
||||
from websockets.asyncio.connection import Connection
|
||||
|
||||
from src.backend.async_utils import AsyncUtils
|
||||
from websockets.sync.client import connect
|
||||
from websockets.sync.connection import Connection
|
||||
|
||||
|
||||
class WebsocketClientInterface(Interface):
|
||||
@@ -14,6 +12,8 @@ class WebsocketClientInterface(Interface):
|
||||
# TODO: required?
|
||||
DEFAULT_IFAC_SIZE = 16
|
||||
|
||||
RECONNECT_DELAY_MILLIS = 5000
|
||||
|
||||
def __str__(self):
|
||||
return f"WebsocketClientInterface[{self.name}/{self.target_host}:{self.target_port}]"
|
||||
|
||||
@@ -49,7 +49,7 @@ class WebsocketClientInterface(Interface):
|
||||
# connect to websocket server if an existing connection was not provided
|
||||
self.websocket = websocket
|
||||
if self.websocket is None:
|
||||
thread = threading.Thread(target=asyncio.run, args=(self.connect(),))
|
||||
thread = threading.Thread(target=self.connect)
|
||||
thread.daemon = True
|
||||
thread.start()
|
||||
|
||||
@@ -73,43 +73,53 @@ class WebsocketClientInterface(Interface):
|
||||
|
||||
# send to websocket server
|
||||
print(f"{self} process_outgoing: {data.hex()}")
|
||||
AsyncUtils.run_async(self.websocket.send(data))
|
||||
self.websocket.send(data)
|
||||
|
||||
# update sent bytes counter
|
||||
self.txb += len(data)
|
||||
|
||||
# connect to the configured websocket server
|
||||
async def connect(self):
|
||||
def connect(self):
|
||||
|
||||
# do nothing if interface is detached
|
||||
if self.detached:
|
||||
return
|
||||
|
||||
# connect to websocket server
|
||||
try:
|
||||
# todo: ws:// and wss:// support in config file?
|
||||
async with websockets.connect(f"ws://{self.target_host}:{self.target_port}", max_size=None, compression=None) as websocket:
|
||||
self.websocket = websocket
|
||||
await self.read_loop()
|
||||
self.websocket = connect(f"ws://{self.target_host}:{self.target_port}", max_size=None, compression=None)
|
||||
self.read_loop()
|
||||
except Exception as e:
|
||||
RNS.log(f"{self} failed with error: {e}", RNS.LOG_ERROR)
|
||||
|
||||
# todo implement reconnect delay
|
||||
await self.connect()
|
||||
# auto reconnect after delay
|
||||
time.sleep(self.RECONNECT_DELAY_MILLIS)
|
||||
self.connect()
|
||||
|
||||
async def read_loop(self):
|
||||
def read_loop(self):
|
||||
|
||||
self.online = True
|
||||
|
||||
try:
|
||||
async for message in self.websocket:
|
||||
for message in self.websocket:
|
||||
self.process_incoming(message)
|
||||
except Exception as e:
|
||||
RNS.log(f"{self} read loop error: {e}", RNS.LOG_ERROR)
|
||||
|
||||
self.online = False
|
||||
|
||||
# todo implement
|
||||
def detach(self):
|
||||
# todo mark as offline
|
||||
# todo close websocket
|
||||
# todo mark as detached
|
||||
pass
|
||||
|
||||
# mark as offline
|
||||
self.online = False
|
||||
|
||||
# close websocket
|
||||
if self.websocket is not None:
|
||||
self.websocket.close()
|
||||
|
||||
# mark as detached
|
||||
self.detached = True
|
||||
|
||||
# set interface class RNS should use when importing this external interface
|
||||
interface_class = WebsocketClientInterface
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
import asyncio
|
||||
import threading
|
||||
import time
|
||||
|
||||
import RNS
|
||||
import websockets
|
||||
from RNS.Interfaces.Interface import Interface
|
||||
from websockets import ServerConnection
|
||||
from websockets.sync.server import Server
|
||||
from websockets.sync.server import serve
|
||||
from websockets.sync.server import ServerConnection
|
||||
|
||||
from src.backend.interfaces.WebsocketClientInterface import WebsocketClientInterface
|
||||
|
||||
@@ -15,6 +15,8 @@ class WebsocketServerInterface(Interface):
|
||||
# TODO: required?
|
||||
DEFAULT_IFAC_SIZE = 16
|
||||
|
||||
RESTART_DELAY_MILLIS = 5000
|
||||
|
||||
def __str__(self):
|
||||
return f"WebsocketServerInterface[{self.name}/{self.listen_ip}:{self.listen_port}]"
|
||||
|
||||
@@ -29,7 +31,9 @@ class WebsocketServerInterface(Interface):
|
||||
self.HW_MTU = 262144 # 256KiB
|
||||
self.bitrate = 1_000_000_000 # 1Gbps
|
||||
self.mode = RNS.Interfaces.Interface.Interface.MODE_FULL
|
||||
self.spawned_interfaces = []
|
||||
|
||||
self.server: Server | None = None
|
||||
self.spawned_interfaces: [WebsocketClientInterface] = []
|
||||
|
||||
# parse config
|
||||
ifconf = Interface.get_config_obj(configuration)
|
||||
@@ -49,7 +53,7 @@ class WebsocketServerInterface(Interface):
|
||||
self.listen_port = int(self.listen_port)
|
||||
|
||||
# run websocket server
|
||||
thread = threading.Thread(target=asyncio.run, args=(self.serve(),))
|
||||
thread = threading.Thread(target=self.serve)
|
||||
thread.daemon = True
|
||||
thread.start()
|
||||
|
||||
@@ -78,10 +82,10 @@ class WebsocketServerInterface(Interface):
|
||||
def process_outgoing(self, data):
|
||||
pass
|
||||
|
||||
async def serve(self):
|
||||
def serve(self):
|
||||
|
||||
# handle new websocket client connections
|
||||
async def on_websocket_client_connected(websocket: ServerConnection):
|
||||
def on_websocket_client_connected(websocket: ServerConnection):
|
||||
|
||||
# create new child interface
|
||||
RNS.log("Accepting incoming WebSocket connection", RNS.LOG_VERBOSE)
|
||||
@@ -118,27 +122,33 @@ class WebsocketServerInterface(Interface):
|
||||
self.spawned_interfaces.append(spawned_interface)
|
||||
|
||||
# run read loop
|
||||
await spawned_interface.read_loop()
|
||||
spawned_interface.read_loop()
|
||||
|
||||
# run websocket server
|
||||
try:
|
||||
async with websockets.serve(on_websocket_client_connected, self.listen_ip, self.listen_port, compression=None) as server:
|
||||
with serve(on_websocket_client_connected, self.listen_ip, self.listen_port, compression=None) as server:
|
||||
self.online = True
|
||||
await server.serve_forever()
|
||||
self.server = server
|
||||
server.serve_forever()
|
||||
except Exception as e:
|
||||
RNS.log(f"{self} failed with error: {e}", RNS.LOG_ERROR)
|
||||
|
||||
# websocket server is no longer running, let's restart it
|
||||
# todo implement retry delay
|
||||
self.online = False
|
||||
await self.serve()
|
||||
time.sleep(self.RESTART_DELAY_MILLIS)
|
||||
self.serve()
|
||||
|
||||
# todo implement
|
||||
def detach(self):
|
||||
# todo mark as offline
|
||||
# todo stop websocket server and all existing connections
|
||||
# todo mark as detached
|
||||
pass
|
||||
|
||||
# mark as offline
|
||||
self.online = False
|
||||
|
||||
# stop websocket server
|
||||
if self.server is not None:
|
||||
self.server.shutdown()
|
||||
|
||||
# mark as detached
|
||||
self.detached = True
|
||||
|
||||
# set interface class RNS should use when importing this external interface
|
||||
interface_class = WebsocketServerInterface
|
||||
|
||||
Reference in New Issue
Block a user