Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 40 additions & 38 deletions LXMF/LXMRouter.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class LXMRouter:

def __init__(self, identity=None, storagepath=None, autopeer=AUTOPEER, autopeer_maxdepth=None,
propagation_limit=PROPAGATION_LIMIT, delivery_limit=DELIVERY_LIMIT, enforce_ratchets=False,
enforce_stamps=False, static_peers = [], max_peers=None, from_static_only=False):
enforce_stamps=False, static_peers = [], max_peers=None, from_static_only=False, management_identities = []):

random.seed(os.urandom(10))

Expand Down Expand Up @@ -140,6 +140,8 @@ def __init__(self, identity=None, storagepath=None, autopeer=AUTOPEER, autopeer_
identity = RNS.Identity()

self.identity = identity
self.management_identities = management_identities
self.management_identities.append(identity.hash)
self.propagation_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation")
self.control_destination = None
self.client_propagation_messages_received = 0
Expand Down Expand Up @@ -339,7 +341,7 @@ def set_inbound_stamp_cost(self, destination_hash, stamp_cost):
delivery_destination.stamp_cost = stamp_cost
else:
return False

return True

return False
Expand Down Expand Up @@ -506,7 +508,7 @@ def enable_propagation(self):

except Exception as e:
RNS.log("Could not read LXM from message store. The contained exception was: "+str(e), RNS.LOG_ERROR)

et = time.time(); mps = 0 if et-st == 0 else math.floor(len(self.propagation_entries)/(et-st))
RNS.log(f"Indexed {len(self.propagation_entries)} messages in {RNS.prettytime(et-st)}, {mps} msgs/s", RNS.LOG_NOTICE)
RNS.log("Rebuilding peer synchronisation states...", RNS.LOG_NOTICE)
Expand Down Expand Up @@ -582,7 +584,7 @@ def enable_propagation(self):
self.propagation_destination.register_request_handler(LXMPeer.MESSAGE_GET_PATH, self.message_get_request, allow = RNS.Destination.ALLOW_ALL)

self.control_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation", "control")
self.control_destination.register_request_handler(LXMRouter.STATS_GET_PATH, self.stats_get_request, allow = RNS.Destination.ALLOW_LIST, allowed_list=[self.identity.hash])
self.control_destination.register_request_handler(LXMRouter.STATS_GET_PATH, self.stats_get_request, allow = RNS.Destination.ALLOW_LIST, allowed_list=self.management_identities)

if self.message_storage_limit != None:
limit_str = ", limit is "+RNS.prettysize(self.message_storage_limit)
Expand Down Expand Up @@ -636,7 +638,7 @@ def set_message_storage_limit(self, kilobytes = None, megabytes = None, gigabyte
self.message_storage_limit = int(limit_bytes)
else:
raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes))

except Exception as e:
raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes))

Expand Down Expand Up @@ -669,7 +671,7 @@ def set_information_storage_limit(self, kilobytes = None, megabytes = None, giga
self.information_storage_limit = int(limit_bytes)
else:
raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes))

except Exception as e:
raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes))

Expand Down Expand Up @@ -750,7 +752,7 @@ def compile_stats(self):
def stats_get_request(self, path, data, request_id, remote_identity, requested_at):
if remote_identity == None:
return LXMPeer.ERROR_NO_IDENTITY
elif remote_identity.hash != self.identity.hash:
elif remote_identity.hash not in self.management_identities:
return LXMPeer.ERROR_NO_ACCESS
else:
return self.compile_stats()
Expand Down Expand Up @@ -846,7 +848,7 @@ def clean_links(self):
for link in inactive_links:
self.active_propagation_links.remove(link)
link.teardown()

except Exception as e:
RNS.log("An error occurred while cleaning inbound propagation links. The contained exception was: "+str(e), RNS.LOG_ERROR)

Expand Down Expand Up @@ -889,7 +891,7 @@ def clean_transient_id_caches(self):
def update_stamp_cost(self, destination_hash, stamp_cost):
RNS.log(f"Updating outbound stamp cost for {RNS.prettyhexrep(destination_hash)} to {stamp_cost}", RNS.LOG_DEBUG)
self.outbound_stamp_costs[destination_hash] = [time.time(), stamp_cost]

def job():
self.save_outbound_stamp_costs()
threading.Thread(target=self.save_outbound_stamp_costs, daemon=True).start()
Expand All @@ -902,7 +904,7 @@ def get_wanted_inbound_peers(self):
def get_announce_app_data(self, destination_hash):
if destination_hash in self.delivery_destinations:
delivery_destination = self.delivery_destinations[destination_hash]

display_name = None
if delivery_destination.display_name != None:
display_name = delivery_destination.display_name.encode("utf-8")
Expand All @@ -928,7 +930,7 @@ def get_weight(self, transient_id):
priority_weight = 0.1
else:
priority_weight = 1.0

