feat(tests): add AnnounceDAO filtering tests and robustness checks for the propagation nodes endpoint
Some checks failed
CI / build-frontend (push) Successful in 1m30s
CI / test-backend (push) Successful in 4s
CI / test-lang (push) Successful in 1m28s
CI / test-backend (pull_request) Successful in 45s
Build and Publish Docker Image / build (pull_request) Has been skipped
CI / test-lang (pull_request) Successful in 1m8s
Benchmarks / benchmark (push) Has been cancelled
Build Test / Build and Test (push) Has been cancelled
CI / lint (push) Has been cancelled
Build Test / Build and Test (pull_request) Has been cancelled
Tests / test (push) Has been cancelled
CI / lint (pull_request) Has been cancelled
Build and Publish Docker Image / build-dev (pull_request) Has been cancelled
Benchmarks / benchmark (pull_request) Has been cancelled
CI / build-frontend (pull_request) Has been cancelled
Tests / test (pull_request) Has been cancelled

2026-01-03 19:26:35 -06:00
parent c9c2aeac68
commit d209c0c9ab
4 changed files with 175 additions and 4 deletions

View File

@@ -0,0 +1,94 @@
import os
import tempfile
import pytest
from meshchatx.src.backend.database.provider import DatabaseProvider
from meshchatx.src.backend.database.schema import DatabaseSchema
from meshchatx.src.backend.database.announces import AnnounceDAO

@pytest.fixture
def temp_db():
fd, path = tempfile.mkstemp()
os.close(fd)
yield path
if os.path.exists(path):
os.remove(path)

@pytest.fixture
def announce_dao(temp_db):
provider = DatabaseProvider(temp_db)
schema = DatabaseSchema(provider)
schema.initialize()
dao = AnnounceDAO(provider)
yield dao
provider.close()

def test_get_filtered_announces_identity_hash(announce_dao):
# Setup: Insert some dummy announces
announce_dao.upsert_announce(
{
"destination_hash": "dest1",
"aspect": "lxmf.propagation",
"identity_hash": "ident1",
"identity_public_key": "pub1",
"app_data": "data1",
"rssi": -50,
"snr": 10,
"quality": 1.0,
}
)
announce_dao.upsert_announce(
{
"destination_hash": "dest2",
"aspect": "lxmf.delivery",
"identity_hash": "ident1",
"identity_public_key": "pub1",
"app_data": "data2",
"rssi": -50,
"snr": 10,
"quality": 1.0,
}
)
announce_dao.upsert_announce(
{
"destination_hash": "dest3",
"aspect": "lxmf.delivery",
"identity_hash": "ident2",
"identity_public_key": "pub2",
"app_data": "data3",
"rssi": -50,
"snr": 10,
"quality": 1.0,
}
)
# Test filtering by identity_hash
results = announce_dao.get_filtered_announces(identity_hash="ident1")
assert len(results) == 2
assert all(r["identity_hash"] == "ident1" for r in results)
# Test filtering by identity_hash and aspect
results = announce_dao.get_filtered_announces(
identity_hash="ident1", aspect="lxmf.propagation"
)
assert len(results) == 1
assert results[0]["destination_hash"] == "dest1"
# Test filtering by destination_hash
results = announce_dao.get_filtered_announces(destination_hash="dest2")
assert len(results) == 1
assert results[0]["identity_hash"] == "ident1"

def test_get_filtered_announces_robustness(announce_dao):
# Test with non-existent identity_hash
results = announce_dao.get_filtered_announces(identity_hash="non_existent")
assert len(results) == 0
# Test with multiple filters that yield no results
results = announce_dao.get_filtered_announces(
identity_hash="ident1", aspect="non_existent_aspect"
)
assert len(results) == 0
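For context, the filtering these tests exercise amounts to building a WHERE clause from whichever keyword arguments were supplied. A minimal sketch of that pattern against sqlite3 follows; the table and column names are assumptions drawn from the test data above, not the actual AnnounceDAO code.

import sqlite3

def get_filtered_announces(conn, identity_hash=None, aspect=None, destination_hash=None):
    # Build the WHERE clause only from the filters that were provided,
    # binding parameters rather than interpolating values into the SQL.
    clauses, params = [], []
    for column, value in (
        ("identity_hash", identity_hash),
        ("aspect", aspect),
        ("destination_hash", destination_hash),
    ):
        if value is not None:
            clauses.append(f"{column} = ?")
            params.append(value)
    sql = "SELECT * FROM announces"  # table name assumed for illustration
    if clauses:
        sql += " WHERE " + " AND ".join(clauses)
    conn.row_factory = sqlite3.Row
    return [dict(row) for row in conn.execute(sql, params)]

With this shape, filters that match nothing simply return an empty list rather than raising, which is what the robustness test below expects.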

View File

@@ -41,6 +41,7 @@ async def test_app_info_extended(mock_rns_minimal, temp_dir):
patch("meshchatx.meshchat.generate_ssl_certificate"),
patch("psutil.Process") as mock_process,
patch("psutil.net_io_counters") as mock_net_io,
patch("importlib.metadata.version", return_value="1.2.3"),
patch("meshchatx.meshchat.LXST") as mock_lxst,
):
mock_lxst.__version__ = "1.2.3"
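Patching importlib.metadata.version keeps the test independent of whatever package metadata is installed locally. A minimal sketch of the kind of version lookup being stubbed here, assuming the application resolves its own version at runtime (the distribution name and fallback below are illustrative, not taken from this diff):

import importlib.metadata

try:
    app_version = importlib.metadata.version("meshchatx")  # distribution name assumed
except importlib.metadata.PackageNotFoundError:
    app_version = "0.0.0-dev"  # fallback when running from a source checkout

With the patch in place, a call like the one above returns "1.2.3" regardless of the environment.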

View File

@@ -62,9 +62,9 @@ def test_anomaly_flooding(handler, db):
persistent_handler.flooding_threshold = 5
for i in range(10):
logger.info(f"Message {i}")
logger.warning(f"Message {i}")
time.sleep(0.2)
# Wait for flush
logger.debug("Force flush")
logs = persistent_handler.get_logs(limit=20)
@@ -78,9 +78,9 @@ def test_anomaly_repeat(handler, db):
persistent_handler.repeat_threshold = 3
for _ in range(5):
logger.info("Same message")
logger.warning("Same message")
time.sleep(0.2)
# Wait for flush
logger.debug("Force flush")
logs = persistent_handler.get_logs(limit=20)
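For readers unfamiliar with the handler under test, a minimal sketch of how a flooding threshold on a logging handler could work is shown below. This is an illustrative assumption, not the project's persistent handler; repeat detection would follow the same shape with a counter keyed on the message text.

import logging
import time
from collections import deque

class FloodAwareHandler(logging.Handler):
    # Illustrative only: flags bursts that exceed flooding_threshold records
    # within window_seconds, and keeps records in memory for get_logs().
    def __init__(self, flooding_threshold=5, window_seconds=1.0):
        super().__init__()
        self.flooding_threshold = flooding_threshold
        self.window_seconds = window_seconds
        self._timestamps = deque()
        self._records = []

    def emit(self, record):
        now = time.monotonic()
        self._timestamps.append(now)
        # Drop timestamps that have fallen outside the sliding window.
        while self._timestamps and now - self._timestamps[0] > self.window_seconds:
            self._timestamps.popleft()
        if len(self._timestamps) > self.flooding_threshold:
            record.anomaly = "flooding"
        self._records.append(record)

    def get_logs(self, limit=20):
        return self._records[-limit:]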

View File

@@ -0,0 +1,76 @@
import shutil
import tempfile
import pytest
import json
from unittest.mock import MagicMock, patch
from meshchatx.meshchat import ReticulumMeshChat
import asyncio
import RNS

@pytest.fixture
def temp_dir():
dir_path = tempfile.mkdtemp()
yield dir_path
shutil.rmtree(dir_path)

@pytest.fixture
def mock_rns_minimal():
with (
patch("RNS.Reticulum") as mock_rns,
patch("RNS.Transport"),
patch("LXMF.LXMRouter"),
patch("meshchatx.meshchat.get_file_path", return_value="/tmp/mock_path"),
patch("meshchatx.meshchat.generate_ssl_certificate"),
):
mock_rns_instance = mock_rns.return_value
mock_rns_instance.configpath = "/tmp/mock_config"
mock_rns_instance.is_connected_to_shared_instance = False
# Use a real identity to satisfy RNS.Destination.hash
real_id = RNS.Identity()
# Parts of it can still be mocked if needed, but RNS expects
# isinstance(identity, RNS.Identity) to pass, or identity bytes of a specific length
yield real_id

@pytest.mark.asyncio
async def test_propagation_nodes_endpoint_robustness(mock_rns_minimal, temp_dir):
app_instance = ReticulumMeshChat(
identity=mock_rns_minimal,
storage_dir=temp_dir,
reticulum_config_dir=temp_dir,
)
# Find the propagation nodes handler
handler = None
for route in app_instance.get_routes():
if route.path == "/api/v1/lxmf/propagation-nodes" and route.method == "GET":
handler = route.handler
break
assert handler is not None
# Test with valid limit
request = MagicMock()
request.query = {"limit": "10"}
response = await handler(request)
assert response.status == 200
data = json.loads(response.body)
assert "lxmf_propagation_nodes" in data
# Test with invalid limit (should not crash)
request.query = {"limit": "invalid"}
response = await handler(request)
assert response.status == 200
data = json.loads(response.body)
assert "lxmf_propagation_nodes" in data
# Test with missing limit (should not crash)
request.query = {}
response = await handler(request)
assert response.status == 200
data = json.loads(response.body)
assert "lxmf_propagation_nodes" in data