Files
2025-06-16 13:25:55 +02:00

163 lines
5.9 KiB
Python
Executable File

#!/usr/bin/env python3
# nomadForum - a forum on the NomadNetwork
# Copyright (C) 2023-2025 AutumnSpark1226
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import socket
import os
import string
import LXMF
import RNS
from threading import Thread
from time import sleep
import main
# Set the path of the Unix socket
socket_path = '/tmp/nomadForum_socket'
propagation_node_address = "f141f039b3b88b7a2d5c6048c7adaafb" # This is the LXMF propagation node hosted on SparkN0de. I recommend using a propagation node running on the same host
message_queue = []
queue_processing_running = False
reticulum = None
router = None
ident = None
source = None
identity_path = main.storage_path + "/notification_identity"
router_storage_path = main.storage_path + "/router_storage"
def setup_lxmf():
global reticulum, router, source, ident
reticulum = RNS.Reticulum()
if os.path.isfile(identity_path):
ident = RNS.Identity.from_file(identity_path)
if ident is not None:
print("Loaded identity")
else:
print("Could not load identity")
exit(1)
else:
ident = RNS.Identity()
ident.to_file(identity_path)
print("Created new identity")
if not os.path.isdir(router_storage_path):
os.makedirs(router_storage_path)
print("created LXMF storage path")
router = LXMF.LXMRouter(identity=ident, storagepath=router_storage_path)
# set propagation node
router.set_outbound_propagation_node(bytes.fromhex(propagation_node_address))
source = router.register_delivery_identity(ident, display_name=main.forum_name + " Nofications")
# source = router.register_delivery_identity(ident, display_name="Test") # for testing only
router.announce(source.hash)
print("announcement sent")
def delivery_callback(lxm):
if lxm.state == LXMF.LXMessage.FAILED and hasattr(lxm, "try_propagation_on_fail") and lxm.try_propagation_on_fail:
lxm.try_propagation_on_fail = None
lxm.delivery_attempts = 0
del lxm.next_delivery_attempt
lxm.packed = None
lxm.desired_method = LXMF.LXMessage.PROPAGATED
router.handle_outbound(lxm)
def send_notification(details: []) -> None:
recipient_hash = RNS.Destination.hash_from_name_and_identity("lxmf.delivery", bytes.fromhex(details[0]))
# wait for path
timeout_counter = 0
if not RNS.Transport.has_path(recipient_hash):
RNS.Transport.request_path(recipient_hash)
while not RNS.Transport.has_path(recipient_hash):
sleep(0.1)
timeout_counter += 1
if timeout_counter > 1000:
print("Could not find recipient: " + details[0])
return
recipient_identity = RNS.Identity.recall(recipient_hash)
dest = RNS.Destination(recipient_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
lxm = LXMF.LXMessage(dest, source, details[1], "New notification", desired_method=LXMF.LXMessage.DIRECT)
if router.get_outbound_propagation_node() != None:
lxm.try_propagation_on_fail = True
lxm.register_delivery_callback(delivery_callback)
lxm.register_failed_callback(delivery_callback)
router.handle_outbound(lxm)
def process_queue():
global queue_processing_running
queue_processing_running = True
length = len(message_queue)
for _ in range(length):
send_notification(message_queue.pop(0))
# call the function again to process new entries
if len(message_queue) > 0:
process_queue()
queue_processing_running = False
def run_server():
# try to remove already existing socket
try:
os.unlink(socket_path)
except OSError:
if os.path.exists(socket_path):
raise
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server.bind(socket_path)
server.listen(1)
try:
while True:
connection, client_address = server.accept()
continue_receiving = True
while continue_receiving:
destination = connection.recv(32).decode()
message = connection.recv(4096).decode()
if destination and message and len(destination) == 32 and set(destination).issubset(set(string.hexdigits)):
message_queue.append([destination, message])
connection.send("ok".encode())
else:
connection.send("error".encode())
status = connection.recv(32).decode()
continue_receiving = status == "continue"
if continue_receiving:
connection.send("sendNext".encode())
connection.close()
if not queue_processing_running:
thread = Thread(target=process_queue)
thread.start()
finally:
connection.close()
os.unlink(socket_path)
# this method will be called by the program that needs to send a notification
def add_notification(message: []) -> None:
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
client.connect(socket_path)
client.send(message[0].encode())
client.send(message[1].encode())
response = client.recv(128).decode()
if response != "ok":
print("something went wrong")
client.send("end".encode())
client.close()
if __name__ == '__main__':
setup_lxmf()
run_server()