weight = priority_weight * age_weight * lxm_size

return weight
Expand All @@ -950,7 +952,7 @@ def generate_ticket(self, destination_hash, expiry=LXMessage.TICKET_EXPIRY):
if validity_left > LXMessage.TICKET_RENEW:
RNS.log(f"Found generated ticket for {RNS.prettyhexrep(destination_hash)} with {RNS.prettytime(validity_left)} of validity left, re-using this one", RNS.LOG_DEBUG)
return [expires, ticket]

else:
self.available_tickets["inbound"][destination_hash] = {}

Expand Down Expand Up @@ -1018,7 +1020,7 @@ def clean_message_store(self):
else:
RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to invalid file path", RNS.LOG_WARNING)
removed_entries[transient_id] = filepath

removed_count = 0
for transient_id in removed_entries:
try:
Expand Down Expand Up @@ -1062,15 +1064,15 @@ def clean_message_store(self):

if os.path.isfile(filepath):
os.unlink(filepath)

self.propagation_entries.pop(transient_id)
bytes_cleaned += entry[3]

RNS.log("Removed "+RNS.prettyhexrep(transient_id)+" with weight "+str(w[1])+" to clear up "+RNS.prettysize(entry[3])+", now cleaned "+RNS.prettysize(bytes_cleaned)+" out of "+RNS.prettysize(bytes_needed)+" needed", RNS.LOG_EXTREME)

except Exception as e:
RNS.log("Error while cleaning LXMF message from message store. The contained exception was: "+str(e), RNS.LOG_ERROR)

finally:
i += 1

Expand Down Expand Up @@ -1120,7 +1122,7 @@ def save_node_stats(self):

