refactor(tests): update backend integrity tests to include metadata in manifest and add new debug log tests for persistent logging functionality

This commit is contained in:
2026-01-03 18:30:40 -06:00
parent 842c4a3938
commit 9ecdd157f3
3 changed files with 139 additions and 6 deletions

View File

@@ -31,14 +31,21 @@ class TestBackendIntegrity(unittest.TestCase):
shutil.rmtree(self.test_dir)
def generate_manifest(self):
manifest = {}
manifest = {
"_metadata": {
"version": 1,
"date": "2026-01-03",
"time": "12:00:00",
},
"files": {},
}
for root, _, files in os.walk(self.build_dir):
for file in files:
full_path = Path(root) / file
rel_path = str(full_path.relative_to(self.build_dir))
with open(full_path, "rb") as f:
hash = hashlib.sha256(f.read()).hexdigest()
manifest[rel_path] = hash
manifest["files"][rel_path] = hash
manifest_path = self.electron_dir / "backend-manifest.json"
with open(manifest_path, "w") as f:
@@ -51,9 +58,10 @@ class TestBackendIntegrity(unittest.TestCase):
with open(manifest_path, "r") as f:
manifest = json.load(f)
self.assertEqual(len(manifest), 2)
self.assertIn("ReticulumMeshChatX", manifest)
self.assertIn("lib/some_lib.so", manifest)
self.assertEqual(len(manifest["files"]), 2)
self.assertIn("ReticulumMeshChatX", manifest["files"])
self.assertIn("lib/some_lib.so", manifest["files"])
self.assertIn("_metadata", manifest)
def test_tampering_detection_logic(self):
"""Test that modifying a file changes its hash (logic check)."""
@@ -61,7 +69,7 @@ class TestBackendIntegrity(unittest.TestCase):
with open(manifest_path, "r") as f:
manifest = json.load(f)
old_hash = manifest["ReticulumMeshChatX"]
old_hash = manifest["files"]["ReticulumMeshChatX"]
# Tamper
with open(self.build_dir / "ReticulumMeshChatX", "w") as f:

View File

@@ -0,0 +1,107 @@
import time
import pytest
import logging
from meshchatx.src.backend.persistent_log_handler import PersistentLogHandler
from meshchatx.src.backend.database import Database
@pytest.fixture
def db(tmp_path):
    """Provide an initialized Database backed by a file in tmp_path."""
    storage_path = tmp_path / "test_logs.db"
    instance = Database(str(storage_path))
    instance.initialize()
    return instance
@pytest.fixture
def handler(db):
    """Wire a PersistentLogHandler into a dedicated DEBUG-level logger.

    Returns a (handler, logger) pair so tests can both emit records and
    query what was persisted.
    """
    log_handler = PersistentLogHandler(database=db, flush_interval=0.1)
    test_logger = logging.getLogger("test_logger")
    test_logger.setLevel(logging.DEBUG)
    test_logger.addHandler(log_handler)
    return log_handler, test_logger
def test_log_insertion(handler, db):
    """Emitted records are persisted once the flush interval has elapsed."""
    persistent_handler, logger = handler
    logger.info("Test message")
    # Let the flush interval pass before emitting again.
    time.sleep(0.2)
    # emit triggers a flush if the interval has passed
    logger.info("Trigger flush")
    stored = persistent_handler.get_logs(limit=10)
    assert len(stored) >= 2
    # Results are newest-first; membership checks avoid ordering assumptions.
    stored_messages = [entry["message"] for entry in stored]
    assert "Test message" in stored_messages
    assert "Trigger flush" in stored_messages
def test_search_and_filter(handler, db):
    """get_logs supports text search and level filtering."""
    persistent_handler, logger = handler
    logger.info("Hello world")
    logger.error("Something went wrong")
    time.sleep(0.2)
    logger.debug("Force flush")
    # Substring search should match exactly one record.
    found = persistent_handler.get_logs(search="world")
    assert len(found) == 1
    assert "Hello world" in found[0]["message"]
    # Level filter should isolate the single ERROR record.
    found = persistent_handler.get_logs(level="ERROR")
    assert len(found) == 1
    assert "Something went wrong" in found[0]["message"]
def test_anomaly_flooding(handler, db):
    """A burst of messages above flooding_threshold is flagged as an anomaly."""
    persistent_handler, logger = handler
    persistent_handler.flooding_threshold = 5
    for index in range(10):
        logger.info(f"Message {index}")
    time.sleep(0.2)
    logger.debug("Force flush")
    stored = persistent_handler.get_logs(limit=20)
    flagged = [entry for entry in stored if entry["is_anomaly"]]
    assert flagged
    assert any(entry["anomaly_type"] == "flooding" for entry in flagged)
def test_anomaly_repeat(handler, db):
    """Re-emitting one message past repeat_threshold is flagged as a repeat anomaly."""
    persistent_handler, logger = handler
    persistent_handler.repeat_threshold = 3
    for _ in range(5):
        logger.info("Same message")
    time.sleep(0.2)
    logger.debug("Force flush")
    stored = persistent_handler.get_logs(limit=20)
    flagged = [entry for entry in stored if entry["is_anomaly"]]
    assert flagged
    assert any(entry["anomaly_type"] == "repeat" for entry in flagged)
def test_log_cleanup(handler, db):
    """cleanup_old_logs trims the stored log count down to max_logs."""
    persistent_handler, logger = handler
    # Fill the table with plenty of rows to trim.
    for index in range(100):
        logger.info(f"Log {index}")
    time.sleep(0.2)
    logger.debug("Trigger cleanup")
    # Force cleanup with a small retention limit.
    db.debug_logs.cleanup_old_logs(max_logs=10)
    remaining = db.debug_logs.get_total_count()
    assert remaining <= 11  # 10 retained + the trigger log

View File

@@ -53,6 +53,24 @@ class TestIntegrityManager(unittest.TestCase):
is_ok, issues = self.manager.check_integrity()
self.assertFalse(is_ok)
self.assertTrue(any("Database modified" in i for i in issues))
self.assertTrue(any("Last integrity snapshot" in i for i in issues))
def test_identity_mismatch(self):
"""Test detection of identity mismatch in manifest."""
self.manager.identity_hash = "original_hash"
self.manager.save_manifest()
# Change identity hash
self.manager.identity_hash = "new_hash"
# Tamper a file to trigger issues list which includes the metadata check
with open(self.db_path, "a") as f:
f.write("tampered")
is_ok, issues = self.manager.check_integrity()
self.assertFalse(is_ok)
self.assertTrue(any("Identity mismatch" in i for i in issues))
self.assertTrue(any("Manifest belongs to: original_hash" in i for i in issues))
def test_identity_tampered(self):
"""Test detection of identity file modification."""