Add active transfer management in RNS FileSync
- Introduced a mechanism to track active outgoing file and delta transfers, preventing duplicate requests. - Implemented cleanup logic to remove transfer entries upon completion or failure. - Enhanced logging to provide feedback on transfer status and permission checks. - Ensured thread safety with locks around active transfer modifications.
This commit is contained in:
@@ -51,6 +51,9 @@ transfer_stats = {
|
||||
}
|
||||
transfer_stats_lock = threading.Lock()
|
||||
|
||||
active_outgoing_transfers = {}
|
||||
active_outgoing_transfers_lock = threading.Lock()
|
||||
|
||||
|
||||
class Colors:
|
||||
"""ANSI color codes for terminal output."""
|
||||
@@ -1321,6 +1324,14 @@ def handle_file_request(data, link):
|
||||
if not filepath:
|
||||
return
|
||||
|
||||
transfer_key = (id(link), filepath)
|
||||
|
||||
with active_outgoing_transfers_lock:
|
||||
if transfer_key in active_outgoing_transfers:
|
||||
RNS.log(f"Transfer already in progress for {filepath}, ignoring duplicate request", RNS.LOG_DEBUG)
|
||||
return
|
||||
active_outgoing_transfers[transfer_key] = time.time()
|
||||
|
||||
try:
|
||||
remote_identity = link.get_remote_identity()
|
||||
if remote_identity and whitelist_enabled:
|
||||
@@ -1329,6 +1340,8 @@ def handle_file_request(data, link):
|
||||
f"Peer {RNS.prettyhexrep(remote_identity.hash)} does not have read permission for {filepath}",
|
||||
RNS.LOG_WARNING,
|
||||
)
|
||||
with active_outgoing_transfers_lock:
|
||||
active_outgoing_transfers.pop(transfer_key, None)
|
||||
return
|
||||
except Exception as e:
|
||||
RNS.log(f"Error checking permissions: {e}", RNS.LOG_DEBUG)
|
||||
@@ -1337,6 +1350,8 @@ def handle_file_request(data, link):
|
||||
|
||||
if not os.path.exists(full_path):
|
||||
RNS.log(f"Peer requested non-existent file: {filepath}", RNS.LOG_WARNING)
|
||||
with active_outgoing_transfers_lock:
|
||||
active_outgoing_transfers.pop(transfer_key, None)
|
||||
return
|
||||
|
||||
try:
|
||||
@@ -1385,25 +1400,39 @@ def handle_file_request(data, link):
|
||||
if time.time() - start_wait > timeout:
|
||||
RNS.log(f"File {filepath} resource acceptance timeout", RNS.LOG_ERROR)
|
||||
resource.cancel()
|
||||
with active_outgoing_transfers_lock:
|
||||
active_outgoing_transfers.pop(transfer_key, None)
|
||||
return
|
||||
if resource.status == RNS.Resource.FAILED:
|
||||
RNS.log(f"File {filepath} resource failed", RNS.LOG_ERROR)
|
||||
with active_outgoing_transfers_lock:
|
||||
active_outgoing_transfers.pop(transfer_key, None)
|
||||
return
|
||||
time.sleep(0.1)
|
||||
|
||||
if resource.status == RNS.Resource.REJECTED:
|
||||
RNS.log(f"File {filepath} resource was rejected by peer", RNS.LOG_WARNING)
|
||||
with active_outgoing_transfers_lock:
|
||||
active_outgoing_transfers.pop(transfer_key, None)
|
||||
return
|
||||
|
||||
RNS.log(f"File {filepath} resource accepted, transfer in progress...", RNS.LOG_VERBOSE)
|
||||
|
||||
time.sleep(0.1)
|
||||
|
||||
def cleanup_transfer():
|
||||
with active_outgoing_transfers_lock:
|
||||
active_outgoing_transfers.pop(transfer_key, None)
|
||||
|
||||
threading.Timer(5.0, cleanup_transfer).start()
|
||||
|
||||
except Exception as e:
|
||||
RNS.log(f"Error creating resource for {filepath}: {e}", RNS.LOG_ERROR)
|
||||
with active_outgoing_transfers_lock:
|
||||
active_outgoing_transfers.pop(transfer_key, None)
|
||||
|
||||
except Exception as e:
|
||||
RNS.log(f"Error sending file {filepath}: {e}", RNS.LOG_ERROR)
|
||||
with active_outgoing_transfers_lock:
|
||||
active_outgoing_transfers.pop(transfer_key, None)
|
||||
|
||||
|
||||
def handle_delta_request(data, link):
|
||||
@@ -1420,6 +1449,14 @@ def handle_delta_request(data, link):
|
||||
if not filepath:
|
||||
return
|
||||
|
||||
transfer_key = (id(link), filepath, "delta")
|
||||
|
||||
with active_outgoing_transfers_lock:
|
||||
if transfer_key in active_outgoing_transfers:
|
||||
RNS.log(f"Delta transfer already in progress for {filepath}, ignoring duplicate request", RNS.LOG_DEBUG)
|
||||
return
|
||||
active_outgoing_transfers[transfer_key] = time.time()
|
||||
|
||||
try:
|
||||
remote_identity = link.get_remote_identity()
|
||||
if remote_identity and whitelist_enabled:
|
||||
@@ -1428,6 +1465,8 @@ def handle_delta_request(data, link):
|
||||
f"Peer {RNS.prettyhexrep(remote_identity.hash)} does not have read permission for {filepath}",
|
||||
RNS.LOG_WARNING,
|
||||
)
|
||||
with active_outgoing_transfers_lock:
|
||||
active_outgoing_transfers.pop(transfer_key, None)
|
||||
return
|
||||
except Exception as e:
|
||||
RNS.log(f"Error checking permissions: {e}", RNS.LOG_DEBUG)
|
||||
@@ -1438,6 +1477,8 @@ def handle_delta_request(data, link):
|
||||
RNS.log(
|
||||
f"Peer requested delta for non-existent file: {filepath}", RNS.LOG_WARNING,
|
||||
)
|
||||
with active_outgoing_transfers_lock:
|
||||
active_outgoing_transfers.pop(transfer_key, None)
|
||||
return
|
||||
|
||||
try:
|
||||
@@ -1452,6 +1493,8 @@ def handle_delta_request(data, link):
|
||||
|
||||
if len(blocks_to_send) == len(local_blocks):
|
||||
RNS.log(f"No common blocks, sending full file: {filepath}", RNS.LOG_VERBOSE)
|
||||
with active_outgoing_transfers_lock:
|
||||
active_outgoing_transfers.pop(transfer_key, None)
|
||||
handle_file_request({"path": filepath}, link)
|
||||
return
|
||||
|
||||
@@ -1522,9 +1565,14 @@ def handle_delta_request(data, link):
|
||||
packet.send()
|
||||
|
||||
RNS.log(f"Delta sent for {filepath}", RNS.LOG_VERBOSE)
|
||||
|
||||
with active_outgoing_transfers_lock:
|
||||
active_outgoing_transfers.pop(transfer_key, None)
|
||||
|
||||
except Exception as e:
|
||||
RNS.log(f"Error sending delta for {filepath}: {e}", RNS.LOG_ERROR)
|
||||
with active_outgoing_transfers_lock:
|
||||
active_outgoing_transfers.pop(transfer_key, None)
|
||||
|
||||
|
||||
def resource_callback(resource):
|
||||
@@ -1537,19 +1585,33 @@ def resource_callback(resource):
|
||||
True if resource should be accepted, False otherwise.
|
||||
|
||||
"""
|
||||
filepath = None
|
||||
if resource.metadata and "filepath" in resource.metadata:
|
||||
try:
|
||||
filepath = resource.metadata["filepath"].decode("utf-8")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
sender_identity = resource.link.get_remote_identity()
|
||||
sender_hash = RNS.prettyhexrep(sender_identity.hash) if sender_identity else "unknown"
|
||||
|
||||
if sender_identity:
|
||||
if whitelist_enabled:
|
||||
if not check_permission(sender_identity.hash, "write"):
|
||||
RNS.log(
|
||||
f"Rejecting resource from {RNS.prettyhexrep(sender_identity.hash)} - no write permission",
|
||||
RNS.LOG_VERBOSE,
|
||||
f"Rejecting resource from {sender_hash} - no write permission" + (f" for {filepath}" if filepath else ""),
|
||||
RNS.LOG_WARNING,
|
||||
)
|
||||
return False
|
||||
RNS.log(f"Accepting resource from {sender_hash}" + (f" for {filepath}" if filepath else ""), RNS.LOG_VERBOSE)
|
||||
return True
|
||||
|
||||
return not whitelist_enabled
|
||||
if not whitelist_enabled:
|
||||
RNS.log(f"Accepting resource (no whitelist)" + (f" for {filepath}" if filepath else ""), RNS.LOG_VERBOSE)
|
||||
return True
|
||||
|
||||
RNS.log(f"Rejecting resource - no sender identity and whitelist enabled" + (f" for {filepath}" if filepath else ""), RNS.LOG_WARNING)
|
||||
return False
|
||||
|
||||
|
||||
def resource_started(resource):
|
||||
|
||||
Reference in New Issue
Block a user