except Exception as e:
RNS.log("Could not save local node stats to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)


def clean_outbound_stamp_costs(self):
try:
Expand All @@ -1132,7 +1134,7 @@ def clean_outbound_stamp_costs(self):

for destination_hash in expired:
self.outbound_stamp_costs.pop(destination_hash)

except Exception as e:
RNS.log(f"Error while cleaning outbound stamp costs. The contained exception was: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
Expand Down Expand Up @@ -1173,7 +1175,7 @@ def clean_available_tickets(self):

for inbound_ticket in expired_inbound:
self.available_tickets["inbound"][destination_hash].pop(inbound_ticket)

except Exception as e:
RNS.log(f"Error while cleaning available tickets. The contained exception was: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
Expand Down Expand Up @@ -1210,7 +1212,7 @@ def reload_available_tickets(self):
if not "last_deliveries" in self.available_tickets:
RNS.log("Missing local_deliveries entry in loaded available tickets, recreating...", RNS.LOG_ERROR)
self.available_tickets["last_deliveries"] = {}

except Exception as e:
RNS.log(f"An error occurred while reloading available tickets from storage: {e}", RNS.LOG_ERROR)

Expand Down Expand Up @@ -1290,7 +1292,7 @@ def __str__(self):

### Message Download ##################################
#######################################################

def request_messages_path_job(self):
job_thread = threading.Thread(target=self.__request_messages_path_job)
job_thread.setDaemon(True)
Expand All @@ -1306,21 +1308,21 @@ def __request_messages_path_job(self):
else:
RNS.log("Propagation node path request timed out", RNS.LOG_DEBUG)
self.acknowledge_sync_completion(failure_state=LXMRouter.PR_NO_PATH)

def identity_allowed(self, identity):
if self.auth_required:
if identity.hash in self.allowed_list:
return True
else:
return False

else:
return True

def message_get_request(self, path, data, request_id, remote_identity, requested_at):
if remote_identity == None:
return LXMPeer.ERROR_NO_IDENTITY

elif not self.identity_allowed(remote_identity):
return LXMPeer.ERROR_NO_ACCESS

Expand Down Expand Up @@ -1357,7 +1359,7 @@ def message_get_request(self, path, data, request_id, remote_identity, requested
self.propagation_entries.pop(transient_id)
os.unlink(filepath)
RNS.log("Client "+RNS.prettyhexrep(remote_destination.hash)+" purged message "+RNS.prettyhexrep(transient_id)+" at "+str(filepath), RNS.LOG_DEBUG)

except Exception as e:
RNS.log("Error while processing message purge request from "+RNS.prettyhexrep(remote_destination.hash)+". The contained exception was: "+str(e), RNS.LOG_ERROR)

Expand Down Expand Up @@ -1400,7 +1402,7 @@ def message_get_request(self, path, data, request_id, remote_identity, requested
self.client_propagation_messages_served += len(response_messages)
return response_messages


except Exception as e:
RNS.log("Error occurred while generating response for download request, the contained exception was: "+str(e), RNS.LOG_DEBUG)
return None
Expand Down Expand Up @@ -1512,7 +1514,7 @@ def has_message(self, transient_id):
return True
else:
return False

def cancel_outbound(self, message_id):
try:
if message_id in self.pending_deferred_stamps:
Expand Down Expand Up @@ -1600,7 +1602,7 @@ def get_outbound_progress(self, lxm_hash):
for lxm_id in self.pending_deferred_stamps:
if self.pending_deferred_stamps[lxm_id].hash == lxm_hash:
return self.pending_deferred_stamps[lxm_id].progress

return None

def get_outbound_lxm_stamp_cost(self, lxm_hash):
Expand All @@ -1611,7 +1613,7 @@ def get_outbound_lxm_stamp_cost(self, lxm_hash):
for lxm_id in self.pending_deferred_stamps:
if self.pending_deferred_stamps[lxm_id].hash == lxm_hash:
return self.pending_deferred_stamps[lxm_id].stamp_cost

return None


Expand Down Expand Up @@ -1788,7 +1790,7 @@ def peer(self, destination_hash, timestamp, propagation_transfer_limit, wanted_i
peer.last_heard = time.time()
peer.propagation_transfer_limit = propagation_transfer_limit
RNS.log(f"Peering config updated for {RNS.prettyhexrep(destination_hash)}", RNS.LOG_VERBOSE)

else:
if len(self.peers) < self.max_peers:
peer = LXMPeer(self, destination_hash)
Expand Down Expand Up @@ -1826,7 +1828,7 @@ def rotate_peers(self):
if len(untested_peers) >= rotation_headroom:
RNS.log("Newly added peer threshold reached, postponing peer rotation", RNS.LOG_DEBUG)
return

fully_synced_peers = {}
for peer_id in peers:
peer = peers[peer_id]
Expand Down Expand Up @@ -1916,7 +1918,7 @@ def sync_peers(self):
reverse=True
)[0:min(LXMRouter.FASTEST_N_RANDOM_POOL, len(waiting_peers))]
peer_pool.extend(fastest_peers)

unknown_speed_peers = [p for p in waiting_peers if p.sync_transfer_rate == 0]
if len(unknown_speed_peers) > 0:
peer_pool.extend(
Expand All @@ -1928,11 +1930,11 @@ def sync_peers(self):
)

RNS.log("Selecting peer to sync from "+str(len(waiting_peers))+" waiting peers.", RNS.LOG_DEBUG)

elif len(unresponsive_peers) > 0:
RNS.log("No active peers available, randomly selecting peer to sync from "+str(len(unresponsive_peers))+" unresponsive peers.", RNS.LOG_DEBUG)
peer_pool = unresponsive_peers

if len(peer_pool) > 0:
selected_index = random.randint(0,len(peer_pool)-1)
selected_peer = peer_pool[selected_index]
Expand Down Expand Up @@ -2084,7 +2086,7 @@ def propagation_resource_concluded(self, resource):

else:
RNS.log("Invalid data structure received at propagation destination, ignoring", RNS.LOG_DEBUG)

except Exception as e:
RNS.log("Error while unpacking received propagation resource", RNS.LOG_DEBUG)
RNS.trace_exception(e)
Expand Down Expand Up @@ -2175,7 +2177,7 @@ def ingest_lxm_uri(self, uri, signal_local_delivery=None, signal_duplicate=None,
else:
lxmf_data = base64.urlsafe_b64decode(uri.replace(LXMessage.URI_SCHEMA+"://", "").replace("/", "")+"==")
transient_id = RNS.Identity.full_hash(lxmf_data)

router_propagation_result = self.lxmf_propagation(lxmf_data, signal_local_delivery=signal_local_delivery, signal_duplicate=signal_duplicate, allow_duplicate=allow_duplicate, is_paper_message=True)
if router_propagation_result != False:
RNS.log("LXM with transient ID "+RNS.prettyhexrep(transient_id)+" was ingested.", RNS.LOG_DEBUG)
Expand Down Expand Up @@ -2225,7 +2227,7 @@ def process_deferred_stamps(self):
self.pending_deferred_stamps.pop(selected_message_id)
if selected_lxm.failed_callback != None and callable(selected_lxm.failed_callback):
selected_lxm.failed_callback(lxmessage)

return

RNS.log(f"Starting stamp generation for {selected_lxm}...", RNS.LOG_DEBUG)
Expand Down Expand Up @@ -2338,7 +2340,7 @@ def rediscover_job():
if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
delivery_destination_hash = lxmessage.get_destination().hash
direct_link = None

if delivery_destination_hash in self.direct_links:
# An established direct link already exists to
# the destination, so we'll try to use it for
Expand Down
Loading