From 9ecdd157f3d83f1ac009caf81d1aba730c81bb99 Mon Sep 17 00:00:00 2001 From: Sudo-Ivan Date: Sat, 3 Jan 2026 18:30:40 -0600 Subject: [PATCH] refactor(tests): update backend integrity tests to include metadata in manifest and add new debug log tests for persistent logging functionality --- tests/backend/test_backend_integrity.py | 20 +++-- tests/backend/test_debug_logs.py | 107 ++++++++++++++++++++++++ tests/backend/test_integrity.py | 18 ++++ 3 files changed, 139 insertions(+), 6 deletions(-) create mode 100644 tests/backend/test_debug_logs.py diff --git a/tests/backend/test_backend_integrity.py b/tests/backend/test_backend_integrity.py index 2be7225..2b5ad63 100644 --- a/tests/backend/test_backend_integrity.py +++ b/tests/backend/test_backend_integrity.py @@ -31,14 +31,21 @@ class TestBackendIntegrity(unittest.TestCase): shutil.rmtree(self.test_dir) def generate_manifest(self): - manifest = {} + manifest = { + "_metadata": { + "version": 1, + "date": "2026-01-03", + "time": "12:00:00", + }, + "files": {}, + } for root, _, files in os.walk(self.build_dir): for file in files: full_path = Path(root) / file rel_path = str(full_path.relative_to(self.build_dir)) with open(full_path, "rb") as f: hash = hashlib.sha256(f.read()).hexdigest() - manifest[rel_path] = hash + manifest["files"][rel_path] = hash manifest_path = self.electron_dir / "backend-manifest.json" with open(manifest_path, "w") as f: @@ -51,9 +58,10 @@ class TestBackendIntegrity(unittest.TestCase): with open(manifest_path, "r") as f: manifest = json.load(f) - self.assertEqual(len(manifest), 2) - self.assertIn("ReticulumMeshChatX", manifest) - self.assertIn("lib/some_lib.so", manifest) + self.assertEqual(len(manifest["files"]), 2) + self.assertIn("ReticulumMeshChatX", manifest["files"]) + self.assertIn("lib/some_lib.so", manifest["files"]) + self.assertIn("_metadata", manifest) def test_tampering_detection_logic(self): """Test that modifying a file changes its hash (logic check).""" @@ -61,7 +69,7 @@ class TestBackendIntegrity(unittest.TestCase): with open(manifest_path, "r") as f: manifest = json.load(f) - old_hash = manifest["ReticulumMeshChatX"] + old_hash = manifest["files"]["ReticulumMeshChatX"] # Tamper with open(self.build_dir / "ReticulumMeshChatX", "w") as f: diff --git a/tests/backend/test_debug_logs.py b/tests/backend/test_debug_logs.py new file mode 100644 index 0000000..4892785 --- /dev/null +++ b/tests/backend/test_debug_logs.py @@ -0,0 +1,107 @@ +import time +import pytest +import logging +from meshchatx.src.backend.persistent_log_handler import PersistentLogHandler +from meshchatx.src.backend.database import Database + + +@pytest.fixture +def db(tmp_path): + db_file = tmp_path / "test_logs.db" + database = Database(str(db_file)) + database.initialize() + return database + + +@pytest.fixture +def handler(db): + handler = PersistentLogHandler(database=db, flush_interval=0.1) + logger = logging.getLogger("test_logger") + logger.addHandler(handler) + logger.setLevel(logging.DEBUG) + return handler, logger + + +def test_log_insertion(handler, db): + persistent_handler, logger = handler + logger.info("Test message") + + # Wait for flush + time.sleep(0.2) + logger.info("Trigger flush") # emit triggers flush if interval passed + + logs = persistent_handler.get_logs(limit=10) + assert len(logs) >= 2 + # Logs are descending by timestamp, so newer is first + messages = [l["message"] for l in logs] + assert "Test message" in messages + assert "Trigger flush" in messages + + +def test_search_and_filter(handler, db): + persistent_handler, logger = handler + logger.info("Hello world") + logger.error("Something went wrong") + + time.sleep(0.2) + logger.debug("Force flush") + + # Search + results = persistent_handler.get_logs(search="world") + assert len(results) == 1 + assert "Hello world" in results[0]["message"] + + # Filter by level + results = persistent_handler.get_logs(level="ERROR") + assert len(results) == 1 + assert "Something went wrong" in results[0]["message"] + + +def test_anomaly_flooding(handler, db): + persistent_handler, logger = handler + persistent_handler.flooding_threshold = 5 + + for i in range(10): + logger.info(f"Message {i}") + + time.sleep(0.2) + logger.debug("Force flush") + + logs = persistent_handler.get_logs(limit=20) + anomalies = [l for l in logs if l["is_anomaly"]] + assert len(anomalies) > 0 + assert any(l["anomaly_type"] == "flooding" for l in anomalies) + + +def test_anomaly_repeat(handler, db): + persistent_handler, logger = handler + persistent_handler.repeat_threshold = 3 + + for _ in range(5): + logger.info("Same message") + + time.sleep(0.2) + logger.debug("Force flush") + + logs = persistent_handler.get_logs(limit=20) + anomalies = [l for l in logs if l["is_anomaly"]] + assert len(anomalies) > 0 + assert any(l["anomaly_type"] == "repeat" for l in anomalies) + + +def test_log_cleanup(handler, db): + persistent_handler, logger = handler + + # Insert many logs + for i in range(100): + logger.info(f"Log {i}") + + time.sleep(0.2) + logger.debug("Trigger cleanup") + + # Force cleanup with small limit + db.debug_logs.cleanup_old_logs(max_logs=10) + + count = db.debug_logs.get_total_count() + assert count <= 11 # 10 + the trigger log + diff --git a/tests/backend/test_integrity.py b/tests/backend/test_integrity.py index ab3b602..3277a7f 100644 --- a/tests/backend/test_integrity.py +++ b/tests/backend/test_integrity.py @@ -53,6 +53,24 @@ class TestIntegrityManager(unittest.TestCase): is_ok, issues = self.manager.check_integrity() self.assertFalse(is_ok) self.assertTrue(any("Database modified" in i for i in issues)) + self.assertTrue(any("Last integrity snapshot" in i for i in issues)) + + def test_identity_mismatch(self): + """Test detection of identity mismatch in manifest.""" + self.manager.identity_hash = "original_hash" + self.manager.save_manifest() + + # Change identity hash + self.manager.identity_hash = "new_hash" + + # Tamper a file to trigger issues list which includes the metadata check + with open(self.db_path, "a") as f: + f.write("tampered") + + is_ok, issues = self.manager.check_integrity() + self.assertFalse(is_ok) + self.assertTrue(any("Identity mismatch" in i for i in issues)) + self.assertTrue(any("Manifest belongs to: original_hash" in i for i in issues)) def test_identity_tampered(self): """Test detection of identity file modification."""