diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index 5b7a5c2..b43e131 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -74,7 +74,7 @@ class LXMRouter: def __init__(self, identity=None, storagepath=None, autopeer=AUTOPEER, autopeer_maxdepth=None, propagation_limit=PROPAGATION_LIMIT, delivery_limit=DELIVERY_LIMIT, enforce_ratchets=False, - enforce_stamps=False, static_peers = [], max_peers=None, from_static_only=False): + enforce_stamps=False, static_peers = [], max_peers=None, from_static_only=False, management_identities = []): random.seed(os.urandom(10)) @@ -140,6 +140,8 @@ def __init__(self, identity=None, storagepath=None, autopeer=AUTOPEER, autopeer_ identity = RNS.Identity() self.identity = identity + self.management_identities = management_identities + self.management_identities.append(identity.hash) self.propagation_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation") self.control_destination = None self.client_propagation_messages_received = 0 @@ -339,7 +341,7 @@ def set_inbound_stamp_cost(self, destination_hash, stamp_cost): delivery_destination.stamp_cost = stamp_cost else: return False - + return True return False @@ -506,7 +508,7 @@ def enable_propagation(self): except Exception as e: RNS.log("Could not read LXM from message store. The contained exception was: "+str(e), RNS.LOG_ERROR) - + et = time.time(); mps = 0 if et-st == 0 else math.floor(len(self.propagation_entries)/(et-st)) RNS.log(f"Indexed {len(self.propagation_entries)} messages in {RNS.prettytime(et-st)}, {mps} msgs/s", RNS.LOG_NOTICE) RNS.log("Rebuilding peer synchronisation states...", RNS.LOG_NOTICE) @@ -582,7 +584,7 @@ def enable_propagation(self): self.propagation_destination.register_request_handler(LXMPeer.MESSAGE_GET_PATH, self.message_get_request, allow = RNS.Destination.ALLOW_ALL) self.control_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation", "control") - self.control_destination.register_request_handler(LXMRouter.STATS_GET_PATH, self.stats_get_request, allow = RNS.Destination.ALLOW_LIST, allowed_list=[self.identity.hash]) + self.control_destination.register_request_handler(LXMRouter.STATS_GET_PATH, self.stats_get_request, allow = RNS.Destination.ALLOW_LIST, allowed_list=self.management_identities) if self.message_storage_limit != None: limit_str = ", limit is "+RNS.prettysize(self.message_storage_limit) @@ -636,7 +638,7 @@ def set_message_storage_limit(self, kilobytes = None, megabytes = None, gigabyte self.message_storage_limit = int(limit_bytes) else: raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes)) - + except Exception as e: raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes)) @@ -669,7 +671,7 @@ def set_information_storage_limit(self, kilobytes = None, megabytes = None, giga self.information_storage_limit = int(limit_bytes) else: raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes)) - + except Exception as e: raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes)) @@ -750,7 +752,7 @@ def compile_stats(self): def stats_get_request(self, path, data, request_id, remote_identity, requested_at): if remote_identity == None: return LXMPeer.ERROR_NO_IDENTITY - elif remote_identity.hash != self.identity.hash: + elif remote_identity.hash not in self.management_identities: return LXMPeer.ERROR_NO_ACCESS else: return self.compile_stats() @@ -846,7 +848,7 @@ def clean_links(self): for link in inactive_links: self.active_propagation_links.remove(link) link.teardown() - + except Exception as e: RNS.log("An error occurred while cleaning inbound propagation links. The contained exception was: "+str(e), RNS.LOG_ERROR) @@ -889,7 +891,7 @@ def clean_transient_id_caches(self): def update_stamp_cost(self, destination_hash, stamp_cost): RNS.log(f"Updating outbound stamp cost for {RNS.prettyhexrep(destination_hash)} to {stamp_cost}", RNS.LOG_DEBUG) self.outbound_stamp_costs[destination_hash] = [time.time(), stamp_cost] - + def job(): self.save_outbound_stamp_costs() threading.Thread(target=self.save_outbound_stamp_costs, daemon=True).start() @@ -902,7 +904,7 @@ def get_wanted_inbound_peers(self): def get_announce_app_data(self, destination_hash): if destination_hash in self.delivery_destinations: delivery_destination = self.delivery_destinations[destination_hash] - + display_name = None if delivery_destination.display_name != None: display_name = delivery_destination.display_name.encode("utf-8") @@ -928,7 +930,7 @@ def get_weight(self, transient_id): priority_weight = 0.1 else: priority_weight = 1.0 - + weight = priority_weight * age_weight * lxm_size return weight @@ -950,7 +952,7 @@ def generate_ticket(self, destination_hash, expiry=LXMessage.TICKET_EXPIRY): if validity_left > LXMessage.TICKET_RENEW: RNS.log(f"Found generated ticket for {RNS.prettyhexrep(destination_hash)} with {RNS.prettytime(validity_left)} of validity left, re-using this one", RNS.LOG_DEBUG) return [expires, ticket] - + else: self.available_tickets["inbound"][destination_hash] = {} @@ -1018,7 +1020,7 @@ def clean_message_store(self): else: RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to invalid file path", RNS.LOG_WARNING) removed_entries[transient_id] = filepath - + removed_count = 0 for transient_id in removed_entries: try: @@ -1062,15 +1064,15 @@ def clean_message_store(self): if os.path.isfile(filepath): os.unlink(filepath) - + self.propagation_entries.pop(transient_id) bytes_cleaned += entry[3] RNS.log("Removed "+RNS.prettyhexrep(transient_id)+" with weight "+str(w[1])+" to clear up "+RNS.prettysize(entry[3])+", now cleaned "+RNS.prettysize(bytes_cleaned)+" out of "+RNS.prettysize(bytes_needed)+" needed", RNS.LOG_EXTREME) - + except Exception as e: RNS.log("Error while cleaning LXMF message from message store. The contained exception was: "+str(e), RNS.LOG_ERROR) - + finally: i += 1 @@ -1120,7 +1122,7 @@ def save_node_stats(self): except Exception as e: RNS.log("Could not save local node stats to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) - + def clean_outbound_stamp_costs(self): try: @@ -1132,7 +1134,7 @@ def clean_outbound_stamp_costs(self): for destination_hash in expired: self.outbound_stamp_costs.pop(destination_hash) - + except Exception as e: RNS.log(f"Error while cleaning outbound stamp costs. The contained exception was: {e}", RNS.LOG_ERROR) RNS.trace_exception(e) @@ -1173,7 +1175,7 @@ def clean_available_tickets(self): for inbound_ticket in expired_inbound: self.available_tickets["inbound"][destination_hash].pop(inbound_ticket) - + except Exception as e: RNS.log(f"Error while cleaning available tickets. The contained exception was: {e}", RNS.LOG_ERROR) RNS.trace_exception(e) @@ -1210,7 +1212,7 @@ def reload_available_tickets(self): if not "last_deliveries" in self.available_tickets: RNS.log("Missing local_deliveries entry in loaded available tickets, recreating...", RNS.LOG_ERROR) self.available_tickets["last_deliveries"] = {} - + except Exception as e: RNS.log(f"An error occurred while reloading available tickets from storage: {e}", RNS.LOG_ERROR) @@ -1290,7 +1292,7 @@ def __str__(self): ### Message Download ################################## ####################################################### - + def request_messages_path_job(self): job_thread = threading.Thread(target=self.__request_messages_path_job) job_thread.setDaemon(True) @@ -1306,21 +1308,21 @@ def __request_messages_path_job(self): else: RNS.log("Propagation node path request timed out", RNS.LOG_DEBUG) self.acknowledge_sync_completion(failure_state=LXMRouter.PR_NO_PATH) - + def identity_allowed(self, identity): if self.auth_required: if identity.hash in self.allowed_list: return True else: return False - + else: return True def message_get_request(self, path, data, request_id, remote_identity, requested_at): if remote_identity == None: return LXMPeer.ERROR_NO_IDENTITY - + elif not self.identity_allowed(remote_identity): return LXMPeer.ERROR_NO_ACCESS @@ -1357,7 +1359,7 @@ def message_get_request(self, path, data, request_id, remote_identity, requested self.propagation_entries.pop(transient_id) os.unlink(filepath) RNS.log("Client "+RNS.prettyhexrep(remote_destination.hash)+" purged message "+RNS.prettyhexrep(transient_id)+" at "+str(filepath), RNS.LOG_DEBUG) - + except Exception as e: RNS.log("Error while processing message purge request from "+RNS.prettyhexrep(remote_destination.hash)+". The contained exception was: "+str(e), RNS.LOG_ERROR) @@ -1400,7 +1402,7 @@ def message_get_request(self, path, data, request_id, remote_identity, requested self.client_propagation_messages_served += len(response_messages) return response_messages - + except Exception as e: RNS.log("Error occurred while generating response for download request, the contained exception was: "+str(e), RNS.LOG_DEBUG) return None @@ -1512,7 +1514,7 @@ def has_message(self, transient_id): return True else: return False - + def cancel_outbound(self, message_id): try: if message_id in self.pending_deferred_stamps: @@ -1600,7 +1602,7 @@ def get_outbound_progress(self, lxm_hash): for lxm_id in self.pending_deferred_stamps: if self.pending_deferred_stamps[lxm_id].hash == lxm_hash: return self.pending_deferred_stamps[lxm_id].progress - + return None def get_outbound_lxm_stamp_cost(self, lxm_hash): @@ -1611,7 +1613,7 @@ def get_outbound_lxm_stamp_cost(self, lxm_hash): for lxm_id in self.pending_deferred_stamps: if self.pending_deferred_stamps[lxm_id].hash == lxm_hash: return self.pending_deferred_stamps[lxm_id].stamp_cost - + return None @@ -1788,7 +1790,7 @@ def peer(self, destination_hash, timestamp, propagation_transfer_limit, wanted_i peer.last_heard = time.time() peer.propagation_transfer_limit = propagation_transfer_limit RNS.log(f"Peering config updated for {RNS.prettyhexrep(destination_hash)}", RNS.LOG_VERBOSE) - + else: if len(self.peers) < self.max_peers: peer = LXMPeer(self, destination_hash) @@ -1826,7 +1828,7 @@ def rotate_peers(self): if len(untested_peers) >= rotation_headroom: RNS.log("Newly added peer threshold reached, postponing peer rotation", RNS.LOG_DEBUG) return - + fully_synced_peers = {} for peer_id in peers: peer = peers[peer_id] @@ -1916,7 +1918,7 @@ def sync_peers(self): reverse=True )[0:min(LXMRouter.FASTEST_N_RANDOM_POOL, len(waiting_peers))] peer_pool.extend(fastest_peers) - + unknown_speed_peers = [p for p in waiting_peers if p.sync_transfer_rate == 0] if len(unknown_speed_peers) > 0: peer_pool.extend( @@ -1928,11 +1930,11 @@ def sync_peers(self): ) RNS.log("Selecting peer to sync from "+str(len(waiting_peers))+" waiting peers.", RNS.LOG_DEBUG) - + elif len(unresponsive_peers) > 0: RNS.log("No active peers available, randomly selecting peer to sync from "+str(len(unresponsive_peers))+" unresponsive peers.", RNS.LOG_DEBUG) peer_pool = unresponsive_peers - + if len(peer_pool) > 0: selected_index = random.randint(0,len(peer_pool)-1) selected_peer = peer_pool[selected_index] @@ -2084,7 +2086,7 @@ def propagation_resource_concluded(self, resource): else: RNS.log("Invalid data structure received at propagation destination, ignoring", RNS.LOG_DEBUG) - + except Exception as e: RNS.log("Error while unpacking received propagation resource", RNS.LOG_DEBUG) RNS.trace_exception(e) @@ -2175,7 +2177,7 @@ def ingest_lxm_uri(self, uri, signal_local_delivery=None, signal_duplicate=None, else: lxmf_data = base64.urlsafe_b64decode(uri.replace(LXMessage.URI_SCHEMA+"://", "").replace("/", "")+"==") transient_id = RNS.Identity.full_hash(lxmf_data) - + router_propagation_result = self.lxmf_propagation(lxmf_data, signal_local_delivery=signal_local_delivery, signal_duplicate=signal_duplicate, allow_duplicate=allow_duplicate, is_paper_message=True) if router_propagation_result != False: RNS.log("LXM with transient ID "+RNS.prettyhexrep(transient_id)+" was ingested.", RNS.LOG_DEBUG) @@ -2225,7 +2227,7 @@ def process_deferred_stamps(self): self.pending_deferred_stamps.pop(selected_message_id) if selected_lxm.failed_callback != None and callable(selected_lxm.failed_callback): selected_lxm.failed_callback(lxmessage) - + return RNS.log(f"Starting stamp generation for {selected_lxm}...", RNS.LOG_DEBUG) @@ -2338,7 +2340,7 @@ def rediscover_job(): if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS: delivery_destination_hash = lxmessage.get_destination().hash direct_link = None - + if delivery_destination_hash in self.direct_links: # An established direct link already exists to # the destination, so we'll try to use it for diff --git a/LXMF/Utilities/lxmd.py b/LXMF/Utilities/lxmd.py index 48885b2..6bd8303 100644 --- a/LXMF/Utilities/lxmd.py +++ b/LXMF/Utilities/lxmd.py @@ -78,7 +78,7 @@ def apply_config(): active_configuration["peer_announce_interval"] = lxmd_config["lxmf"].as_int("announce_interval")*60 else: active_configuration["peer_announce_interval"] = None - + if "lxmf" in lxmd_config and "delivery_transfer_max_accepted_size" in lxmd_config["lxmf"]: active_configuration["delivery_transfer_max_accepted_size"] = lxmd_config["lxmf"].as_float("delivery_transfer_max_accepted_size") if active_configuration["delivery_transfer_max_accepted_size"] < 0.38: @@ -102,6 +102,27 @@ def apply_config(): else: active_configuration["auth_required"] = False + if "propagation" in lxmd_config \ + and "remote_management_enabled" in lxmd_config["propagation"] \ + and "remote_management_identities" in lxmd_config["propagation"] \ + and lxmd_config["propagation"].as_bool("remote_management_enabled"): + + idents = lxmd_config["propagation"].as_list("remote_management_identities") + allowed_hashes = [] + for hexhash in idents: + dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH//8)*2 + if len(hexhash) != dest_len: + raise ValueError(f"Identity hash length for remote management ACL {hexhash} is invalid, must be {dest_len} hexadecimal characters ({dest_len//2} bytes).") + try: + ident_hash = bytes.fromhex(hexhash) + except Exception as e: + raise ValueError(f"Invalid identity hash for remote management ACL: {hexhash}") + allowed_hashes.append(ident_hash) + active_configuration["remote_management_identities"] = allowed_hashes + + else: + active_configuration["remote_management_identities"] = [] + if "propagation" in lxmd_config and "announce_at_start" in lxmd_config["propagation"]: active_configuration["node_announce_at_start"] = lxmd_config["propagation"].as_bool("announce_at_start") else: @@ -128,14 +149,14 @@ def apply_config(): active_configuration["message_storage_limit"] = 0.005 else: active_configuration["message_storage_limit"] = 500 - + if "propagation" in lxmd_config and "propagation_transfer_max_accepted_size" in lxmd_config["propagation"]: active_configuration["propagation_transfer_max_accepted_size"] = lxmd_config["propagation"].as_float("propagation_transfer_max_accepted_size") if active_configuration["propagation_transfer_max_accepted_size"] < 0.38: active_configuration["propagation_transfer_max_accepted_size"] = 0.38 else: active_configuration["propagation_transfer_max_accepted_size"] = 256 - + if "propagation" in lxmd_config and "prioritise_destinations" in lxmd_config["propagation"]: active_configuration["prioritised_lxmf_destinations"] = lxmd_config["propagation"].as_list("prioritise_destinations") else: @@ -278,7 +299,7 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo RNS.log("Could not parse the configuration at "+configpath, RNS.LOG_ERROR) RNS.log("Check your configuration file for errors!", RNS.LOG_ERROR) RNS.panic() - + apply_config() RNS.log("Configuration loaded from "+configpath, RNS.LOG_VERBOSE) @@ -287,7 +308,7 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo if verbosity != 0 or quietness != 0: targetloglevel = targetloglevel+verbosity-quietness - + # Start Reticulum RNS.log("Substantiating Reticulum...") reticulum = RNS.Reticulum(configdir=rnsconfigdir, loglevel=targetloglevel, logdest=targetlogdest) @@ -315,7 +336,7 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo RNS.log("Could not create and save a new Primary Identity", RNS.LOG_ERROR) RNS.log("The contained exception was: %s" % (str(e)), RNS.LOG_ERROR) exit(2) - + # Start LXMF message_router = LXMF.LXMRouter( identity = identity, @@ -326,8 +347,9 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo delivery_limit = active_configuration["delivery_transfer_max_accepted_size"], max_peers = active_configuration["max_peers"], static_peers = active_configuration["static_peers"], - from_static_only = active_configuration["from_static_only"]) - + from_static_only = active_configuration["from_static_only"], + management_identities = active_configuration["remote_management_identities"]) + message_router.register_delivery_callback(lxmf_delivery) for destination_hash in active_configuration["ignored_lxmf_destinations"]: @@ -348,7 +370,7 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo if len(active_configuration["allowed_identities"]) == 0: RNS.log("Clint authentication was enabled, but no identity hashes could be loaded from "+str(allowedpath)+". Nobody will be able to sync messages from this propagation node.", RNS.LOG_WARNING) - + for identity_hash in active_configuration["allowed_identities"]: message_router.allow(identity_hash) @@ -368,7 +390,8 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo message_router.enable_propagation() RNS.log("LXMF Propagation Node started on "+RNS.prettyhexrep(message_router.propagation_destination.hash)) - + if len(message_router.management_identities) > 1: + RNS.log(f"Propagation Node remote management is enabled for {len(message_router.management_identities)-1} identities") RNS.log("Started lxmd version {version}".format(version=__version__), RNS.LOG_NOTICE) threading.Thread(target=deferred_start_jobs, daemon=True).start() @@ -379,7 +402,7 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo def jobs(): global active_configuration, last_peer_announce, last_node_announce global message_router, lxmf_destination - + while True: try: if "peer_announce_interval" in active_configuration and active_configuration["peer_announce_interval"] != None: @@ -491,13 +514,13 @@ def get_status(configdir = None, rnsconfigdir = None, verbosity = 0, quietness = identity = RNS.Identity.from_file(identity_path) if identity == None: RNS.log("Could not load the Primary Identity from "+identity_path, RNS.LOG_ERROR) - exit(4) + exit(4) if targetloglevel == None: targetloglevel = 3 if verbosity != 0 or quietness != 0: targetloglevel = targetloglevel+verbosity-quietness - + reticulum = RNS.Reticulum(configdir=rnsconfigdir, loglevel=targetloglevel, logdest=targetlogdest) response = query_status(identity, timeout=timeout, exit_on_fail=True) @@ -616,7 +639,7 @@ def main(): parser.add_argument("--identity", action="store", default=None, help="path to identity used for query request", type=str) parser.add_argument("--exampleconfig", action="store_true", default=False, help="print verbose configuration example to stdout and exit") parser.add_argument("--version", action="version", version="lxmd {version}".format(version=__version__)) - + args = parser.parse_args() if args.exampleconfig: @@ -738,6 +761,18 @@ def main(): auth_required = no +# It is possible to allow remote management of lxmf +# propagation nodes using various utilities, such as +# lxmd --status. You will need to specify a comma +# separated list of one or more Reticulum Identity +# hashes for authenticating the queries from client +# programs. For this purpose, you can use existing +# identity files, or generate new ones with the rnid utility. +# The node's own identity is always allowed regardless +# of these configuration parameters. + +# remote_management_enabled = no +# remote_management_identities = 41d20c727598a3fbbdf9106133a3a0ed, d924b81822ca24e68e2effea99bcb8cf [lxmf]