Implement Custom Interface #1

Closed
Sudo-Ivan wants to merge 7 commits from custom-interface into master
Showing only changes of commit 447492dc88 - Show all commits

391
HTTPInterface.py Normal file → Executable file
View File

@@ -1,141 +1,153 @@
"""RNS-over-HTTP Interface import os
import threading
Custom HTTP interface for Reticulum that implements bidirectional communication using HTTP POST requests. import time
"""
import importlib
from http.server import BaseHTTPRequestHandler, HTTPServer from http.server import BaseHTTPRequestHandler, HTTPServer
from queue import Empty, Queue from queue import Empty, Queue
from socketserver import ThreadingMixIn from socketserver import ThreadingMixIn
from threading import Event, Thread from threading import Event, Thread
from time import sleep
import requests
import RNS import RNS
from RNS.Interfaces.Interface import Interface from RNS.Interfaces.Interface import Interface
MTU = 4096
DEFAULT_USER_AGENT = "RNS-HTTP-Tunnel/1.0"
class HTTPTunnelInterface(Interface):
"""HTTP Tunnel Interface for Reticulum.
class HTTPInterface(Interface): This interface implements bidirectional communication over HTTP POST requests,
"""HTTPInterface provides a Reticulum interface over HTTP using bidirectional communication via HTTP POST requests.""" allowing Reticulum to traverse firewalls and proxies that allow HTTP/HTTPS traffic.
DEFAULT_IFAC_SIZE = 8 Configuration:
mode: "client" or "server"
listen_host: IP address to bind server to (server mode)
listen_port: Port to bind server to (server mode)
server_url: URL of the HTTP server (client mode)
poll_interval: Polling interval in seconds for client (default: 0.1)
check_user_agent: Enable User-Agent validation (default: True)
serve_html_page: Serve HTML page on GET requests (default: False)
html_file_path: Path to HTML file to serve (optional)
owner = None Config example for server:
mode = None [[HTTP Tunnel Server]]
listen_host = None type = HTTPTunnelInterface
listen_port = None interface_enabled = True
server_url = None mode = server
poll_interval = None listen_host = 0.0.0.0
check_user_agent = None listen_port = 8080
_recv_queue = None
_send_queue = None Config example for server with HTML:
_stop_event = None [[HTTP Tunnel Server]]
_worker_thread = None type = HTTPTunnelInterface
_http_server = None interface_enabled = True
session = None mode = server
_consecutive_failures = None listen_host = 0.0.0.0
_max_backoff = None listen_port = 8080
serve_html_page = True
html_file_path = index.html
Config example for client:
[[HTTP Tunnel Client]]
type = HTTPTunnelInterface
interface_enabled = True
mode = client
server_url = http://example.com:8080/
poll_interval = 0.1
"""
DEFAULT_IFAC_SIZE = 16
BITRATE_GUESS = 10_000_000
AUTOCONFIGURE_MTU = True
DEFAULT_MTU = 4096
TUNNEL_USER_AGENT = "RNS-HTTP-Tunnel/1.0"
DEFAULT_POLL_INTERVAL = 0.1
def __init__(self, owner, configuration): def __init__(self, owner, configuration):
"""Initialize the HTTPInterface with the given owner and configuration."""
if importlib.util.find_spec("requests") is None:
RNS.log(
"Using this interface requires the requests module to be installed.",
RNS.LOG_CRITICAL,
)
RNS.log(
"You can install it with the command: python3 -m pip install requests",
RNS.LOG_CRITICAL,
)
RNS.panic()
import requests
super().__init__() super().__init__()
ifconf = Interface.get_config_obj(configuration) ifconf = Interface.get_config_obj(configuration)
name = ifconf["name"] self.name = ifconf["name"]
self.name = name
mode = ifconf["mode"] if "mode" in ifconf else "client" mode = ifconf["mode"] if "mode" in ifconf else "client"
listen_host = ifconf["listen_host"] if "listen_host" in ifconf else "0.0.0.0" listen_host = ifconf["listen_host"] if "listen_host" in ifconf else "0.0.0.0"
listen_port = int(ifconf["listen_port"]) if "listen_port" in ifconf else 8080 listen_port = int(ifconf["listen_port"]) if "listen_port" in ifconf else 8080
server_url = ifconf["server_url"] if "server_url" in ifconf else None server_url = ifconf["server_url"] if "server_url" in ifconf else None
poll_interval = ( poll_interval = float(ifconf["poll_interval"]) if "poll_interval" in ifconf else self.DEFAULT_POLL_INTERVAL
float(ifconf["poll_interval"]) if "poll_interval" in ifconf else 1.0 check_user_agent = ifconf.as_bool("check_user_agent") if "check_user_agent" in ifconf else True
) mtu = int(ifconf["mtu"]) if "mtu" in ifconf else self.DEFAULT_MTU
check_user_agent = ( serve_html_page = ifconf.as_bool("serve_html_page") if "serve_html_page" in ifconf else False
bool(ifconf["check_user_agent"]) if "check_user_agent" in ifconf else True html_file_path = ifconf["html_file_path"] if "html_file_path" in ifconf else None
)
user_agent = ( if mode not in ["client", "server"]:
ifconf["user_agent"] if "user_agent" in ifconf else DEFAULT_USER_AGENT raise ValueError(f"Invalid mode '{mode}' for {self}. Must be 'client' or 'server'")
)
mtu = int(ifconf["mtu"]) if "mtu" in ifconf else MTU
if mode not in ["server", "client"]:
raise ValueError(f"Invalid mode '{mode}'. Must be 'server' or 'client'")
if mode == "client" and server_url is None: if mode == "client" and server_url is None:
raise ValueError("server_url is required for client mode") raise ValueError(f"No server_url specified for client mode in {self}")
self.HW_MTU = mtu self.HW_MTU = mtu
self.online = False self.online = False
self.bitrate = 1000000 self.bitrate = HTTPTunnelInterface.BITRATE_GUESS
self.optimise_mtu()
self.owner = owner self.owner = owner
self.mode = mode self.mode = mode
self.listen_host = listen_host self.mtu = mtu
self.listen_port = listen_port
self.server_url = server_url
self.poll_interval = poll_interval
self.check_user_agent = check_user_agent self.check_user_agent = check_user_agent
self.user_agent = user_agent self.serve_html_page = serve_html_page
self.html_file_path = html_file_path
self.html_content = None
if self.serve_html_page and self.html_file_path:
self._load_html_content()
self._recv_queue = Queue() self._recv_queue = Queue()
self._send_queue = Queue() self._send_queue = Queue()
self._stop_event = Event() self._stop_event = Event()
if self.mode == "server": if mode == "server":
self._http_server = None self.listen_host = listen_host
self._request_handler_class = self._create_request_handler() self.listen_port = listen_port
self.setup_server()
else: else:
self.session = requests.Session() self.server_url = server_url
self.session.headers.update({"User-Agent": self.user_agent}) self.poll_interval = poll_interval
self._consecutive_failures = 0 self.setup_client()
self._max_backoff = 30.0
def _load_html_content(self):
try: try:
self._start_interface() if os.path.isfile(self.html_file_path):
self._read_thread = Thread(target=self._read_loop, daemon=True) with open(self.html_file_path, encoding="utf-8") as f:
self._read_thread.start() self.html_content = f.read()
RNS.log(f"Loaded HTML content from {self.html_file_path}", RNS.LOG_INFO)
else:
RNS.log(f"HTML file not found: {self.html_file_path}", RNS.LOG_WARNING)
self.html_content = None
except Exception as e: except Exception as e:
RNS.log("Could not start HTTP interface " + str(self), RNS.LOG_ERROR) RNS.log(f"Error loading HTML file {self.html_file_path}: {e}", RNS.LOG_ERROR)
raise e self.html_content = None
def _create_request_handler(self): def setup_server(self):
"""Create a custom HTTP request handler class for the server mode."""
interface_instance = self interface_instance = self
class TunnelRequestHandler(BaseHTTPRequestHandler): class TunnelRequestHandler(BaseHTTPRequestHandler):
"""Handles HTTP POST requests for the HTTPInterface server.""" def do_GET(self):
if self.path == "/" and interface_instance.serve_html_page and interface_instance.html_content:
def __init__(self, request, client_address, server): self.send_response(200)
self.interface = interface_instance self.send_header("Content-Type", "text/html; charset=utf-8")
super().__init__(request, client_address, server) self.send_header("Content-Length", str(len(interface_instance.html_content)))
self.end_headers()
self.wfile.write(interface_instance.html_content.encode("utf-8"))
else:
self.send_response(404)
self.end_headers()
def do_POST(self): def do_POST(self):
"""Handle HTTP POST requests for bidirectional data exchange."""
if self.path == "/": if self.path == "/":
if self.interface.check_user_agent: if interface_instance.check_user_agent:
user_agent = self.headers.get("User-Agent", "") user_agent = self.headers.get("User-Agent", "")
if user_agent != self.interface.user_agent: if user_agent != HTTPTunnelInterface.TUNNEL_USER_AGENT:
RNS.log( RNS.log(f"Rejected request with invalid User-Agent: {user_agent}", RNS.LOG_WARNING)
f"Rejected request with invalid User-Agent: {user_agent}",
RNS.LOG_WARNING,
)
self.send_response(403) self.send_response(403)
self.send_header("Content-Type", "text/plain") self.send_header("Content-Type", "text/plain")
self.end_headers() self.end_headers()
@@ -143,32 +155,22 @@ class HTTPInterface(Interface):
return return
content_length = int(self.headers.get("Content-Length", 0)) content_length = int(self.headers.get("Content-Length", 0))
client_data = ( client_data = self.rfile.read(content_length) if content_length > 0 else b""
self.rfile.read(content_length) if content_length > 0 else b""
)
if client_data: if client_data:
RNS.log( RNS.log(f"Received {len(client_data)} bytes from client", RNS.LOG_EXTREME)
f"Received {len(client_data)} bytes from client", interface_instance._recv_queue.put(client_data)
RNS.LOG_DEBUG,
)
self.interface._recv_queue.put(client_data)
server_data_parts = [] server_data_parts = []
while True: while not interface_instance._send_queue.empty():
try: try:
server_data_parts.append( server_data_parts.append(interface_instance._send_queue.get_nowait())
self.interface._send_queue.get_nowait(),
)
except Empty: except Empty:
break break
server_data = b"".join(server_data_parts) server_data = b"".join(server_data_parts)
if server_data: if server_data:
RNS.log( RNS.log(f"Sending {len(server_data)} bytes ({len(server_data_parts)} chunks) to client", RNS.LOG_EXTREME)
f"Sending {len(server_data)} bytes ({len(server_data_parts)} chunks) to client",
RNS.LOG_DEBUG,
)
self.send_response(200) self.send_response(200)
self.send_header("Content-Type", "application/octet-stream") self.send_header("Content-Type", "application/octet-stream")
@@ -180,154 +182,127 @@ class HTTPInterface(Interface):
self.end_headers() self.end_headers()
def log_message(self, fmt, *args): def log_message(self, fmt, *args):
"""Override to suppress default logging.""" pass
return TunnelRequestHandler
def _start_interface(self):
"""Start the interface in either server or client mode."""
if self.mode == "server":
self._start_server()
else:
self._start_client()
def _start_server(self):
"""Start the HTTP server in a separate thread."""
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer): class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
"""Threaded HTTP server for handling multiple connections.""" pass
def run_server(): def run_server():
"""Run the HTTP server and handle requests."""
try: try:
self._http_server = ThreadedHTTPServer( self._http_server = ThreadedHTTPServer((self.listen_host, self.listen_port), TunnelRequestHandler)
(self.listen_host, self.listen_port),
self._request_handler_class,
)
self.online = True
RNS.log(
f"HTTP server started on http://{self.listen_host}:{self.listen_port}",
RNS.LOG_INFO,
)
self._http_server.serve_forever() self._http_server.serve_forever()
except Exception as e: except Exception as e:
if not self._stop_event.is_set(): if not self._stop_event.is_set():
RNS.log(f"Server error: {e}", RNS.LOG_ERROR) RNS.log(f"HTTP server error for {self}: {e}", RNS.LOG_ERROR)
self.online = False if RNS.Reticulum.panic_on_interface_error:
RNS.panic()
try: self._server_thread = Thread(target=run_server, daemon=True)
self._http_server = ThreadedHTTPServer( self._server_thread.start()
(self.listen_host, self.listen_port),
self._request_handler_class, thread = threading.Thread(target=self.receive_loop)
) thread.daemon = True
self.online = True thread.start()
RNS.log(
f"HTTP server started on http://{self.listen_host}:{self.listen_port}",
RNS.LOG_INFO,
)
self._worker_thread = Thread(
target=self._http_server.serve_forever, daemon=True
)
self._worker_thread.start()
except Exception as e:
RNS.log(f"Could not start HTTP server: {e}", RNS.LOG_ERROR)
self.online = False
raise e
def _start_client(self):
"""Start the HTTP client loop in a separate thread."""
self._worker_thread = Thread(target=self._client_loop, daemon=True)
self._worker_thread.start()
self.online = True self.online = True
RNS.log(f"HTTP client started, connecting to {self.server_url}", RNS.LOG_INFO) RNS.log(f"HTTP server started on http://{self.listen_host}:{self.listen_port}", RNS.LOG_NOTICE)
def _client_loop(self): def setup_client(self):
"""Main loop for the HTTP client, handling polling and data transfer.""" self.session = requests.Session()
self.session.headers.update({"User-Agent": HTTPTunnelInterface.TUNNEL_USER_AGENT})
self._consecutive_failures = 0
self._max_backoff = 30.0
thread = threading.Thread(target=self.client_loop)
thread.daemon = True
thread.start()
self.online = True
RNS.log(f"HTTP client started, connecting to {self.server_url}", RNS.LOG_NOTICE)
def receive_loop(self):
while not self._stop_event.is_set():
try:
received_data = self._recv_queue.get(timeout=1)
if received_data:
self.process_incoming(received_data)
except Empty:
continue
except Exception as e:
if not self._stop_event.is_set():
RNS.log(f"Error in receive loop for {self}: {e}", RNS.LOG_ERROR)
def client_loop(self):
while not self._stop_event.is_set(): while not self._stop_event.is_set():
data_to_send = b"" data_to_send = b""
if not self._send_queue.empty(): if not self._send_queue.empty():
from contextlib import suppress try:
with suppress(Empty):
data_to_send = self._send_queue.get_nowait() data_to_send = self._send_queue.get_nowait()
except Empty:
pass
try: try:
RNS.log(f"Sending {len(data_to_send)} bytes to server", RNS.LOG_DEBUG) RNS.log(f"Sending {len(data_to_send)} bytes to server", RNS.LOG_EXTREME)
response = self.session.post( response = self.session.post(self.server_url, data=data_to_send, timeout=5)
self.server_url,
data=data_to_send,
timeout=5,
)
response.raise_for_status() response.raise_for_status()
if response.content: if response.content:
RNS.log( RNS.log(f"Received {len(response.content)} bytes from server", RNS.LOG_EXTREME)
f"Received {len(response.content)} bytes from server", self.process_incoming(response.content)
RNS.LOG_DEBUG,
)
self._recv_queue.put(response.content)
if self._consecutive_failures > 0: if self._consecutive_failures > 0:
RNS.log("Reconnected to server", RNS.LOG_INFO) RNS.log(f"Reconnected to server for {self}", RNS.LOG_INFO)
self._consecutive_failures = 0 self._consecutive_failures = 0
except Exception as e: except requests.exceptions.RequestException as e:
self._consecutive_failures += 1 self._consecutive_failures += 1
if self._consecutive_failures % 10 == 1: if self._consecutive_failures % 10 == 1:
RNS.log( RNS.log(f"Error communicating with server for {self} (attempt {self._consecutive_failures}): {e}", RNS.LOG_WARNING)
f"Error communicating with server (attempt {self._consecutive_failures}): {e}",
RNS.LOG_ERROR,
)
if self._consecutive_failures > 0: if self._consecutive_failures > 0:
delay = min( delay = min(self.poll_interval * (2 ** min(self._consecutive_failures - 1, 5)), self._max_backoff)
self.poll_interval * (2 ** min(self._consecutive_failures - 1, 5)),
self._max_backoff,
)
else: else:
delay = self.poll_interval delay = self.poll_interval
sleep(delay) time.sleep(delay)
def process_incoming(self, data): def process_incoming(self, data):
"""Process incoming data received from the underlying medium.""" if len(data) > 0 and self.online:
self.rxb += len(data) self.rxb += len(data)
self.owner.inbound(data, self) self.owner.inbound(data, self)
def process_outgoing(self, data): def process_outgoing(self, data):
"""Process outgoing data to be transmitted by the interface."""
if self.online: if self.online:
if len(data) > self.HW_MTU: if len(data) > self.mtu:
RNS.log( RNS.log(f"Payload too large ({len(data)} > {self.mtu}) for {self}", RNS.LOG_ERROR)
f"Packet too large ({len(data)} > {self.HW_MTU}), dropping",
RNS.LOG_WARNING,
)
return return
self._send_queue.put(data) self._send_queue.put(data)
self.txb += len(data) self.txb += len(data)
def detach(self):
RNS.log(f"Detaching {self}", RNS.LOG_DEBUG)
self._stop_event.set()
self.online = False
if self.mode == "server":
if hasattr(self, "_http_server") and self._http_server:
try:
self._http_server.shutdown()
self._http_server.server_close()
except Exception as e:
RNS.log(f"Error while shutting down HTTP server for {self}: {e}", RNS.LOG_ERROR)
if hasattr(self, "_server_thread") and self._server_thread:
self._server_thread.join(timeout=2)
def should_ingress_limit(self): def should_ingress_limit(self):
"""Indicate that this interface should not perform ingress limiting."""
return False return False
def __str__(self): def __str__(self):
"""Return a string representation of the HTTPInterface.""" if self.mode == "server":
return f"HTTPInterface[{self.name}]" return f"HTTPTunnelInterface[{self.name}/server/{self.listen_host}:{self.listen_port}]"
return f"HTTPTunnelInterface[{self.name}/client/{self.server_url}]"
def _read_loop(self): interface_class = HTTPTunnelInterface
"""Read loop that processes incoming data from the queue."""
try:
while not self._stop_event.is_set():
try:
data = self._recv_queue.get(timeout=0.1)
self.process_incoming(data)
except Empty:
continue
except Exception as e:
if not self._stop_event.is_set():
RNS.log(f"Error in read loop: {e}", RNS.LOG_ERROR)
self.online = False
interface_class = HTTPInterface