685 lines
27 KiB
Python
685 lines
27 KiB
Python
#!/usr/bin/env python
|
|
|
|
import argparse
|
|
import http
|
|
import json
|
|
import mimetypes
|
|
import os
|
|
import time
|
|
from typing import Callable
|
|
|
|
import RNS
|
|
import LXMF
|
|
import asyncio
|
|
import websockets
|
|
import base64
|
|
|
|
|
|
class ReticulumWebChat:
|
|
|
|
def __init__(self, webchat_config_file, reticulum_config_dir, identity: RNS.Identity):
|
|
|
|
# default values before loading config
|
|
self.display_name = "Anonymous Peer"
|
|
|
|
# load config
|
|
self.config_file = webchat_config_file or "storage/config.json"
|
|
self.load_config()
|
|
|
|
# init reticulum
|
|
self.reticulum = RNS.Reticulum(reticulum_config_dir)
|
|
self.identity = identity
|
|
|
|
# init lxmf router
|
|
self.message_router = LXMF.LXMRouter(identity=self.identity, storagepath="storage/lxmf")
|
|
|
|
# register lxmf identity
|
|
self.local_lxmf_destination = self.message_router.register_delivery_identity(self.identity)
|
|
|
|
# set a callback for when an lxmf message is received
|
|
self.message_router.register_delivery_callback(self.on_lxmf_delivery)
|
|
|
|
# set a callback for when an lxmf announce is received
|
|
RNS.Transport.register_announce_handler(LXMFAnnounceHandler(self.on_lxmf_announce_received))
|
|
|
|
# remember websocket clients
|
|
self.websocket_clients = []
|
|
|
|
def load_config(self):
|
|
|
|
# default config
|
|
config = {
|
|
|
|
}
|
|
|
|
# attempt to load config and override default values
|
|
try:
|
|
with open(self.config_file, 'r') as f:
|
|
custom_config = json.load(f)
|
|
config |= custom_config
|
|
|
|
# config is broken, fallback to defaults
|
|
except:
|
|
print("failed to load config, defaults will be used")
|
|
|
|
# update display name from config
|
|
if "display_name" in config:
|
|
self.display_name = config["display_name"]
|
|
|
|
# return loaded config
|
|
return config
|
|
|
|
def save_config(self):
|
|
|
|
# build config
|
|
config = {
|
|
"display_name": self.display_name,
|
|
}
|
|
|
|
# attempt to save config
|
|
try:
|
|
with open(self.config_file, 'w') as f:
|
|
json.dump(config, f, indent=4)
|
|
|
|
# config is broken, fallback to defaults
|
|
except:
|
|
print("failed to save config")
|
|
|
|
async def run(self, host, port):
|
|
|
|
# start websocket server
|
|
async with websockets.serve(self.on_websocket_client_connected, host, port, process_request=self.process_request):
|
|
print("ReticulumWebChat server running at http://{}:{}".format(host, port))
|
|
await asyncio.Future() # run forever
|
|
|
|
# handle serving custom http paths
|
|
async def process_request(self, path, request_headers):
|
|
|
|
# serve index.html
|
|
if path == "/":
|
|
with open("public/index.html") as f:
|
|
file_content = f.read()
|
|
return http.HTTPStatus.OK, [
|
|
('Content-Type', 'text/html')
|
|
], file_content.encode("utf-8")
|
|
|
|
# serve anything in public folder
|
|
public_file_path = os.path.join("public", path.lstrip("/"))
|
|
if os.path.isfile(public_file_path):
|
|
mime_type, _ = mimetypes.guess_type(public_file_path)
|
|
with open(public_file_path, "rb") as f:
|
|
file_content = f.read()
|
|
return http.HTTPStatus.OK, [
|
|
('Content-Type', mime_type)
|
|
], file_content
|
|
|
|
# by default, websocket is always served, but we only want it to be available at /ws
|
|
# so we will return 404 for everything other than /ws
|
|
if path != "/ws":
|
|
return http.HTTPStatus.NOT_FOUND, [
|
|
('Content-Type', 'text/html')
|
|
], b"Not Found"
|
|
|
|
# handle new client connecting to websocket
|
|
async def on_websocket_client_connected(self, client):
|
|
|
|
# add client to connected clients list
|
|
self.websocket_clients.append(client)
|
|
|
|
# send config to all clients
|
|
await self.send_config_to_websocket_clients()
|
|
|
|
# send known peers to all clients
|
|
await self.send_known_peers_to_websocket_clients()
|
|
|
|
# handle client messages until disconnected
|
|
while True:
|
|
try:
|
|
message = await client.recv()
|
|
data = json.loads(message)
|
|
await self.on_websocket_data_received(client, data)
|
|
except (websockets.ConnectionClosed, websockets.ConnectionClosedOK, websockets.ConnectionClosedError):
|
|
# client disconnected, we can stop looping
|
|
break
|
|
except Exception as e:
|
|
# ignore errors while handling message
|
|
print("failed to process client message")
|
|
print(e)
|
|
|
|
# loop finished, client is no longer connected
|
|
self.websocket_clients.remove(client)
|
|
|
|
# handle data received from websocket client
|
|
async def on_websocket_data_received(self, client, data):
|
|
|
|
# get type from client data
|
|
_type = data["type"]
|
|
|
|
# handle updating config
|
|
if _type == "config.set":
|
|
|
|
# send lxmf message to destination
|
|
config = data["config"]
|
|
|
|
# update display name in state
|
|
if "display_name" in config and config["display_name"] != "":
|
|
self.display_name = config["display_name"]
|
|
print("updated display name to: " + self.display_name)
|
|
|
|
# save config
|
|
self.save_config()
|
|
|
|
# send config to websocket clients
|
|
await self.send_config_to_websocket_clients()
|
|
|
|
# handle sending an lxmf message
|
|
elif _type == "lxmf.delivery":
|
|
|
|
# send lxmf message to destination
|
|
destination_hash = data["destination_hash"]
|
|
message = data["message"]
|
|
await self.send_message(destination_hash, message)
|
|
|
|
# # TODO: send response to client when marked as delivered?
|
|
# await client.send(json.dumps({
|
|
# "type": "lxmf.sent",
|
|
# }))
|
|
|
|
# handle sending an announce
|
|
elif _type == "announce":
|
|
|
|
# send announce for lxmf
|
|
self.local_lxmf_destination.announce(app_data=self.display_name.encode("utf-8"))
|
|
|
|
# handle downloading a file from a nomadnet node
|
|
elif _type == "nomadnet.file.download":
|
|
|
|
# get data from websocket client
|
|
destination_hash = data["nomadnet_file_download"]["destination_hash"]
|
|
file_path = data["nomadnet_file_download"]["file_path"]
|
|
|
|
# convert destination hash to bytes
|
|
destination_hash = bytes.fromhex(destination_hash)
|
|
|
|
# handle successful file download
|
|
def on_file_download_success(file_name, file_bytes):
|
|
asyncio.run(client.send(json.dumps({
|
|
"type": "nomadnet.file.download",
|
|
"nomadnet_file_download": {
|
|
"status": "success",
|
|
"destination_hash": destination_hash.hex(),
|
|
"file_path": file_path,
|
|
"file_name": file_name,
|
|
"file_bytes": base64.b64encode(file_bytes).decode("utf-8"),
|
|
},
|
|
})))
|
|
|
|
# handle file download failure
|
|
def on_file_download_failure(failure_reason):
|
|
asyncio.run(client.send(json.dumps({
|
|
"type": "nomadnet.file.download",
|
|
"nomadnet_file_download": {
|
|
"status": "error",
|
|
"failure_reason": failure_reason,
|
|
"destination_hash": destination_hash.hex(),
|
|
"file_path": file_path,
|
|
},
|
|
})))
|
|
|
|
# handle file download progress
|
|
def on_file_download_progress(progress):
|
|
asyncio.run(client.send(json.dumps({
|
|
"type": "nomadnet.file.download",
|
|
"nomadnet_file_download": {
|
|
"status": "progress",
|
|
"progress": progress,
|
|
"destination_hash": destination_hash.hex(),
|
|
"file_path": file_path,
|
|
},
|
|
})))
|
|
|
|
# todo: handle file download progress
|
|
|
|
# download the file
|
|
NomadnetFileDownloader(destination_hash, file_path, on_file_download_success, on_file_download_failure, on_file_download_progress)
|
|
|
|
# handle downloading a page from a nomadnet node
|
|
elif _type == "nomadnet.page.download":
|
|
|
|
# get data from websocket client
|
|
destination_hash = data["nomadnet_page_download"]["destination_hash"]
|
|
page_path = data["nomadnet_page_download"]["page_path"]
|
|
|
|
# convert destination hash to bytes
|
|
destination_hash = bytes.fromhex(destination_hash)
|
|
|
|
# handle successful page download
|
|
def on_page_download_success(page_content):
|
|
asyncio.run(client.send(json.dumps({
|
|
"type": "nomadnet.page.download",
|
|
"nomadnet_page_download": {
|
|
"status": "success",
|
|
"destination_hash": destination_hash.hex(),
|
|
"page_path": page_path,
|
|
"page_content": page_content,
|
|
},
|
|
})))
|
|
|
|
# handle page download failure
|
|
def on_page_download_failure(failure_reason):
|
|
asyncio.run(client.send(json.dumps({
|
|
"type": "nomadnet.page.download",
|
|
"nomadnet_page_download": {
|
|
"status": "error",
|
|
"failure_reason": failure_reason,
|
|
"destination_hash": destination_hash.hex(),
|
|
"page_path": page_path,
|
|
},
|
|
})))
|
|
|
|
# handle page download progress
|
|
def on_page_download_progress(progress):
|
|
asyncio.run(client.send(json.dumps({
|
|
"type": "nomadnet.page.download",
|
|
"nomadnet_page_download": {
|
|
"status": "progress",
|
|
"progress": progress,
|
|
"destination_hash": destination_hash.hex(),
|
|
"page_path": page_path,
|
|
},
|
|
})))
|
|
|
|
# todo: handle page download progress
|
|
|
|
# download the page
|
|
NomadnetPageDownloader(destination_hash, page_path, on_page_download_success, on_page_download_failure, on_page_download_progress)
|
|
|
|
# unhandled type
|
|
else:
|
|
print("unhandled client message type: " + _type)
|
|
|
|
# broadcast provided data to all connected websocket clients
|
|
async def websocket_broadcast(self, data):
|
|
for websocket_client in self.websocket_clients:
|
|
await websocket_client.send(data)
|
|
|
|
# broadcasts config to all websocket clients
|
|
async def send_config_to_websocket_clients(self):
|
|
await self.websocket_broadcast(json.dumps({
|
|
"type": "config",
|
|
"config": {
|
|
"display_name": self.display_name,
|
|
"identity_hash": self.identity.hexhash,
|
|
"lxmf_address_hash": self.local_lxmf_destination.hexhash,
|
|
},
|
|
}))
|
|
|
|
# broadcasts known peers to all websocket clients
|
|
async def send_known_peers_to_websocket_clients(self):
|
|
|
|
# process known peers
|
|
known_peers = []
|
|
for destination_hash in RNS.Identity.known_destinations:
|
|
known_destination = RNS.Identity.known_destinations[destination_hash]
|
|
last_announce_timestamp = known_destination[0]
|
|
known_peers.append({
|
|
"destination_hash": destination_hash.hex(),
|
|
"app_data": self.convert_app_data_to_string(RNS.Identity.recall_app_data(destination_hash)),
|
|
"last_announce_timestamp": last_announce_timestamp,
|
|
})
|
|
|
|
# send known peers to websocket clients
|
|
await self.websocket_broadcast(json.dumps({
|
|
"type": "known_peers",
|
|
"known_peers": known_peers,
|
|
}))
|
|
|
|
# convert app data to string, or return none unable to do so
|
|
def convert_app_data_to_string(self, app_data):
|
|
|
|
# attempt to convert to utf-8 string
|
|
if app_data is not None:
|
|
try:
|
|
return app_data.decode("utf-8")
|
|
except:
|
|
# ignore failure to convert to string
|
|
pass
|
|
|
|
# unable to convert to string
|
|
return None
|
|
|
|
# convert an lxmf message to a dictionary, for sending over websocket
|
|
def convert_lxmf_message_to_dict(self, lxmf_message: LXMF.LXMessage):
|
|
|
|
# handle fields
|
|
fields = {}
|
|
message_fields = lxmf_message.get_fields()
|
|
for field_type in message_fields:
|
|
|
|
value = message_fields[field_type]
|
|
|
|
# handle file attachments field
|
|
if field_type == LXMF.FIELD_FILE_ATTACHMENTS:
|
|
|
|
# process file attachments
|
|
file_attachments = []
|
|
for file_attachment in value:
|
|
file_name = file_attachment[0]
|
|
file_bytes = base64.b64encode(file_attachment[1]).decode("utf-8")
|
|
file_attachments.append({
|
|
"file_name": file_name,
|
|
"file_bytes": file_bytes,
|
|
})
|
|
|
|
# add to fields
|
|
fields["file_attachments"] = file_attachments
|
|
|
|
# handle image field
|
|
if field_type == LXMF.FIELD_IMAGE:
|
|
image_type = value[0]
|
|
image_bytes = base64.b64encode(value[1]).decode("utf-8")
|
|
fields["image"] = {
|
|
"image_type": image_type,
|
|
"image_bytes": image_bytes,
|
|
}
|
|
|
|
return {
|
|
"hash": lxmf_message.hash.hex(),
|
|
"source_hash": lxmf_message.source_hash.hex(),
|
|
"destination_hash": lxmf_message.destination_hash.hex(),
|
|
"state": self.convert_lxmf_state_to_string(lxmf_message),
|
|
"progress": lxmf_message.progress,
|
|
"content": lxmf_message.content.decode('utf-8'),
|
|
"fields": fields,
|
|
}
|
|
|
|
# convert lxmf state to a human friendly string
|
|
def convert_lxmf_state_to_string(self, lxmf_message: LXMF.LXMessage):
|
|
|
|
# convert state to string
|
|
lxmf_message_state = "unknown"
|
|
if lxmf_message.state == LXMF.LXMessage.DRAFT:
|
|
lxmf_message_state = "draft"
|
|
elif lxmf_message.state == LXMF.LXMessage.OUTBOUND:
|
|
lxmf_message_state = "outbound"
|
|
elif lxmf_message.state == LXMF.LXMessage.SENDING:
|
|
lxmf_message_state = "sending"
|
|
elif lxmf_message.state == LXMF.LXMessage.SENT:
|
|
lxmf_message_state = "sent"
|
|
elif lxmf_message.state == LXMF.LXMessage.DELIVERED:
|
|
lxmf_message_state = "delivered"
|
|
elif lxmf_message.state == LXMF.LXMessage.FAILED:
|
|
lxmf_message_state = "failed"
|
|
|
|
return lxmf_message_state
|
|
|
|
# handle an lxmf delivery from reticulum
|
|
# NOTE: cant be async, as Reticulum doesn't await it
|
|
def on_lxmf_delivery(self, message):
|
|
try:
|
|
|
|
# send received lxmf message data to all websocket clients
|
|
asyncio.run(self.websocket_broadcast(json.dumps({
|
|
"type": "lxmf.delivery",
|
|
"lxmf_message": self.convert_lxmf_message_to_dict(message),
|
|
})))
|
|
|
|
except Exception as e:
|
|
# do nothing on error
|
|
print("lxmf_delivery error: {}".format(e))
|
|
|
|
# handle delivery status update for an outbound lxmf message
|
|
def on_lxmf_sending_state_updated(self, lxmf_message):
|
|
|
|
# send lxmf message state to all websocket clients
|
|
asyncio.run(self.websocket_broadcast(json.dumps({
|
|
"type": "lxmf_message_state_updated",
|
|
"lxmf_message": self.convert_lxmf_message_to_dict(lxmf_message),
|
|
})))
|
|
|
|
# handle delivery failed for an outbound lxmf message
|
|
def on_lxmf_sending_failed(self, lxmf_message):
|
|
# just pass this on, we don't need to do anything special
|
|
self.on_lxmf_sending_state_updated(lxmf_message)
|
|
|
|
# handle sending an lxmf message to reticulum
|
|
async def send_message(self, destination_hash, message_content):
|
|
|
|
try:
|
|
|
|
# convert destination hash to bytes
|
|
destination_hash = bytes.fromhex(destination_hash)
|
|
|
|
# FIXME: can this be removed, and just rely on the router to check paths?
|
|
# find destination identity from hash
|
|
destination_identity = RNS.Identity.recall(destination_hash)
|
|
if destination_identity is None:
|
|
|
|
# we don't know the path/identity for this destination hash, we will request it
|
|
RNS.Transport.request_path(destination_hash)
|
|
|
|
# we have to bail out of sending, since we don't have the path yet
|
|
return
|
|
|
|
# create destination for recipients lxmf delivery address
|
|
lxmf_destination = RNS.Destination(destination_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
|
|
|
|
# create lxmf message
|
|
lxmf_message = LXMF.LXMessage(lxmf_destination, self.local_lxmf_destination, message_content, desired_method=LXMF.LXMessage.DIRECT)
|
|
lxmf_message.try_propagation_on_fail = True
|
|
lxmf_message.register_delivery_callback(self.on_lxmf_sending_state_updated)
|
|
lxmf_message.register_failed_callback(self.on_lxmf_sending_failed)
|
|
|
|
# send lxmf message to be routed to destination
|
|
self.message_router.handle_outbound(lxmf_message)
|
|
|
|
# send outbound lxmf message to websocket (after passing to router so hash is available)
|
|
await self.websocket_broadcast(json.dumps({
|
|
"type": "lxmf_outbound_message_created",
|
|
"lxmf_message": self.convert_lxmf_message_to_dict(lxmf_message),
|
|
}))
|
|
|
|
except:
|
|
# FIXME send error to websocket?
|
|
print("failed to send lxmf message")
|
|
|
|
# handle an announce received from reticulum, for an lxmf address
|
|
# NOTE: cant be async, as Reticulum doesn't await it
|
|
def on_lxmf_announce_received(self, destination_hash, announced_identity, app_data):
|
|
|
|
# log received announce
|
|
RNS.log("Received an announce from " + RNS.prettyhexrep(destination_hash))
|
|
|
|
# parse app data
|
|
parsed_app_data = None
|
|
if app_data is not None:
|
|
parsed_app_data = app_data.decode("utf-8")
|
|
|
|
# send received lxmf announce to all websocket clients
|
|
asyncio.run(self.websocket_broadcast(json.dumps({
|
|
"type": "announce",
|
|
"announce": {
|
|
"destination_hash": destination_hash.hex(),
|
|
"app_data": parsed_app_data,
|
|
"last_announce_timestamp": time.time(),
|
|
},
|
|
})))
|
|
|
|
|
|
# an announce handler for lxmf.delivery aspect that just forwards to a provided callback
|
|
class LXMFAnnounceHandler:
|
|
|
|
def __init__(self, received_announce_callback):
|
|
self.aspect_filter = "lxmf.delivery"
|
|
self.received_announce_callback = received_announce_callback
|
|
|
|
# we will just pass the received announce back to the provided callback
|
|
def received_announce(self, destination_hash, announced_identity, app_data):
|
|
try:
|
|
# handle received announce
|
|
self.received_announce_callback(destination_hash, announced_identity, app_data)
|
|
except:
|
|
# ignore failure to handle received announce
|
|
pass
|
|
|
|
|
|
class NomadnetDownloader:
|
|
|
|
def __init__(self, destination_hash: bytes, path: str, on_download_success: Callable[[bytes], None], on_download_failure: Callable[[str], None], on_progress_update: Callable[[float], None], timeout: int|None = None, auto_download=True):
|
|
self.app_name = "nomadnetwork"
|
|
self.aspects = "node"
|
|
self.destination_hash = destination_hash
|
|
self.path = path
|
|
self.timeout = timeout
|
|
self.on_download_success = on_download_success
|
|
self.on_download_failure = on_download_failure
|
|
self.on_progress_update = on_progress_update
|
|
if auto_download:
|
|
self.download()
|
|
|
|
# setup link to destination and request download
|
|
def download(self):
|
|
|
|
# request path to destination
|
|
RNS.Transport.request_path(self.destination_hash)
|
|
|
|
# find existing identity
|
|
identity = RNS.Identity.recall(self.destination_hash)
|
|
if identity is None:
|
|
self.on_download_failure("identity not found")
|
|
return
|
|
|
|
# create destination to nomadnet node
|
|
destination = RNS.Destination(
|
|
identity,
|
|
RNS.Destination.OUT,
|
|
RNS.Destination.SINGLE,
|
|
self.app_name,
|
|
self.aspects,
|
|
)
|
|
|
|
# create link to destination
|
|
RNS.Link(destination, established_callback=self.link_established)
|
|
|
|
# link to destination was established, we should now request the download
|
|
def link_established(self, link):
|
|
|
|
# request download over link
|
|
link.request(
|
|
self.path,
|
|
data=None,
|
|
response_callback=self.on_response,
|
|
failed_callback=self.on_failed,
|
|
progress_callback=self.on_progress,
|
|
timeout=self.timeout,
|
|
)
|
|
|
|
# handle successful download
|
|
def on_response(self, request_receipt):
|
|
print("file_received")
|
|
self.on_download_success(request_receipt.response)
|
|
|
|
# handle failure
|
|
def on_failed(self, request_receipt=None):
|
|
self.on_download_failure("request_failed")
|
|
|
|
# handle download progress
|
|
def on_progress(self, request_receipt):
|
|
self.on_progress_update(request_receipt.progress)
|
|
|
|
|
|
class NomadnetPageDownloader(NomadnetDownloader):
|
|
|
|
def __init__(self, destination_hash: bytes, page_path: str, on_page_download_success: Callable[[str], None], on_page_download_failure: Callable[[str], None], on_progress_update: Callable[[float], None], timeout: int|None = None, auto_download=True):
|
|
self.on_page_download_success = on_page_download_success
|
|
self.on_page_download_failure = on_page_download_failure
|
|
super().__init__(destination_hash, page_path, self.on_download_success, self.on_download_failure, on_progress_update, timeout, auto_download)
|
|
|
|
# page download was successful, decode the response and send to provided callback
|
|
def on_download_success(self, response_bytes):
|
|
micron_markup_response = response_bytes.decode("utf-8")
|
|
self.on_page_download_success(micron_markup_response)
|
|
|
|
# page download failed, send error to provided callback
|
|
def on_download_failure(self, failure_reason):
|
|
self.on_page_download_failure(failure_reason)
|
|
|
|
|
|
class NomadnetFileDownloader(NomadnetDownloader):
|
|
|
|
def __init__(self, destination_hash: bytes, page_path: str, on_file_download_success: Callable[[str, bytes], None], on_file_download_failure: Callable[[str], None], on_progress_update: Callable[[float], None], timeout: int|None = None, auto_download=True):
|
|
self.on_file_download_success = on_file_download_success
|
|
self.on_file_download_failure = on_file_download_failure
|
|
super().__init__(destination_hash, page_path, self.on_download_success, self.on_download_failure, on_progress_update, timeout, auto_download)
|
|
|
|
# file download was successful, decode the response and send to provided callback
|
|
def on_download_success(self, response):
|
|
file_name: str = response[0]
|
|
file_data: bytes = response[1]
|
|
self.on_file_download_success(file_name, file_data)
|
|
|
|
# page download failed, send error to provided callback
|
|
def on_download_failure(self, failure_reason):
|
|
self.on_file_download_failure(failure_reason)
|
|
|
|
|
|
async def main():
|
|
|
|
# parse command line args
|
|
parser = argparse.ArgumentParser(description="ReticulumWebChat")
|
|
parser.add_argument("--host", nargs='?', default="0.0.0.0", type=str, help="The address the web server should listen on.")
|
|
parser.add_argument("--port", nargs='?', default="8000", type=int, help="The port the web server should listen on.")
|
|
parser.add_argument("--identity-file", type=str, help="Path to a Reticulum Identity file to use as your LXMF address.")
|
|
parser.add_argument("--identity-base64", type=str, help="A base64 encoded Reticulum Identity to use as your LXMF address.")
|
|
parser.add_argument("--generate-identity-file", type=str, help="Generates and saves a new Reticulum Identity to the provided file path and then exits.")
|
|
parser.add_argument("--generate-identity-base64", action='store_true', help="Outputs a randomly generated Reticulum Identity as base64 and then exits.")
|
|
parser.add_argument("--reticulum-config-dir", type=str, help="Path to a Reticulum config directory for the RNS stack to use (e.g: ~/.reticulum)")
|
|
parser.add_argument("--webchat-config-file", type=str, help="Path to a ReticulumWebChat config file for saving user preferences.")
|
|
args = parser.parse_args()
|
|
|
|
# util to generate reticulum identity and save to file without using rnid
|
|
if args.generate_identity_file is not None:
|
|
|
|
# do not overwrite existing files, otherwise user could lose existing keys
|
|
if os.path.exists(args.generate_identity_file):
|
|
print("DANGER: the provided identity file path already exists, not overwriting!")
|
|
return
|
|
|
|
# generate a new identity and save to provided file path
|
|
identity = RNS.Identity(create_keys=True)
|
|
with open(args.generate_identity_file, "wb") as file:
|
|
file.write(identity.get_private_key())
|
|
|
|
print("A new Reticulum Identity has been saved to: {}".format(args.generate_identity_file))
|
|
return
|
|
|
|
# util to generate reticulum identity as base64 without using rnid
|
|
if args.generate_identity_base64 is True:
|
|
identity = RNS.Identity(create_keys=True)
|
|
print(base64.b64encode(identity.get_private_key()).decode("utf-8"))
|
|
return
|
|
|
|
# use provided identity, or fallback to a random one
|
|
if args.identity_file is not None:
|
|
identity = RNS.Identity(create_keys=False)
|
|
identity.load(args.identity_file)
|
|
print("Reticulum Identity has been loaded from file.")
|
|
print(identity)
|
|
elif args.identity_base64 is not None:
|
|
identity = RNS.Identity(create_keys=False)
|
|
identity.load_private_key(base64.b64decode(args.identity_base64))
|
|
print("Reticulum Identity has been loaded from base64.")
|
|
print(identity)
|
|
else:
|
|
identity = RNS.Identity(create_keys=True)
|
|
print("Reticulum Identity has been randomly generated.")
|
|
print(identity)
|
|
|
|
# init app
|
|
reticulum_webchat = ReticulumWebChat(args.webchat_config_file, args.reticulum_config_dir, identity)
|
|
await reticulum_webchat.run(args.host, args.port)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|