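"""Performance bottleneck benchmarks for the meshchatx database layer.

These tests seed large numbers of LXMF messages and announces, then assert
that pagination, announce processing, and concurrent inserts stay within
rough latency budgets. The Reticulum transport is mocked throughout, so the
timings only exercise the Database and AnnounceManager code paths.
"""
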
import unittest
import os
import shutil
import tempfile
import time
import secrets
from unittest.mock import MagicMock

from meshchatx.src.backend.database import Database
from meshchatx.src.backend.announce_manager import AnnounceManager


class TestPerformanceBottlenecks(unittest.TestCase):
    def setUp(self):
        self.test_dir = tempfile.mkdtemp()
        self.db_path = os.path.join(self.test_dir, "perf_bottleneck.db")
        self.db = Database(self.db_path)
        self.db.initialize()
        self.announce_manager = AnnounceManager(self.db)
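        # Mock the Reticulum instance so upsert_announce() sees fixed
        # RSSI/SNR/quality readings without any real transport attached.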
        self.reticulum_mock = MagicMock()
        self.reticulum_mock.get_packet_rssi.return_value = -50
        self.reticulum_mock.get_packet_snr.return_value = 5.0
        self.reticulum_mock.get_packet_q.return_value = 3

    def tearDown(self):
        self.db.close()
        shutil.rmtree(self.test_dir)

    def test_message_pagination_performance(self):
        """Test performance of message pagination with a large dataset."""
        num_messages = 10000
        peer_hash = secrets.token_hex(16)
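
        # Seed one conversation with num_messages delivered messages; each
        # timestamp steps back one second, so ordering by time is deterministic
        # for the pagination queries below.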
        print(f"\nSeeding {num_messages} messages for pagination test...")
        with self.db.provider:
            for i in range(num_messages):
                msg = {
                    "hash": secrets.token_hex(16),
                    "source_hash": peer_hash,
                    "destination_hash": "my_hash",
                    "peer_hash": peer_hash,
                    "state": "delivered",
                    "progress": 1.0,
                    "is_incoming": 1,
                    "method": "direct",
                    "delivery_attempts": 1,
                    "title": f"Msg {i}",
                    "content": "Content",
                    "fields": "{}",
                    "timestamp": time.time() - i,
                    "rssi": -50,
                    "snr": 5.0,
                    "quality": 3,
                    "is_spam": 0,
                }
                self.db.messages.upsert_lxmf_message(msg)

        # Benchmark different offsets
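        # Each page of `limit` rows must come back in under 50 ms, regardless
        # of how deep the offset is.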
        offsets = [0, 1000, 5000, 9000]
        limit = 50
        for offset in offsets:
            start = time.time()
            msgs = self.db.messages.get_conversation_messages(
                peer_hash, limit=limit, offset=offset
            )
            duration = (time.time() - start) * 1000
            print(f"Fetch {limit} messages at offset {offset}: {duration:.2f}ms")
            self.assertEqual(len(msgs), limit)
            self.assertLess(duration, 50, f"Pagination at offset {offset} is too slow!")

    def test_announce_flood_bottleneck(self):
        """Simulate a flood of incoming announces and measure processing time."""
        num_announces = 500
        identities = [MagicMock() for _ in range(num_announces)]
        for i, ident in enumerate(identities):
            ident.hash = MagicMock()
            ident.hash.hex.return_value = secrets.token_hex(16)
            ident.get_public_key.return_value = b"public_key"

        print(f"\nSimulating flood of {num_announces} announces...")
        start_total = time.time()

        # We simulate what meshchat.on_lxmf_announce_received does
        with self.db.provider:
            for i in range(num_announces):
                dest_hash = secrets.token_hex(16)
                aspect = "lxmf.delivery"
                app_data = b"app_data"
                packet_hash = b"packet_hash"

                # This is the synchronous part that could be a bottleneck
                self.announce_manager.upsert_announce(
                    self.reticulum_mock,
                    identities[i],
                    dest_hash,
                    aspect,
                    app_data,
                    packet_hash,
                )

                # Simulate the fetch after upsert which is also done in the app
                self.db.announces.get_announce_by_hash(dest_hash)

        duration_total = time.time() - start_total
        avg_duration = (duration_total / num_announces) * 1000
        print(
            f"Processed {num_announces} announces in {duration_total:.2f}s (Avg: {avg_duration:.2f}ms/announce)"
        )

        self.assertLess(avg_duration, 20, "Announce processing is too slow!")

    def test_announce_pagination_performance(self):
        """Test performance of announce pagination with search and filtering."""
        num_announces = 5000
        print(f"\nSeeding {num_announces} announces for pagination test...")
        with self.db.provider:
            for i in range(num_announces):
                data = {
                    "destination_hash": secrets.token_hex(16),
                    "aspect": "lxmf.delivery" if i % 2 == 0 else "lxst.telephony",
                    "identity_hash": secrets.token_hex(16),
                    "identity_public_key": "pubkey",
                    "app_data": "data",
                    "rssi": -50,
                    "snr": 5.0,
                    "quality": 3,
                }
                self.db.announces.upsert_announce(data)

        # Benchmark filtered search with pagination
        start = time.time()
        results = self.announce_manager.get_filtered_announces(
            aspect="lxmf.delivery", limit=50, offset=1000
        )
        duration = (time.time() - start) * 1000
        print(f"Filtered announce pagination (offset 1000): {duration:.2f}ms")
        self.assertEqual(len(results), 50)
        self.assertLess(duration, 50, "Announce pagination is too slow!")

    def test_concurrent_announce_handling(self):
        """Test how the database handles concurrent announce insertions from multiple threads."""
        import threading

        num_threads = 10
        announces_per_thread = 50
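
        # All threads share the same Database and AnnounceManager instances,
        # so this exercises whatever locking the storage provider applies.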
        def insert_announces():
            for _ in range(announces_per_thread):
                dest_hash = secrets.token_hex(16)
                ident = MagicMock()
                ident.hash.hex.return_value = secrets.token_hex(16)
                ident.get_public_key.return_value = b"pubkey"

                self.announce_manager.upsert_announce(
                    self.reticulum_mock,
                    ident,
                    dest_hash,
                    "lxmf.delivery",
                    b"data",
                    b"packet",
                )

        threads = [
            threading.Thread(target=insert_announces) for _ in range(num_threads)
        ]

        print(
            f"\nRunning {num_threads} threads inserting {announces_per_thread} announces each..."
        )
        start = time.time()
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        duration = time.time() - start

        print(
            f"Concurrent insertion took {duration:.2f}s for {num_threads * announces_per_thread} announces"
        )
        self.assertLess(duration, 2.0, "Concurrent announce insertion is too slow!")


if __name__ == "__main__":
    unittest.main()