mirror of
https://github.com/torlando-tech/columba.git
synced 2025-12-22 05:37:07 +00:00
Add 84 new tests to improve coverage of remaining uncovered code paths: New files: - test_wrapper_config.py: Tests for _remove_autointerface_from_config(), _setup_interface() - test_wrapper_announce_polling.py: Tests for poll_received_announces() Updated files: - test_announce_handler.py: Add _announce_handler() internal logic tests (table extraction, LXMF parsing, stamp costs, Kotlin bridge notification) - test_opportunistic_timeout.py: Add threading tests for timer lifecycle - test_wrapper_peer_identity.py: Add error handling path tests Total tests: 431 → 515 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
749 lines
29 KiB
Python
749 lines
29 KiB
Python
"""
|
|
Test suite for announce handler functionality
|
|
Tests that the announce handler properly integrates with RNS.Transport.register_announce_handler
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import unittest
|
|
from unittest.mock import Mock, MagicMock, patch
|
|
|
|
# Add parent directory to path to import reticulum_wrapper
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
# Mock RNS and LXMF before importing reticulum_wrapper
|
|
sys.modules['RNS'] = MagicMock()
|
|
sys.modules['RNS.vendor'] = MagicMock()
|
|
sys.modules['RNS.vendor.platformutils'] = MagicMock()
|
|
sys.modules['LXMF'] = MagicMock()
|
|
|
|
# Now import after mocking
|
|
import reticulum_wrapper
|
|
|
|
|
|
class TestAnnounceHandler(unittest.TestCase):
|
|
"""Test the announce handler registration and functionality"""
|
|
|
|
def setUp(self):
|
|
"""Set up test fixtures"""
|
|
# Create a temporary storage path
|
|
import tempfile
|
|
self.temp_dir = tempfile.mkdtemp()
|
|
|
|
def tearDown(self):
|
|
"""Clean up test fixtures"""
|
|
import shutil
|
|
if os.path.exists(self.temp_dir):
|
|
shutil.rmtree(self.temp_dir)
|
|
|
|
def test_handler_has_aspect_filter_attribute(self):
|
|
"""
|
|
Test that announce handler has required aspect_filter attribute.
|
|
|
|
Per Reticulum docs: "Must be an object with an aspect_filter attribute"
|
|
This is REQUIRED for RNS.Transport.register_announce_handler to work.
|
|
"""
|
|
wrapper = reticulum_wrapper.ReticulumWrapper(self.temp_dir)
|
|
|
|
# Test one of the handlers from the handlers dict
|
|
handler = wrapper._announce_handlers["lxmf.delivery"]
|
|
self.assertTrue(
|
|
hasattr(handler, 'aspect_filter'),
|
|
"Announce handler must have 'aspect_filter' attribute for RNS to call it"
|
|
)
|
|
|
|
def test_handler_has_received_announce_method(self):
|
|
"""
|
|
Test that announce handler has required received_announce() callable.
|
|
|
|
Per Reticulum docs: "and a received_announce(destination_hash,
|
|
announced_identity, app_data) callable"
|
|
"""
|
|
wrapper = reticulum_wrapper.ReticulumWrapper(self.temp_dir)
|
|
|
|
# Test one of the handlers from the handlers dict
|
|
handler = wrapper._announce_handlers["lxmf.delivery"]
|
|
|
|
# The handler should have received_announce method
|
|
self.assertTrue(
|
|
hasattr(handler, 'received_announce'),
|
|
"Announce handler must have 'received_announce' method"
|
|
)
|
|
|
|
# It should be callable
|
|
self.assertTrue(
|
|
callable(getattr(handler, 'received_announce', None)),
|
|
"received_announce must be callable"
|
|
)
|
|
|
|
@patch('reticulum_wrapper.RNS')
|
|
def test_handler_calls_underlying_callback(self, mock_rns):
|
|
"""
|
|
Test that when received_announce is called, it invokes the actual handler.
|
|
"""
|
|
# Mock RNS.Transport.hops_to to return 1
|
|
mock_rns.Transport.hops_to.return_value = 1
|
|
|
|
wrapper = reticulum_wrapper.ReticulumWrapper(self.temp_dir)
|
|
|
|
# Mock the internal handler to track calls
|
|
wrapper._announce_handler = Mock()
|
|
|
|
# Create a new handler with the mocked callback
|
|
test_handler = reticulum_wrapper.AnnounceHandler("lxmf.delivery", wrapper._announce_handler)
|
|
|
|
# Simulate RNS calling our handler
|
|
test_dest_hash = b'test_destination_hash_bytes'
|
|
test_identity = Mock()
|
|
test_app_data = b'test_app_data'
|
|
|
|
# This simulates what RNS.Transport would do
|
|
test_handler.received_announce(
|
|
test_dest_hash,
|
|
test_identity,
|
|
test_app_data
|
|
)
|
|
|
|
# Verify our internal handler was called
|
|
wrapper._announce_handler.assert_called_once()
|
|
|
|
def test_aspect_filter_matches_handler_aspect(self):
|
|
"""
|
|
Test that each handler has its correct aspect_filter set.
|
|
|
|
The implementation uses multiple handlers, each filtering for a specific aspect.
|
|
This allows proper routing of announces to the correct handlers.
|
|
"""
|
|
wrapper = reticulum_wrapper.ReticulumWrapper(self.temp_dir)
|
|
|
|
# Verify each handler has the correct aspect filter
|
|
expected_aspects = ["lxmf.delivery", "lxmf.propagation", "call.audio", "nomadnetwork.node"]
|
|
|
|
for aspect in expected_aspects:
|
|
handler = wrapper._announce_handlers[aspect]
|
|
self.assertEqual(
|
|
handler.aspect_filter,
|
|
aspect,
|
|
f"Handler for {aspect} should have matching aspect_filter"
|
|
)
|
|
|
|
def test_handler_structure_compatible_with_rns(self):
|
|
"""
|
|
Integration test: Verify handler structure is compatible with RNS requirements.
|
|
|
|
This tests the EXACT requirements from Reticulum's documentation.
|
|
"""
|
|
wrapper = reticulum_wrapper.ReticulumWrapper(self.temp_dir)
|
|
|
|
# Test all handlers for RNS compatibility
|
|
for aspect, handler in wrapper._announce_handlers.items():
|
|
# Check all RNS requirements
|
|
checks = {
|
|
'has_aspect_filter': hasattr(handler, 'aspect_filter'),
|
|
'has_received_announce': hasattr(handler, 'received_announce'),
|
|
'received_announce_callable': callable(getattr(handler, 'received_announce', None)),
|
|
}
|
|
|
|
failures = [check for check, passed in checks.items() if not passed]
|
|
|
|
self.assertEqual(
|
|
[],
|
|
failures,
|
|
f"Handler for {aspect} fails RNS compatibility checks: {failures}"
|
|
)
|
|
|
|
|
|
class TestAnnounceHandlerIntegration(unittest.TestCase):
|
|
"""Integration tests for announce handling flow"""
|
|
|
|
def setUp(self):
|
|
"""Set up test fixtures"""
|
|
import tempfile
|
|
self.temp_dir = tempfile.mkdtemp()
|
|
|
|
def tearDown(self):
|
|
"""Clean up test fixtures"""
|
|
import shutil
|
|
if os.path.exists(self.temp_dir):
|
|
shutil.rmtree(self.temp_dir)
|
|
|
|
@patch('reticulum_wrapper.RNS')
|
|
def test_announce_handler_stores_pending_announces(self, mock_rns):
|
|
"""
|
|
Test that when an announce is received, it's stored in pending_announces.
|
|
|
|
This tests the full flow:
|
|
1. RNS calls handler.received_announce()
|
|
2. Handler calls wrapper._announce_handler()
|
|
3. Announce is added to pending_announces queue
|
|
"""
|
|
# Mock RNS.Transport.hops_to to return 1
|
|
mock_rns.Transport.hops_to.return_value = 1
|
|
|
|
wrapper = reticulum_wrapper.ReticulumWrapper(self.temp_dir)
|
|
|
|
# Ensure pending_announces is empty
|
|
self.assertEqual(len(wrapper.pending_announces), 0)
|
|
|
|
# Simulate an announce being received
|
|
test_dest_hash = b'test_dest_hash_123'
|
|
test_identity = Mock()
|
|
test_identity.get_public_key = Mock(return_value=b'test_pubkey')
|
|
test_identity.hash = b'test_identity_hash'
|
|
test_app_data = b'test_app_data'
|
|
|
|
# Use one of the handlers from the handlers dict
|
|
handler = wrapper._announce_handlers["lxmf.delivery"]
|
|
|
|
# Call the handler (simulating what RNS would do)
|
|
handler.received_announce(
|
|
test_dest_hash,
|
|
test_identity,
|
|
test_app_data
|
|
)
|
|
|
|
# Verify announce was stored
|
|
self.assertEqual(
|
|
len(wrapper.pending_announces),
|
|
1,
|
|
"Announce should be added to pending_announces queue"
|
|
)
|
|
|
|
# Verify announce structure
|
|
stored_announce = wrapper.pending_announces[0]
|
|
self.assertEqual(stored_announce['destination_hash'], test_dest_hash)
|
|
self.assertEqual(stored_announce['app_data'], test_app_data)
|
|
|
|
|
|
class TestAnnounceTableExtraction(unittest.TestCase):
|
|
"""Test announce table extraction logic (lines 1225-1236)"""
|
|
|
|
def setUp(self):
|
|
"""Set up test fixtures"""
|
|
import tempfile
|
|
self.temp_dir = tempfile.mkdtemp()
|
|
|
|
def tearDown(self):
|
|
"""Clean up test fixtures"""
|
|
import shutil
|
|
if os.path.exists(self.temp_dir):
|
|
shutil.rmtree(self.temp_dir)
|
|
|
|
@patch('reticulum_wrapper.RNS')
|
|
def test_extract_interface_from_announce_table(self, mock_rns):
|
|
"""Test extracting receiving interface from RNS.Transport.announce_table"""
|
|
# Setup mock announce_table with proper structure
|
|
test_dest_hash = b'test_dest_hash_123'
|
|
|
|
# Mock packet with receiving_interface
|
|
mock_packet = Mock()
|
|
mock_interface = Mock()
|
|
mock_interface.__str__ = Mock(return_value="TCPInterface[Testnet/127.0.0.1:4242]")
|
|
mock_packet.receiving_interface = mock_interface
|
|
|
|
# announce_table entry structure: IDX_AT_PACKET is at index 5
|
|
announce_entry = [None, None, None, None, None, mock_packet]
|
|
|
|
mock_rns.Transport.announce_table = {test_dest_hash: announce_entry}
|
|
mock_rns.Transport.hops_to.return_value = 1
|
|
|
|
wrapper = reticulum_wrapper.ReticulumWrapper(self.temp_dir)
|
|
|
|
# Trigger announce handler
|
|
test_identity = Mock()
|
|
test_identity.get_public_key = Mock(return_value=b'test_pubkey')
|
|
test_app_data = b'test_app_data'
|
|
|
|
handler = wrapper._announce_handlers["lxmf.delivery"]
|
|
handler.received_announce(test_dest_hash, test_identity, test_app_data)
|
|
|
|
# Verify interface was extracted
|
|
self.assertEqual(len(wrapper.pending_announces), 1)
|
|
stored_announce = wrapper.pending_announces[0]
|
|
self.assertEqual(stored_announce['interface'], "TCPInterface[Testnet/127.0.0.1:4242]")
|
|
|
|
@patch('reticulum_wrapper.RNS')
|
|
def test_handle_missing_announce_table(self, mock_rns):
|
|
"""Test handler gracefully handles missing announce_table"""
|
|
# RNS.Transport has no announce_table attribute
|
|
mock_rns.Transport.announce_table = None
|
|
delattr(mock_rns.Transport, 'announce_table')
|
|
mock_rns.Transport.hops_to.return_value = 1
|
|
|
|
wrapper = reticulum_wrapper.ReticulumWrapper(self.temp_dir)
|
|
|
|
test_dest_hash = b'test_dest_hash_123'
|
|
test_identity = Mock()
|
|
test_identity.get_public_key = Mock(return_value=b'test_pubkey')
|
|
test_app_data = b'test_app_data'
|
|
|
|
handler = wrapper._announce_handlers["lxmf.delivery"]
|
|
|
|
# Should not raise exception
|
|
handler.received_announce(test_dest_hash, test_identity, test_app_data)
|
|
|
|
# Verify announce was still stored (with None interface)
|
|
self.assertEqual(len(wrapper.pending_announces), 1)
|
|
stored_announce = wrapper.pending_announces[0]
|
|
self.assertIsNone(stored_announce['interface'])
|
|
|
|
@patch('reticulum_wrapper.RNS')
|
|
def test_handle_missing_packet_in_announce_entry(self, mock_rns):
|
|
"""Test handler handles announce_table entry without packet"""
|
|
test_dest_hash = b'test_dest_hash_123'
|
|
|
|
# announce_table entry with only 5 elements (no packet at index 5)
|
|
announce_entry = [None, None, None, None, None]
|
|
|
|
mock_rns.Transport.announce_table = {test_dest_hash: announce_entry}
|
|
mock_rns.Transport.hops_to.return_value = 1
|
|
|
|
wrapper = reticulum_wrapper.ReticulumWrapper(self.temp_dir)
|
|
|
|
test_identity = Mock()
|
|
test_identity.get_public_key = Mock(return_value=b'test_pubkey')
|
|
test_app_data = b'test_app_data'
|
|
|
|
handler = wrapper._announce_handlers["lxmf.delivery"]
|
|
handler.received_announce(test_dest_hash, test_identity, test_app_data)
|
|
|
|
# Should not crash, interface should be None
|
|
self.assertEqual(len(wrapper.pending_announces), 1)
|
|
stored_announce = wrapper.pending_announces[0]
|
|
self.assertIsNone(stored_announce['interface'])
|
|
|
|
|
|
class TestLXMFDisplayNameExtraction(unittest.TestCase):
|
|
"""Test LXMF display name extraction logic (lines 1243-1279)"""
|
|
|
|
def setUp(self):
|
|
"""Set up test fixtures"""
|
|
import tempfile
|
|
self.temp_dir = tempfile.mkdtemp()
|
|
|
|
def tearDown(self):
|
|
"""Clean up test fixtures"""
|
|
import shutil
|
|
if os.path.exists(self.temp_dir):
|
|
shutil.rmtree(self.temp_dir)
|
|
|
|
@patch('reticulum_wrapper.LXMF')
|
|
@patch('reticulum_wrapper.RNS')
|
|
def test_extract_display_name_for_lxmf_delivery(self, mock_rns, mock_lxmf):
|
|
"""Test extracting display name for lxmf.delivery aspect"""
|
|
mock_rns.Transport.hops_to.return_value = 1
|
|
mock_lxmf.display_name_from_app_data.return_value = "Alice"
|
|
|
|
wrapper = reticulum_wrapper.ReticulumWrapper(self.temp_dir)
|
|
|
|
test_dest_hash = b'test_dest_hash_123'
|
|
test_identity = Mock()
|
|
test_identity.get_public_key = Mock(return_value=b'test_pubkey')
|
|
test_app_data = b'app_data_with_name'
|
|
|
|
handler = wrapper._announce_handlers["lxmf.delivery"]
|
|
handler.received_announce(test_dest_hash, test_identity, test_app_data)
|
|
|
|
# Verify LXMF.display_name_from_app_data was called
|
|
mock_lxmf.display_name_from_app_data.assert_called_once_with(test_app_data)
|
|
|
|
# Verify display name was stored
|
|
self.assertEqual(len(wrapper.pending_announces), 1)
|
|
stored_announce = wrapper.pending_announces[0]
|
|
self.assertEqual(stored_announce['display_name'], "Alice")
|
|
|
|
@patch('reticulum_wrapper.LXMF')
|
|
@patch('reticulum_wrapper.RNS')
|
|
def test_extract_display_name_for_lxmf_propagation(self, mock_rns, mock_lxmf):
|
|
"""Test extracting display name for lxmf.propagation aspect"""
|
|
mock_rns.Transport.hops_to.return_value = 1
|
|
mock_lxmf.pn_name_from_app_data.return_value = "Propagation Node Alpha"
|
|
|
|
wrapper = reticulum_wrapper.ReticulumWrapper(self.temp_dir)
|
|
|
|
test_dest_hash = b'test_dest_hash_123'
|
|
test_identity = Mock()
|
|
test_identity.get_public_key = Mock(return_value=b'test_pubkey')
|
|
test_app_data = b'app_data_with_pn_name'
|
|
|
|
handler = wrapper._announce_handlers["lxmf.propagation"]
|
|
handler.received_announce(test_dest_hash, test_identity, test_app_data)
|
|
|
|
# Verify LXMF.pn_name_from_app_data was called
|
|
mock_lxmf.pn_name_from_app_data.assert_called_once_with(test_app_data)
|
|
|
|
# Verify display name was stored
|
|
self.assertEqual(len(wrapper.pending_announces), 1)
|
|
stored_announce = wrapper.pending_announces[0]
|
|
self.assertEqual(stored_announce['display_name'], "Propagation Node Alpha")
|
|
|
|
@patch('reticulum_wrapper.LXMF')
|
|
@patch('reticulum_wrapper.RNS')
|
|
def test_handle_lxmf_name_extraction_exception(self, mock_rns, mock_lxmf):
|
|
"""Test handler gracefully handles LXMF name extraction exceptions"""
|
|
mock_rns.Transport.hops_to.return_value = 1
|
|
mock_lxmf.display_name_from_app_data.side_effect = Exception("LXMF parsing error")
|
|
|
|
wrapper = reticulum_wrapper.ReticulumWrapper(self.temp_dir)
|
|
|
|
test_dest_hash = b'test_dest_hash_123'
|
|
test_identity = Mock()
|
|
test_identity.get_public_key = Mock(return_value=b'test_pubkey')
|
|
test_app_data = b'malformed_app_data'
|
|
|
|
handler = wrapper._announce_handlers["lxmf.delivery"]
|
|
|
|
# Should not raise exception
|
|
handler.received_announce(test_dest_hash, test_identity, test_app_data)
|
|
|
|
# Verify announce was stored with None display_name
|
|
self.assertEqual(len(wrapper.pending_announces), 1)
|
|
stored_announce = wrapper.pending_announces[0]
|
|
self.assertIsNone(stored_announce['display_name'])
|
|
|
|
@patch('reticulum_wrapper.LXMF', None)
|
|
@patch('reticulum_wrapper.RNS')
|
|
def test_handle_missing_lxmf_module(self, mock_rns):
|
|
"""Test handler handles missing LXMF module gracefully"""
|
|
mock_rns.Transport.hops_to.return_value = 1
|
|
|
|
wrapper = reticulum_wrapper.ReticulumWrapper(self.temp_dir)
|
|
|
|
test_dest_hash = b'test_dest_hash_123'
|
|
test_identity = Mock()
|
|
test_identity.get_public_key = Mock(return_value=b'test_pubkey')
|
|
test_app_data = b'app_data'
|
|
|
|
handler = wrapper._announce_handlers["lxmf.delivery"]
|
|
handler.received_announce(test_dest_hash, test_identity, test_app_data)
|
|
|
|
# Should not crash, display_name should be None
|
|
self.assertEqual(len(wrapper.pending_announces), 1)
|
|
stored_announce = wrapper.pending_announces[0]
|
|
self.assertIsNone(stored_announce['display_name'])
|
|
|
|
|
|
class TestStampCostExtraction(unittest.TestCase):
|
|
"""Test stamp cost/flexibility/peering cost extraction (lines 1262-1279)"""
|
|
|
|
def setUp(self):
|
|
"""Set up test fixtures"""
|
|
import tempfile
|
|
self.temp_dir = tempfile.mkdtemp()
|
|
|
|
def tearDown(self):
|
|
"""Clean up test fixtures"""
|
|
import shutil
|
|
if os.path.exists(self.temp_dir):
|
|
shutil.rmtree(self.temp_dir)
|
|
|
|
@patch('reticulum_wrapper.LXMF')
|
|
@patch('reticulum_wrapper.RNS')
|
|
def test_extract_stamp_cost_for_delivery(self, mock_rns, mock_lxmf):
|
|
"""Test extracting stamp cost for lxmf.delivery aspect"""
|
|
mock_rns.Transport.hops_to.return_value = 1
|
|
mock_lxmf.stamp_cost_from_app_data.return_value = 8
|
|
|
|
wrapper = reticulum_wrapper.ReticulumWrapper(self.temp_dir)
|
|
|
|
test_dest_hash = b'test_dest_hash_123'
|
|
test_identity = Mock()
|
|
test_identity.get_public_key = Mock(return_value=b'test_pubkey')
|
|
test_app_data = b'app_data_with_stamp'
|
|
|
|
handler = wrapper._announce_handlers["lxmf.delivery"]
|
|
handler.received_announce(test_dest_hash, test_identity, test_app_data)
|
|
|
|
# Verify LXMF.stamp_cost_from_app_data was called
|
|
mock_lxmf.stamp_cost_from_app_data.assert_called_once_with(test_app_data)
|
|
|
|
# Verify stamp cost was stored
|
|
self.assertEqual(len(wrapper.pending_announces), 1)
|
|
stored_announce = wrapper.pending_announces[0]
|
|
self.assertEqual(stored_announce['stamp_cost'], 8)
|
|
|
|
@patch('reticulum_wrapper.LXMF')
|
|
@patch('reticulum_wrapper.RNS')
|
|
def test_extract_propagation_node_stamp_costs(self, mock_rns, mock_lxmf):
|
|
"""Test extracting stamp cost, flexibility, and peering cost for propagation nodes"""
|
|
# Setup RNS.vendor.umsgpack mock before creating wrapper
|
|
# This needs to be set up BEFORE the import happens in _announce_handler
|
|
mock_umsgpack = MagicMock()
|
|
mock_umsgpack.unpackb.return_value = [
|
|
None, None, None, None, None,
|
|
[16, 2, 4] # [stamp_cost, flexibility, peering_cost]
|
|
]
|
|
mock_rns.vendor.umsgpack = mock_umsgpack
|
|
|
|
mock_rns.Transport.hops_to.return_value = 1
|
|
mock_lxmf.pn_stamp_cost_from_app_data.return_value = 16
|
|
mock_lxmf.pn_announce_data_is_valid.return_value = True
|
|
|
|
wrapper = reticulum_wrapper.ReticulumWrapper(self.temp_dir)
|
|
|
|
test_dest_hash = b'test_dest_hash_123'
|
|
test_identity = Mock()
|
|
test_identity.get_public_key = Mock(return_value=b'test_pubkey')
|
|
test_app_data = b'propagation_node_app_data'
|
|
|
|
# Patch umsgpack in the context where it's imported (inside the function)
|
|
with patch('RNS.vendor.umsgpack.unpackb', return_value=[
|
|
None, None, None, None, None,
|
|
[16, 2, 4] # [stamp_cost, flexibility, peering_cost]
|
|
]):
|
|
handler = wrapper._announce_handlers["lxmf.propagation"]
|
|
handler.received_announce(test_dest_hash, test_identity, test_app_data)
|
|
|
|
# Verify LXMF.pn_stamp_cost_from_app_data was called
|
|
mock_lxmf.pn_stamp_cost_from_app_data.assert_called_once_with(test_app_data)
|
|
|
|
# Verify all costs were stored
|
|
self.assertEqual(len(wrapper.pending_announces), 1)
|
|
stored_announce = wrapper.pending_announces[0]
|
|
self.assertEqual(stored_announce['stamp_cost'], 16)
|
|
self.assertEqual(stored_announce['stamp_cost_flexibility'], 2)
|
|
self.assertEqual(stored_announce['peering_cost'], 4)
|
|
|
|
@patch('reticulum_wrapper.LXMF')
|
|
@patch('reticulum_wrapper.RNS')
|
|
def test_handle_umsgpack_unpacking_error(self, mock_rns, mock_lxmf):
|
|
"""Test handler gracefully handles umsgpack unpacking errors"""
|
|
mock_rns.Transport.hops_to.return_value = 1
|
|
mock_lxmf.pn_stamp_cost_from_app_data.return_value = 16
|
|
mock_lxmf.pn_announce_data_is_valid.return_value = True
|
|
|
|
wrapper = reticulum_wrapper.ReticulumWrapper(self.temp_dir)
|
|
|
|
test_dest_hash = b'test_dest_hash_123'
|
|
test_identity = Mock()
|
|
test_identity.get_public_key = Mock(return_value=b'test_pubkey')
|
|
test_app_data = b'malformed_propagation_data'
|
|
|
|
# Patch umsgpack to raise exception during unpack
|
|
with patch('RNS.vendor.umsgpack.unpackb', side_effect=Exception("Invalid msgpack data")):
|
|
handler = wrapper._announce_handlers["lxmf.propagation"]
|
|
|
|
# Should not raise exception
|
|
handler.received_announce(test_dest_hash, test_identity, test_app_data)
|
|
|
|
# Verify announce was stored (flexibility and peering_cost should be None)
|
|
self.assertEqual(len(wrapper.pending_announces), 1)
|
|
stored_announce = wrapper.pending_announces[0]
|
|
self.assertIsNone(stored_announce['stamp_cost_flexibility'])
|
|
self.assertIsNone(stored_announce['peering_cost'])
|
|
|
|
@patch('reticulum_wrapper.LXMF')
|
|
@patch('reticulum_wrapper.RNS')
|
|
def test_handle_stamp_cost_extraction_exception(self, mock_rns, mock_lxmf):
|
|
"""Test handler gracefully handles stamp cost extraction exceptions"""
|
|
mock_rns.Transport.hops_to.return_value = 1
|
|
mock_lxmf.stamp_cost_from_app_data.side_effect = Exception("Stamp cost parsing error")
|
|
|
|
wrapper = reticulum_wrapper.ReticulumWrapper(self.temp_dir)
|
|
|
|
test_dest_hash = b'test_dest_hash_123'
|
|
test_identity = Mock()
|
|
test_identity.get_public_key = Mock(return_value=b'test_pubkey')
|
|
test_app_data = b'malformed_app_data'
|
|
|
|
handler = wrapper._announce_handlers["lxmf.delivery"]
|
|
|
|
# Should not raise exception
|
|
handler.received_announce(test_dest_hash, test_identity, test_app_data)
|
|
|
|
# Verify announce was stored with None stamp_cost
|
|
self.assertEqual(len(wrapper.pending_announces), 1)
|
|
stored_announce = wrapper.pending_announces[0]
|
|
self.assertIsNone(stored_announce['stamp_cost'])
|
|
|
|
|
|
class TestKotlinBridgeNotification(unittest.TestCase):
|
|
"""Test Kotlin bridge notification logic (lines 1298-1308)"""
|
|
|
|
def setUp(self):
|
|
"""Set up test fixtures"""
|
|
import tempfile
|
|
self.temp_dir = tempfile.mkdtemp()
|
|
|
|
def tearDown(self):
|
|
"""Clean up test fixtures"""
|
|
import shutil
|
|
if os.path.exists(self.temp_dir):
|
|
shutil.rmtree(self.temp_dir)
|
|
|
|
@patch('reticulum_wrapper.RNS')
|
|
def test_kotlin_bridge_notified_on_announce(self, mock_rns):
|
|
"""Test that kotlin_reticulum_bridge.notifyAnnounceReceived is called"""
|
|
mock_rns.Transport.hops_to.return_value = 1
|
|
|
|
wrapper = reticulum_wrapper.ReticulumWrapper(self.temp_dir)
|
|
|
|
# Set up mock Kotlin bridge
|
|
mock_bridge = Mock()
|
|
wrapper.kotlin_reticulum_bridge = mock_bridge
|
|
|
|
test_dest_hash = b'test_dest_hash_123'
|
|
test_identity = Mock()
|
|
test_identity.get_public_key = Mock(return_value=b'test_pubkey')
|
|
test_app_data = b'test_app_data'
|
|
|
|
handler = wrapper._announce_handlers["lxmf.delivery"]
|
|
handler.received_announce(test_dest_hash, test_identity, test_app_data)
|
|
|
|
# Verify bridge was notified
|
|
mock_bridge.notifyAnnounceReceived.assert_called_once()
|
|
|
|
@patch('reticulum_wrapper.RNS')
|
|
def test_no_crash_with_none_bridge(self, mock_rns):
|
|
"""Test that handler doesn't crash when kotlin_reticulum_bridge is None"""
|
|
mock_rns.Transport.hops_to.return_value = 1
|
|
|
|
wrapper = reticulum_wrapper.ReticulumWrapper(self.temp_dir)
|
|
wrapper.kotlin_reticulum_bridge = None
|
|
|
|
test_dest_hash = b'test_dest_hash_123'
|
|
test_identity = Mock()
|
|
test_identity.get_public_key = Mock(return_value=b'test_pubkey')
|
|
test_app_data = b'test_app_data'
|
|
|
|
handler = wrapper._announce_handlers["lxmf.delivery"]
|
|
|
|
# Should not raise exception
|
|
handler.received_announce(test_dest_hash, test_identity, test_app_data)
|
|
|
|
# Verify announce was still stored
|
|
self.assertEqual(len(wrapper.pending_announces), 1)
|
|
|
|
@patch('reticulum_wrapper.RNS')
|
|
def test_bridge_notification_exception_handled(self, mock_rns):
|
|
"""Test that exceptions from bridge notification are caught and logged"""
|
|
mock_rns.Transport.hops_to.return_value = 1
|
|
|
|
wrapper = reticulum_wrapper.ReticulumWrapper(self.temp_dir)
|
|
|
|
# Set up mock bridge that raises exception
|
|
mock_bridge = Mock()
|
|
mock_bridge.notifyAnnounceReceived.side_effect = Exception("Bridge communication error")
|
|
wrapper.kotlin_reticulum_bridge = mock_bridge
|
|
|
|
test_dest_hash = b'test_dest_hash_123'
|
|
test_identity = Mock()
|
|
test_identity.get_public_key = Mock(return_value=b'test_pubkey')
|
|
test_app_data = b'test_app_data'
|
|
|
|
handler = wrapper._announce_handlers["lxmf.delivery"]
|
|
|
|
# Should not raise exception (exception should be caught and logged)
|
|
handler.received_announce(test_dest_hash, test_identity, test_app_data)
|
|
|
|
# Verify announce was still stored despite bridge error
|
|
self.assertEqual(len(wrapper.pending_announces), 1)
|
|
|
|
|
|
class TestCallbackExceptionHandling(unittest.TestCase):
|
|
"""Test callback exception handling logic (lines 1310-1316)"""
|
|
|
|
def setUp(self):
|
|
"""Set up test fixtures"""
|
|
import tempfile
|
|
self.temp_dir = tempfile.mkdtemp()
|
|
|
|
def tearDown(self):
|
|
"""Clean up test fixtures"""
|
|
import shutil
|
|
if os.path.exists(self.temp_dir):
|
|
shutil.rmtree(self.temp_dir)
|
|
|
|
@patch('reticulum_wrapper.RNS')
|
|
def test_callback_exception_does_not_crash_handler(self, mock_rns):
|
|
"""Test that exceptions in registered callbacks don't crash the handler"""
|
|
mock_rns.Transport.hops_to.return_value = 1
|
|
|
|
wrapper = reticulum_wrapper.ReticulumWrapper(self.temp_dir)
|
|
|
|
# Register a callback that raises an exception
|
|
failing_callback = Mock(side_effect=Exception("Callback processing error"))
|
|
wrapper.announce_callbacks.append(failing_callback)
|
|
|
|
# Also register a working callback to verify it still gets called
|
|
working_callback = Mock()
|
|
wrapper.announce_callbacks.append(working_callback)
|
|
|
|
test_dest_hash = b'test_dest_hash_123'
|
|
test_identity = Mock()
|
|
test_identity.get_public_key = Mock(return_value=b'test_pubkey')
|
|
test_app_data = b'test_app_data'
|
|
|
|
handler = wrapper._announce_handlers["lxmf.delivery"]
|
|
|
|
# Should not raise exception despite failing callback
|
|
handler.received_announce(test_dest_hash, test_identity, test_app_data)
|
|
|
|
# Verify both callbacks were called
|
|
failing_callback.assert_called_once()
|
|
working_callback.assert_called_once()
|
|
|
|
# Verify announce was still stored
|
|
self.assertEqual(len(wrapper.pending_announces), 1)
|
|
|
|
@patch('reticulum_wrapper.RNS')
|
|
def test_multiple_callbacks_all_called(self, mock_rns):
|
|
"""Test that all registered callbacks are called even if one fails"""
|
|
mock_rns.Transport.hops_to.return_value = 1
|
|
|
|
wrapper = reticulum_wrapper.ReticulumWrapper(self.temp_dir)
|
|
|
|
# Register multiple callbacks
|
|
callback1 = Mock()
|
|
callback2 = Mock(side_effect=Exception("Callback 2 error"))
|
|
callback3 = Mock()
|
|
|
|
wrapper.announce_callbacks.extend([callback1, callback2, callback3])
|
|
|
|
test_dest_hash = b'test_dest_hash_123'
|
|
test_identity = Mock()
|
|
test_identity.get_public_key = Mock(return_value=b'test_pubkey')
|
|
test_app_data = b'test_app_data'
|
|
|
|
handler = wrapper._announce_handlers["lxmf.delivery"]
|
|
handler.received_announce(test_dest_hash, test_identity, test_app_data)
|
|
|
|
# Verify all callbacks were called despite callback2 failing
|
|
callback1.assert_called_once()
|
|
callback2.assert_called_once()
|
|
callback3.assert_called_once()
|
|
|
|
@patch('reticulum_wrapper.RNS')
|
|
def test_callback_receives_correct_announce_event(self, mock_rns):
|
|
"""Test that callbacks receive the correct announce event structure"""
|
|
mock_rns.Transport.hops_to.return_value = 2
|
|
|
|
wrapper = reticulum_wrapper.ReticulumWrapper(self.temp_dir)
|
|
|
|
callback = Mock()
|
|
wrapper.announce_callbacks.append(callback)
|
|
|
|
test_dest_hash = b'test_dest_hash_123'
|
|
test_identity = Mock()
|
|
test_identity.get_public_key = Mock(return_value=b'test_pubkey_456')
|
|
test_app_data = b'test_app_data_789'
|
|
|
|
handler = wrapper._announce_handlers["lxmf.delivery"]
|
|
handler.received_announce(test_dest_hash, test_identity, test_app_data)
|
|
|
|
# Verify callback was called with announce_event
|
|
callback.assert_called_once()
|
|
announce_event = callback.call_args[0][0]
|
|
|
|
# Verify event structure
|
|
self.assertEqual(announce_event['destination_hash'], test_dest_hash)
|
|
self.assertEqual(announce_event['public_key'], b'test_pubkey_456')
|
|
self.assertEqual(announce_event['app_data'], test_app_data)
|
|
self.assertEqual(announce_event['aspect'], 'lxmf.delivery')
|
|
self.assertEqual(announce_event['hops'], 2)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
# Run tests with verbose output
|
|
unittest.main(verbosity=2)
|