refactor(meshchat): update map to render tiles faster (online), message handling by adding context support to forwarding and delivery methods; improve LXMF message processing and router initialization
Some checks failed
CI / test-backend (push) Successful in 4s
CI / build-frontend (push) Successful in 1m54s
CI / test-lang (push) Successful in 1m53s
CI / test-backend (pull_request) Successful in 21s
Build and Publish Docker Image / build (pull_request) Has been skipped
CI / test-lang (pull_request) Successful in 49s
OSV-Scanner PR Scan / scan-pr (pull_request) Successful in 23s
CI / lint (push) Successful in 9m46s
CI / build-frontend (pull_request) Successful in 9m44s
CI / lint (pull_request) Successful in 9m47s
Build Test / Build and Test (pull_request) Successful in 13m17s
Benchmarks / benchmark (push) Successful in 14m34s
Benchmarks / benchmark (pull_request) Successful in 14m42s
Build and Publish Docker Image / build-dev (pull_request) Successful in 13m53s
Tests / test (push) Failing after 26m51s
Tests / test (pull_request) Failing after 24m53s
Build Test / Build and Test (push) Successful in 45m21s
Some checks failed
CI / test-backend (push) Successful in 4s
CI / build-frontend (push) Successful in 1m54s
CI / test-lang (push) Successful in 1m53s
CI / test-backend (pull_request) Successful in 21s
Build and Publish Docker Image / build (pull_request) Has been skipped
CI / test-lang (pull_request) Successful in 49s
OSV-Scanner PR Scan / scan-pr (pull_request) Successful in 23s
CI / lint (push) Successful in 9m46s
CI / build-frontend (pull_request) Successful in 9m44s
CI / lint (pull_request) Successful in 9m47s
Build Test / Build and Test (pull_request) Successful in 13m17s
Benchmarks / benchmark (push) Successful in 14m34s
Benchmarks / benchmark (pull_request) Successful in 14m42s
Build and Publish Docker Image / build-dev (pull_request) Successful in 13m53s
Tests / test (push) Failing after 26m51s
Tests / test (pull_request) Failing after 24m53s
Build Test / Build and Test (push) Successful in 45m21s
This commit is contained in:
@@ -9091,7 +9091,7 @@ class ReticulumMeshChat:
|
||||
self.db_upsert_lxmf_message(lxmf_message, is_spam=is_spam, context=ctx)
|
||||
|
||||
# handle forwarding
|
||||
self.handle_forwarding(lxmf_message)
|
||||
self.handle_forwarding(lxmf_message, context=ctx)
|
||||
|
||||
# handle telemetry
|
||||
try:
|
||||
@@ -9200,8 +9200,12 @@ class ReticulumMeshChat:
|
||||
print(f"lxmf_delivery error: {e}")
|
||||
|
||||
# handles lxmf message forwarding logic
|
||||
def handle_forwarding(self, lxmf_message: LXMF.LXMessage):
|
||||
def handle_forwarding(self, lxmf_message: LXMF.LXMessage, context=None):
|
||||
try:
|
||||
ctx = context or self.current_context
|
||||
if not ctx:
|
||||
return
|
||||
|
||||
source_hash = lxmf_message.source_hash.hex()
|
||||
destination_hash = lxmf_message.destination_hash.hex()
|
||||
|
||||
@@ -9227,7 +9231,7 @@ class ReticulumMeshChat:
|
||||
file_attachments_field = LxmfFileAttachmentsField(attachments)
|
||||
|
||||
# check if this message is for an alias identity (REPLY PATH)
|
||||
mapping = self.database.messages.get_forwarding_mapping(
|
||||
mapping = ctx.database.messages.get_forwarding_mapping(
|
||||
alias_hash=destination_hash,
|
||||
)
|
||||
|
||||
@@ -9246,13 +9250,14 @@ class ReticulumMeshChat:
|
||||
image_field=image_field,
|
||||
audio_field=audio_field,
|
||||
file_attachments_field=file_attachments_field,
|
||||
context=ctx,
|
||||
),
|
||||
)
|
||||
return
|
||||
|
||||
# check if this message matches a forwarding rule (FORWARD PATH)
|
||||
# we check for rules that apply to the destination of this message
|
||||
rules = self.database.misc.get_forwarding_rules(
|
||||
rules = ctx.database.misc.get_forwarding_rules(
|
||||
identity_hash=destination_hash,
|
||||
active_only=True,
|
||||
)
|
||||
@@ -9266,7 +9271,7 @@ class ReticulumMeshChat:
|
||||
continue
|
||||
|
||||
# find or create mapping for this (Source, Final Recipient) pair
|
||||
mapping = self.forwarding_manager.get_or_create_mapping(
|
||||
mapping = ctx.forwarding_manager.get_or_create_mapping(
|
||||
source_hash,
|
||||
rule["forward_to_hash"],
|
||||
destination_hash,
|
||||
@@ -9287,6 +9292,7 @@ class ReticulumMeshChat:
|
||||
image_field=image_field,
|
||||
audio_field=audio_field,
|
||||
file_attachments_field=file_attachments_field,
|
||||
context=ctx,
|
||||
),
|
||||
)
|
||||
except Exception as e:
|
||||
@@ -9296,9 +9302,9 @@ class ReticulumMeshChat:
|
||||
traceback.print_exc()
|
||||
|
||||
# handle delivery status update for an outbound lxmf message
|
||||
def on_lxmf_sending_state_updated(self, lxmf_message):
|
||||
def on_lxmf_sending_state_updated(self, lxmf_message, context=None):
|
||||
# upsert lxmf message to database
|
||||
self.db_upsert_lxmf_message(lxmf_message)
|
||||
self.db_upsert_lxmf_message(lxmf_message, context=context)
|
||||
|
||||
# send lxmf message state to all websocket clients
|
||||
AsyncUtils.run_async(
|
||||
@@ -9317,20 +9323,26 @@ class ReticulumMeshChat:
|
||||
)
|
||||
|
||||
# handle delivery failed for an outbound lxmf message
|
||||
def on_lxmf_sending_failed(self, lxmf_message):
|
||||
def on_lxmf_sending_failed(self, lxmf_message, context=None):
|
||||
# check if this failed message should fall back to sending via a propagation node
|
||||
if (
|
||||
lxmf_message.state == LXMF.LXMessage.FAILED
|
||||
and hasattr(lxmf_message, "try_propagation_on_fail")
|
||||
and lxmf_message.try_propagation_on_fail
|
||||
):
|
||||
self.send_failed_message_via_propagation_node(lxmf_message)
|
||||
self.send_failed_message_via_propagation_node(lxmf_message, context=context)
|
||||
|
||||
# update state
|
||||
self.on_lxmf_sending_state_updated(lxmf_message)
|
||||
|
||||
# sends a previously failed message via a propagation node
|
||||
def send_failed_message_via_propagation_node(self, lxmf_message: LXMF.LXMessage):
|
||||
def send_failed_message_via_propagation_node(
|
||||
self, lxmf_message: LXMF.LXMessage, context=None
|
||||
):
|
||||
ctx = context or self.current_context
|
||||
if not ctx:
|
||||
return
|
||||
|
||||
# reset internal message state
|
||||
lxmf_message.packed = None
|
||||
lxmf_message.delivery_attempts = 0
|
||||
@@ -9343,12 +9355,12 @@ class ReticulumMeshChat:
|
||||
|
||||
# resend message
|
||||
source_hash = lxmf_message.source_hash.hex()
|
||||
router = self.message_router
|
||||
router = ctx.message_router
|
||||
if (
|
||||
self.forwarding_manager
|
||||
and source_hash in self.forwarding_manager.forwarding_routers
|
||||
ctx.forwarding_manager
|
||||
and source_hash in ctx.forwarding_manager.forwarding_routers
|
||||
):
|
||||
router = self.forwarding_manager.forwarding_routers[source_hash]
|
||||
router = ctx.forwarding_manager.forwarding_routers[source_hash]
|
||||
router.handle_outbound(lxmf_message)
|
||||
|
||||
# upserts the provided lxmf message to the database
|
||||
@@ -9393,7 +9405,12 @@ class ReticulumMeshChat:
|
||||
title: str = "",
|
||||
sender_identity_hash: str = None,
|
||||
no_display: bool = False,
|
||||
context=None,
|
||||
) -> LXMF.LXMessage:
|
||||
ctx = context or self.current_context
|
||||
if not ctx:
|
||||
raise RuntimeError("No identity context available for sending message")
|
||||
|
||||
# convert destination hash to bytes
|
||||
destination_hash_bytes = bytes.fromhex(destination_hash)
|
||||
|
||||
@@ -9442,7 +9459,7 @@ class ReticulumMeshChat:
|
||||
# send messages over a direct link by default
|
||||
desired_delivery_method = LXMF.LXMessage.DIRECT
|
||||
if (
|
||||
not self.message_router.delivery_link_available(destination_hash_bytes)
|
||||
not ctx.message_router.delivery_link_available(destination_hash_bytes)
|
||||
and RNS.Identity.current_ratchet_id(destination_hash_bytes) is not None
|
||||
):
|
||||
# since there's no link established to the destination, it's faster to send opportunistically
|
||||
@@ -9452,14 +9469,14 @@ class ReticulumMeshChat:
|
||||
desired_delivery_method = LXMF.LXMessage.OPPORTUNISTIC
|
||||
|
||||
# determine which identity to send from
|
||||
source_destination = self.local_lxmf_destination
|
||||
source_destination = ctx.local_lxmf_destination
|
||||
if sender_identity_hash is not None:
|
||||
if (
|
||||
self.forwarding_manager
|
||||
ctx.forwarding_manager
|
||||
and sender_identity_hash
|
||||
in self.forwarding_manager.forwarding_destinations
|
||||
in ctx.forwarding_manager.forwarding_destinations
|
||||
):
|
||||
source_destination = self.forwarding_manager.forwarding_destinations[
|
||||
source_destination = ctx.forwarding_manager.forwarding_destinations[
|
||||
sender_identity_hash
|
||||
]
|
||||
else:
|
||||
@@ -9476,7 +9493,7 @@ class ReticulumMeshChat:
|
||||
desired_method=desired_delivery_method,
|
||||
)
|
||||
lxmf_message.try_propagation_on_fail = (
|
||||
self.config.auto_send_failed_messages_to_propagation_node.get()
|
||||
ctx.config.auto_send_failed_messages_to_propagation_node.get()
|
||||
)
|
||||
|
||||
lxmf_message.fields = {}
|
||||
@@ -9541,28 +9558,33 @@ class ReticulumMeshChat:
|
||||
]
|
||||
|
||||
# update last sent icon hash for this destination
|
||||
self.database.misc.update_last_sent_icon_hash(
|
||||
ctx.database.misc.update_last_sent_icon_hash(
|
||||
destination_hash, current_icon_hash
|
||||
)
|
||||
|
||||
# register delivery callbacks
|
||||
lxmf_message.register_delivery_callback(self.on_lxmf_sending_state_updated)
|
||||
lxmf_message.register_failed_callback(self.on_lxmf_sending_failed)
|
||||
lxmf_message.register_delivery_callback(
|
||||
lambda msg: self.on_lxmf_sending_state_updated(msg, context=ctx)
|
||||
)
|
||||
lxmf_message.register_failed_callback(
|
||||
lambda msg: self.on_lxmf_sending_failed(msg, context=ctx)
|
||||
)
|
||||
|
||||
# determine which router to use
|
||||
router = self.message_router
|
||||
router = ctx.message_router
|
||||
if (
|
||||
sender_identity_hash is not None
|
||||
and sender_identity_hash in self.forwarding_manager.forwarding_routers
|
||||
and ctx.forwarding_manager
|
||||
and sender_identity_hash in ctx.forwarding_manager.forwarding_routers
|
||||
):
|
||||
router = self.forwarding_manager.forwarding_routers[sender_identity_hash]
|
||||
router = ctx.forwarding_manager.forwarding_routers[sender_identity_hash]
|
||||
|
||||
# send lxmf message to be routed to destination
|
||||
router.handle_outbound(lxmf_message)
|
||||
|
||||
# upsert lxmf message to database
|
||||
if not no_display:
|
||||
self.db_upsert_lxmf_message(lxmf_message)
|
||||
self.db_upsert_lxmf_message(lxmf_message, context=ctx)
|
||||
|
||||
# tell all websocket clients that old failed message was deleted so it can remove from ui
|
||||
if not no_display:
|
||||
@@ -9583,7 +9605,9 @@ class ReticulumMeshChat:
|
||||
# otherwise other incoming websocket packets will not be processed until sending is complete
|
||||
# which results in the next message not showing up until the first message is finished
|
||||
if not no_display:
|
||||
AsyncUtils.run_async(self.handle_lxmf_message_progress(lxmf_message))
|
||||
AsyncUtils.run_async(
|
||||
self.handle_lxmf_message_progress(lxmf_message, context=ctx)
|
||||
)
|
||||
|
||||
return lxmf_message
|
||||
|
||||
@@ -9642,16 +9666,20 @@ class ReticulumMeshChat:
|
||||
print(f"Failed to respond to telemetry request: {e}")
|
||||
|
||||
# updates lxmf message in database and broadcasts to websocket until it's delivered, or it fails
|
||||
async def handle_lxmf_message_progress(self, lxmf_message):
|
||||
async def handle_lxmf_message_progress(self, lxmf_message, context=None):
|
||||
# FIXME: there's no register_progress_callback on the lxmf message, so manually send progress until delivered, propagated or failed
|
||||
# we also can't use on_lxmf_sending_state_updated method to do this, because of async/await issues...
|
||||
ctx = context or self.current_context
|
||||
if not ctx:
|
||||
return
|
||||
|
||||
should_update_message = True
|
||||
while should_update_message:
|
||||
# wait 1 second between sending updates
|
||||
await asyncio.sleep(1)
|
||||
|
||||
# upsert lxmf message to database (as we want to update the progress in database too)
|
||||
self.db_upsert_lxmf_message(lxmf_message)
|
||||
self.db_upsert_lxmf_message(lxmf_message, context=ctx)
|
||||
|
||||
# send update to websocket clients
|
||||
await self.websocket_broadcast(
|
||||
@@ -9816,9 +9844,11 @@ class ReticulumMeshChat:
|
||||
)
|
||||
|
||||
# resend all failed messages that were intended for this destination
|
||||
if self.config.auto_resend_failed_messages_when_announce_received.get():
|
||||
if ctx.config.auto_resend_failed_messages_when_announce_received.get():
|
||||
AsyncUtils.run_async(
|
||||
self.resend_failed_messages_for_destination(destination_hash.hex()),
|
||||
self.resend_failed_messages_for_destination(
|
||||
destination_hash.hex(), context=ctx
|
||||
),
|
||||
)
|
||||
|
||||
# handle an announce received from reticulum, for an lxmf propagation node address
|
||||
@@ -9874,9 +9904,15 @@ class ReticulumMeshChat:
|
||||
)
|
||||
|
||||
# resends all messages that previously failed to send to the provided destination hash
|
||||
async def resend_failed_messages_for_destination(self, destination_hash: str):
|
||||
async def resend_failed_messages_for_destination(
|
||||
self, destination_hash: str, context=None
|
||||
):
|
||||
ctx = context or self.current_context
|
||||
if not ctx:
|
||||
return
|
||||
|
||||
# get messages that failed to send to this destination
|
||||
failed_messages = self.database.messages.get_failed_messages_for_destination(
|
||||
failed_messages = ctx.database.messages.get_failed_messages_for_destination(
|
||||
destination_hash,
|
||||
)
|
||||
|
||||
@@ -9915,7 +9951,7 @@ class ReticulumMeshChat:
|
||||
file_attachments_field = LxmfFileAttachmentsField(file_attachments)
|
||||
|
||||
# don't resend message with attachments if not allowed
|
||||
if not self.config.allow_auto_resending_failed_messages_with_attachments.get():
|
||||
if not ctx.config.allow_auto_resending_failed_messages_with_attachments.get():
|
||||
if (
|
||||
image_field is not None
|
||||
or audio_field is not None
|
||||
@@ -9933,10 +9969,11 @@ class ReticulumMeshChat:
|
||||
image_field=image_field,
|
||||
audio_field=audio_field,
|
||||
file_attachments_field=file_attachments_field,
|
||||
context=ctx,
|
||||
)
|
||||
|
||||
# remove original failed message from database
|
||||
self.database.messages.delete_lxmf_message_by_hash(
|
||||
ctx.database.messages.delete_lxmf_message_by_hash(
|
||||
failed_message["hash"],
|
||||
)
|
||||
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
import base64
|
||||
import os
|
||||
|
||||
import LXMF
|
||||
import RNS
|
||||
|
||||
from .database import Database
|
||||
from .meshchat_utils import create_lxmf_router
|
||||
|
||||
|
||||
class ForwardingManager:
|
||||
@@ -34,7 +34,7 @@ class ForwardingManager:
|
||||
)
|
||||
os.makedirs(router_storage_path, exist_ok=True)
|
||||
|
||||
router = LXMF.LXMRouter(
|
||||
router = create_lxmf_router(
|
||||
identity=alias_identity,
|
||||
storagepath=router_storage_path,
|
||||
)
|
||||
@@ -79,7 +79,7 @@ class ForwardingManager:
|
||||
)
|
||||
os.makedirs(router_storage_path, exist_ok=True)
|
||||
|
||||
router = LXMF.LXMRouter(
|
||||
router = create_lxmf_router(
|
||||
identity=alias_identity,
|
||||
storagepath=router_storage_path,
|
||||
)
|
||||
|
||||
@@ -2,7 +2,6 @@ import os
|
||||
import asyncio
|
||||
import threading
|
||||
import RNS
|
||||
import LXMF
|
||||
from meshchatx.src.backend.database import Database
|
||||
from meshchatx.src.backend.integrity_manager import IntegrityManager
|
||||
from meshchatx.src.backend.config_manager import ConfigManager
|
||||
@@ -21,6 +20,7 @@ from meshchatx.src.backend.rnpath_handler import RNPathHandler
|
||||
from meshchatx.src.backend.rnprobe_handler import RNProbeHandler
|
||||
from meshchatx.src.backend.translator_handler import TranslatorHandler
|
||||
from meshchatx.src.backend.forwarding_manager import ForwardingManager
|
||||
from meshchatx.src.backend.meshchat_utils import create_lxmf_router
|
||||
from meshchatx.src.backend.announce_handler import AnnounceHandler
|
||||
from meshchatx.src.backend.community_interfaces import CommunityInterfacesManager
|
||||
|
||||
@@ -168,7 +168,7 @@ class IdentityContext:
|
||||
|
||||
# 4. Initialize LXMF Router
|
||||
propagation_stamp_cost = self.config.lxmf_propagation_node_stamp_cost.get()
|
||||
self.message_router = LXMF.LXMRouter(
|
||||
self.message_router = create_lxmf_router(
|
||||
identity=self.identity,
|
||||
storagepath=self.lxmf_router_path,
|
||||
propagation_cost=propagation_stamp_cost,
|
||||
|
||||
@@ -1,11 +1,39 @@
|
||||
import base64
|
||||
import json
|
||||
import signal
|
||||
import threading
|
||||
|
||||
import LXMF
|
||||
import RNS.vendor.umsgpack as msgpack
|
||||
from LXMF import LXMRouter
|
||||
|
||||
|
||||
def create_lxmf_router(identity, storagepath, propagation_cost=None):
|
||||
"""
|
||||
Creates an LXMF.LXMRouter instance safely, avoiding signal handler crashes
|
||||
when called from non-main threads.
|
||||
"""
|
||||
if threading.current_thread() != threading.main_thread():
|
||||
# signal.signal can only be called from the main thread in Python
|
||||
# We monkeypatch it temporarily to avoid the ValueError
|
||||
original_signal = signal.signal
|
||||
try:
|
||||
signal.signal = lambda s, h: None
|
||||
return LXMF.LXMRouter(
|
||||
identity=identity,
|
||||
storagepath=storagepath,
|
||||
propagation_cost=propagation_cost,
|
||||
)
|
||||
finally:
|
||||
signal.signal = original_signal
|
||||
else:
|
||||
return LXMF.LXMRouter(
|
||||
identity=identity,
|
||||
storagepath=storagepath,
|
||||
propagation_cost=propagation_cost,
|
||||
)
|
||||
|
||||
|
||||
def parse_bool_query_param(value: str | None) -> bool:
|
||||
if value is None:
|
||||
return False
|
||||
|
||||
@@ -1520,7 +1520,10 @@ export default {
|
||||
throw new Error(`HTTP ${response.status}`);
|
||||
}
|
||||
const blob = await response.blob();
|
||||
tile.getImage().src = URL.createObjectURL(blob);
|
||||
const url = URL.createObjectURL(blob);
|
||||
tile.getImage().src = url;
|
||||
// Cleanup to prevent memory leaks
|
||||
setTimeout(() => URL.revokeObjectURL(url), 10000);
|
||||
} catch {
|
||||
tile.setState(3);
|
||||
}
|
||||
@@ -1535,7 +1538,9 @@ export default {
|
||||
try {
|
||||
const cached = await TileCache.getTile(src);
|
||||
if (cached) {
|
||||
tile.getImage().src = URL.createObjectURL(cached);
|
||||
const url = URL.createObjectURL(cached);
|
||||
tile.getImage().src = url;
|
||||
setTimeout(() => URL.revokeObjectURL(url), 10000);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1544,8 +1549,12 @@ export default {
|
||||
throw new Error(`HTTP ${response.status}`);
|
||||
}
|
||||
const blob = await response.blob();
|
||||
await TileCache.setTile(src, blob);
|
||||
tile.getImage().src = URL.createObjectURL(blob);
|
||||
const url = URL.createObjectURL(blob);
|
||||
tile.getImage().src = url;
|
||||
setTimeout(() => URL.revokeObjectURL(url), 10000);
|
||||
|
||||
// Background cache write to avoid blocking UI
|
||||
TileCache.setTile(src, blob).catch(() => {});
|
||||
} catch {
|
||||
originalTileLoadFunction(tile, src);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user