From d209c0c9abcfcac8929ec4289cc71e489f16b515 Mon Sep 17 00:00:00 2001 From: Sudo-Ivan Date: Sat, 3 Jan 2026 19:26:35 -0600 Subject: [PATCH] feat(tests): add comprehensive tests for AnnounceDAO filtering functionality and enhance robustness checks for propagation nodes endpoint --- tests/backend/test_announce_dao_extended.py | 94 +++++++++++++++++++ tests/backend/test_app_endpoints.py | 1 + tests/backend/test_debug_logs.py | 8 +- .../test_propagation_nodes_robustness.py | 76 +++++++++++++++ 4 files changed, 175 insertions(+), 4 deletions(-) create mode 100644 tests/backend/test_announce_dao_extended.py create mode 100644 tests/backend/test_propagation_nodes_robustness.py diff --git a/tests/backend/test_announce_dao_extended.py b/tests/backend/test_announce_dao_extended.py new file mode 100644 index 0000000..522fb8a --- /dev/null +++ b/tests/backend/test_announce_dao_extended.py @@ -0,0 +1,94 @@ +import os +import tempfile +import pytest +from meshchatx.src.backend.database.provider import DatabaseProvider +from meshchatx.src.backend.database.schema import DatabaseSchema +from meshchatx.src.backend.database.announces import AnnounceDAO + + +@pytest.fixture +def temp_db(): + fd, path = tempfile.mkstemp() + os.close(fd) + yield path + if os.path.exists(path): + os.remove(path) + + +@pytest.fixture +def announce_dao(temp_db): + provider = DatabaseProvider(temp_db) + schema = DatabaseSchema(provider) + schema.initialize() + dao = AnnounceDAO(provider) + yield dao + provider.close() + + +def test_get_filtered_announces_identity_hash(announce_dao): + # Setup: Insert some dummy announces + announce_dao.upsert_announce( + { + "destination_hash": "dest1", + "aspect": "lxmf.propagation", + "identity_hash": "ident1", + "identity_public_key": "pub1", + "app_data": "data1", + "rssi": -50, + "snr": 10, + "quality": 1.0, + } + ) + announce_dao.upsert_announce( + { + "destination_hash": "dest2", + "aspect": "lxmf.delivery", + "identity_hash": "ident1", + "identity_public_key": "pub1", + "app_data": "data2", + "rssi": -50, + "snr": 10, + "quality": 1.0, + } + ) + announce_dao.upsert_announce( + { + "destination_hash": "dest3", + "aspect": "lxmf.delivery", + "identity_hash": "ident2", + "identity_public_key": "pub2", + "app_data": "data3", + "rssi": -50, + "snr": 10, + "quality": 1.0, + } + ) + + # Test filtering by identity_hash + results = announce_dao.get_filtered_announces(identity_hash="ident1") + assert len(results) == 2 + assert all(r["identity_hash"] == "ident1" for r in results) + + # Test filtering by identity_hash and aspect + results = announce_dao.get_filtered_announces( + identity_hash="ident1", aspect="lxmf.propagation" + ) + assert len(results) == 1 + assert results[0]["destination_hash"] == "dest1" + + # Test filtering by destination_hash + results = announce_dao.get_filtered_announces(destination_hash="dest2") + assert len(results) == 1 + assert results[0]["identity_hash"] == "ident1" + + +def test_get_filtered_announces_robustness(announce_dao): + # Test with non-existent identity_hash + results = announce_dao.get_filtered_announces(identity_hash="non_existent") + assert len(results) == 0 + + # Test with multiple filters that yield no results + results = announce_dao.get_filtered_announces( + identity_hash="ident1", aspect="non_existent_aspect" + ) + assert len(results) == 0 diff --git a/tests/backend/test_app_endpoints.py b/tests/backend/test_app_endpoints.py index b95adbf..1380fcb 100644 --- a/tests/backend/test_app_endpoints.py +++ b/tests/backend/test_app_endpoints.py @@ -41,6 +41,7 @@ async def test_app_info_extended(mock_rns_minimal, temp_dir): patch("meshchatx.meshchat.generate_ssl_certificate"), patch("psutil.Process") as mock_process, patch("psutil.net_io_counters") as mock_net_io, + patch("importlib.metadata.version", return_value="1.2.3"), patch("meshchatx.meshchat.LXST") as mock_lxst, ): mock_lxst.__version__ = "1.2.3" diff --git a/tests/backend/test_debug_logs.py b/tests/backend/test_debug_logs.py index d2b800a..370ab29 100644 --- a/tests/backend/test_debug_logs.py +++ b/tests/backend/test_debug_logs.py @@ -62,9 +62,9 @@ def test_anomaly_flooding(handler, db): persistent_handler.flooding_threshold = 5 for i in range(10): - logger.info(f"Message {i}") + logger.warning(f"Message {i}") - time.sleep(0.2) + # Wait for flush logger.debug("Force flush") logs = persistent_handler.get_logs(limit=20) @@ -78,9 +78,9 @@ def test_anomaly_repeat(handler, db): persistent_handler.repeat_threshold = 3 for _ in range(5): - logger.info("Same message") + logger.warning("Same message") - time.sleep(0.2) + # Wait for flush logger.debug("Force flush") logs = persistent_handler.get_logs(limit=20) diff --git a/tests/backend/test_propagation_nodes_robustness.py b/tests/backend/test_propagation_nodes_robustness.py new file mode 100644 index 0000000..e274232 --- /dev/null +++ b/tests/backend/test_propagation_nodes_robustness.py @@ -0,0 +1,76 @@ +import shutil +import tempfile +import pytest +import json +from unittest.mock import MagicMock, patch +from meshchatx.meshchat import ReticulumMeshChat +import asyncio +import RNS + + +@pytest.fixture +def temp_dir(): + dir_path = tempfile.mkdtemp() + yield dir_path + shutil.rmtree(dir_path) + + +@pytest.fixture +def mock_rns_minimal(): + with ( + patch("RNS.Reticulum") as mock_rns, + patch("RNS.Transport"), + patch("LXMF.LXMRouter"), + patch("meshchatx.meshchat.get_file_path", return_value="/tmp/mock_path"), + patch("meshchatx.meshchat.generate_ssl_certificate"), + ): + mock_rns_instance = mock_rns.return_value + mock_rns_instance.configpath = "/tmp/mock_config" + mock_rns_instance.is_connected_to_shared_instance = False + + # Use a real identity to satisfy RNS.Destination.hash + real_id = RNS.Identity() + + # We can still mock parts of it if needed, but RNS expects + # isinstance(identity, RNS.Identity) to pass or bytes of specific length + yield real_id + + +@pytest.mark.asyncio +async def test_propagation_nodes_endpoint_robustness(mock_rns_minimal, temp_dir): + app_instance = ReticulumMeshChat( + identity=mock_rns_minimal, + storage_dir=temp_dir, + reticulum_config_dir=temp_dir, + ) + + # Find the propagation nodes handler + handler = None + for route in app_instance.get_routes(): + if route.path == "/api/v1/lxmf/propagation-nodes" and route.method == "GET": + handler = route.handler + break + + assert handler is not None + + # Test with valid limit + request = MagicMock() + request.query = {"limit": "10"} + response = await handler(request) + assert response.status == 200 + data = json.loads(response.body) + assert "lxmf_propagation_nodes" in data + + # Test with invalid limit (should not crash) + request.query = {"limit": "invalid"} + response = await handler(request) + assert response.status == 200 + data = json.loads(response.body) + assert "lxmf_propagation_nodes" in data + + # Test with missing limit (should not crash) + request.query = {} + response = await handler(request) + assert response.status == 200 + data = json.loads(response.body) + assert "lxmf_propagation_nodes" in data