refactor(tests): streamline test code by removing unused imports and optimizing function calls for performance benchmarks

This commit is contained in:
2026-01-03 18:43:13 -06:00
parent a1c87bebf3
commit fd41a62bc1
24 changed files with 34 additions and 79 deletions

View File

@@ -84,7 +84,7 @@ def test_db_performance():
# Test unread states for all peers
print("Testing get_conversations_unread_states()...")
start_time = time.time()
unread = db.messages.get_conversations_unread_states(peer_hashes)
_ = db.messages.get_conversations_unread_states(peer_hashes)
end_time = time.time()
print(
f"get_conversations_unread_states() for {len(peer_hashes)} peers took {end_time - start_time:.4f} seconds"
@@ -111,7 +111,7 @@ def test_db_performance():
print("Testing get_filtered_announces()...")
start_time = time.time()
anns = db.announces.get_filtered_announces(limit=100)
_ = db.announces.get_filtered_announces(limit=100)
end_time = time.time()
print(f"get_filtered_announces() took {end_time - start_time:.4f} seconds")

View File

@@ -6,7 +6,6 @@ import random
import secrets
import psutil
import gc
from unittest.mock import MagicMock
from meshchatx.src.backend.database import Database
from meshchatx.src.backend.recovery import CrashRecovery

View File

@@ -5,8 +5,6 @@ import shutil
import tempfile
import random
import secrets
import gc
from unittest.mock import MagicMock
# Ensure we can import meshchatx
sys.path.append(os.getcwd())
@@ -14,10 +12,8 @@ sys.path.append(os.getcwd())
import json
from meshchatx.src.backend.database import Database
from meshchatx.src.backend.identity_manager import IdentityManager
from meshchatx.src.backend.announce_manager import AnnounceManager
from meshchatx.src.backend.database.telephone import TelephoneDAO
from tests.backend.benchmarking_utils import (
MemoryTracker,
benchmark,
get_memory_usage_mb,
)

View File

@@ -1,10 +1,8 @@
import os
import shutil
import tempfile
import pytest
import json
from unittest.mock import MagicMock, patch
from aiohttp import web
from meshchatx.meshchat import ReticulumMeshChat
import RNS
import asyncio
@@ -110,7 +108,7 @@ async def test_app_shutdown_endpoint(mock_rns_minimal, temp_dir):
# We need to patch sys.exit to avoid stopping the test runner
with (
patch("sys.exit") as mock_exit,
patch("sys.exit"),
patch("asyncio.sleep", return_value=asyncio.sleep(0)),
):
response = await shutdown_handler(request)

View File

@@ -1,10 +1,7 @@
import os
import shutil
import tempfile
import json
import pytest
from unittest.mock import MagicMock, patch
from aiohttp import web
from meshchatx.meshchat import ReticulumMeshChat
import RNS
@@ -97,9 +94,6 @@ async def test_app_status_endpoints(mock_rns_minimal, temp_dir):
app_instance.config.set("changelog_seen_version", "4.0.0")
assert app_instance.config.get("changelog_seen_version") == "4.0.0"
# Mock request for app_info
mock_request = MagicMock()
# Test app_info returns these values
with ExitStack() as info_stack:
info_stack.enter_context(patch("psutil.Process"))

View File

@@ -1,5 +1,4 @@
import pytest
import asyncio
from unittest.mock import MagicMock, patch
from meshchatx.src.backend.community_interfaces import CommunityInterfacesManager
from meshchatx.src.backend.rnstatus_handler import RNStatusHandler

View File

@@ -1,6 +1,5 @@
import unittest
import os
import sqlite3
import tempfile
import shutil
from meshchatx.src.backend.database.provider import DatabaseProvider

View File

@@ -1,7 +1,6 @@
import os
import shutil
import tempfile
from unittest.mock import MagicMock, patch
import pytest
from meshchatx.src.backend.database import Database

View File

@@ -25,15 +25,15 @@ def handler(db):
def test_log_insertion(handler, db):
persistent_handler, logger = handler
logger.info("Test message")
# Wait for flush
time.sleep(0.2)
logger.info("Trigger flush") # emit triggers flush if interval passed
logger.info("Trigger flush") # emit triggers flush if interval passed
logs = persistent_handler.get_logs(limit=10)
assert len(logs) >= 2
# Logs are descending by timestamp, so newer is first
messages = [l["message"] for l in logs]
messages = [log["message"] for log in logs]
assert "Test message" in messages
assert "Trigger flush" in messages
@@ -42,15 +42,15 @@ def test_search_and_filter(handler, db):
persistent_handler, logger = handler
logger.info("Hello world")
logger.error("Something went wrong")
time.sleep(0.2)
logger.debug("Force flush")
# Search
results = persistent_handler.get_logs(search="world")
assert len(results) == 1
assert "Hello world" in results[0]["message"]
# Filter by level
results = persistent_handler.get_logs(level="ERROR")
assert len(results) == 1
@@ -60,48 +60,47 @@ def test_search_and_filter(handler, db):
def test_anomaly_flooding(handler, db):
persistent_handler, logger = handler
persistent_handler.flooding_threshold = 5
for i in range(10):
logger.info(f"Message {i}")
time.sleep(0.2)
logger.debug("Force flush")
logs = persistent_handler.get_logs(limit=20)
anomalies = [l for l in logs if l["is_anomaly"]]
anomalies = [log for log in logs if log["is_anomaly"]]
assert len(anomalies) > 0
assert any(l["anomaly_type"] == "flooding" for l in anomalies)
assert any(log["anomaly_type"] == "flooding" for log in anomalies)
def test_anomaly_repeat(handler, db):
persistent_handler, logger = handler
persistent_handler.repeat_threshold = 3
for _ in range(5):
logger.info("Same message")
time.sleep(0.2)
logger.debug("Force flush")
logs = persistent_handler.get_logs(limit=20)
anomalies = [l for l in logs if l["is_anomaly"]]
anomalies = [log for log in logs if log["is_anomaly"]]
assert len(anomalies) > 0
assert any(l["anomaly_type"] == "repeat" for l in anomalies)
assert any(log["anomaly_type"] == "repeat" for log in anomalies)
def test_log_cleanup(handler, db):
persistent_handler, logger = handler
# Insert many logs
for i in range(100):
logger.info(f"Log {i}")
time.sleep(0.2)
logger.debug("Trigger cleanup")
# Force cleanup with small limit
db.debug_logs.cleanup_old_logs(max_logs=10)
count = db.debug_logs.get_total_count()
assert count <= 11 # 10 + the trigger log
count = db.debug_logs.get_total_count()
assert count <= 11 # 10 + the trigger log

View File

@@ -1,8 +1,6 @@
import os
import shutil
import tempfile
import asyncio
import json
from unittest.mock import AsyncMock, MagicMock, patch
from contextlib import ExitStack
import pytest
@@ -243,7 +241,6 @@ async def test_hotswap_identity_recovery(mock_rns, temp_dir):
# Mock setup_identity to fail first time (after hotswap start),
# but the second call (recovery) should succeed.
original_setup = app.setup_identity
app.setup_identity = MagicMock(side_effect=[Exception("Setup failed"), None])
app.teardown_identity = MagicMock()
app.websocket_broadcast = AsyncMock()

View File

@@ -1,8 +1,6 @@
import unittest
import os
import shutil
import tempfile
import json
from pathlib import Path
from meshchatx.src.backend.integrity_manager import IntegrityManager

View File

@@ -1,7 +1,5 @@
import os
import shutil
import tempfile
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
from contextlib import ExitStack
import pytest
@@ -9,7 +7,6 @@ import RNS
import LXMF
from meshchatx.meshchat import ReticulumMeshChat
from meshchatx.src.backend.lxmf_message_fields import LxmfImageField
@pytest.fixture

View File

@@ -6,7 +6,6 @@ import LXMF
from meshchatx.src.backend.lxmf_utils import (
convert_lxmf_message_to_dict,
convert_lxmf_state_to_string,
convert_lxmf_method_to_string,
convert_db_lxmf_message_to_dict,
)

View File

@@ -2,12 +2,10 @@ import unittest
import os
import shutil
import tempfile
import random
import secrets
from meshchatx.src.backend.database import Database
from meshchatx.src.backend.identity_manager import IdentityManager
from meshchatx.src.backend.announce_manager import AnnounceManager
from tests.backend.benchmarking_utils import MemoryTracker, get_memory_usage_mb
from tests.backend.benchmarking_utils import MemoryTracker
class TestMemoryProfiling(unittest.TestCase):
@@ -111,8 +109,6 @@ class TestMemoryProfiling(unittest.TestCase):
def test_announce_manager_leaks(self):
"""Test for memory leaks in AnnounceManager during repeated updates."""
announce_manager = AnnounceManager(self.db)
with MemoryTracker("Announce Stress (2k unique announces)") as tracker:
for i in range(2000):
data = {

View File

@@ -1,5 +1,5 @@
import unittest
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock
from meshchatx.src.backend.message_handler import MessageHandler

View File

@@ -1,5 +1,5 @@
import unittest
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock
from meshchatx.src.backend.nomadnet_downloader import NomadnetDownloader

View File

@@ -3,7 +3,6 @@ import os
import shutil
import tempfile
import time
import random
import secrets
from unittest.mock import MagicMock
from meshchatx.src.backend.database import Database

View File

@@ -84,7 +84,7 @@ def test_setup_receive_destination(mock_rns, temp_dir):
handler = RNCPHandler(mock_rns["Reticulum"], mock_rns["id_instance"], temp_dir)
mock_rns["Reticulum"].identitypath = temp_dir
dest_hash = handler.setup_receive_destination(
_ = handler.setup_receive_destination(
allowed_hashes=["abc123def456"], fetch_allowed=True, fetch_jail=temp_dir
)

View File

@@ -1,12 +1,9 @@
import os
import shutil
import tempfile
import threading
from unittest.mock import MagicMock, patch
import pytest
import RNS
import LXMF
from meshchatx.meshchat import ReticulumMeshChat
@@ -85,12 +82,8 @@ def test_reticulum_meshchat_init(mock_rns, temp_dir):
patch("meshchatx.src.backend.identity_context.MapManager"),
patch("meshchatx.src.backend.identity_context.DocsManager"),
patch("meshchatx.src.backend.identity_context.NomadNetworkManager"),
patch(
"meshchatx.src.backend.identity_context.TelephoneManager"
) as mock_tel_class,
patch(
"meshchatx.src.backend.identity_context.VoicemailManager"
) as mock_vm_class,
patch("meshchatx.src.backend.identity_context.TelephoneManager"),
patch("meshchatx.src.backend.identity_context.VoicemailManager"),
patch("meshchatx.src.backend.identity_context.RingtoneManager"),
patch("meshchatx.src.backend.identity_context.RNCPHandler"),
patch("meshchatx.src.backend.identity_context.RNStatusHandler"),
@@ -206,7 +199,7 @@ def test_reticulum_meshchat_init_database_failure_recovery(mock_rns, temp_dir):
# Fail the first initialize call
mock_db_instance.initialize.side_effect = [Exception("DB Error"), None]
app = ReticulumMeshChat(
_ = ReticulumMeshChat(
identity=mock_rns["id_instance"],
storage_dir=temp_dir,
reticulum_config_dir=temp_dir,

View File

@@ -1,4 +1,3 @@
import os
import shutil
import tempfile
import base64
@@ -153,7 +152,7 @@ def test_database_integrity_recovery(mock_rns, temp_dir):
mock_config.auth_session_secret.get.return_value = "test_secret"
mock_config.display_name.get.return_value = "Test"
app = ReticulumMeshChat(
_ = ReticulumMeshChat(
identity=mock_rns["id_instance"],
storage_dir=temp_dir,
reticulum_config_dir=temp_dir,

View File

@@ -1,5 +1,3 @@
import asyncio
import time
from unittest.mock import MagicMock, patch
import pytest
@@ -26,7 +24,6 @@ async def test_initiation_status_updates(telephone_manager):
telephone_manager.on_initiation_status_callback = status_callback
destination_hash = b"\x01" * 32
destination_hash_hex = destination_hash.hex()
# Mock RNS.Identity.recall to return an identity immediately
with patch.object(RNS.Identity, "recall") as mock_recall:

View File

@@ -1,5 +1,5 @@
import unittest
from unittest.mock import MagicMock, patch, mock_open
from unittest.mock import MagicMock, patch
from meshchatx.src.backend.translator_handler import TranslatorHandler

View File

@@ -1,7 +1,6 @@
import os
import shutil
import tempfile
import threading
from unittest.mock import MagicMock, patch
import pytest

View File

@@ -1,7 +1,5 @@
import unittest
from unittest.mock import MagicMock, patch
import threading
import time
import socket
from meshchatx.src.backend.interfaces.WebsocketServerInterface import (
WebsocketServerInterface,