diff --git a/conf/default/cuckoo.conf.default b/conf/default/cuckoo.conf.default
index a0da4fb527b..4ee1d3a8938 100644
--- a/conf/default/cuckoo.conf.default
+++ b/conf/default/cuckoo.conf.default
@@ -233,3 +233,5 @@ analysis = 0
 mongo = no
 # Clean orphan files in mongodb
 unused_files_in_mongodb = no
+# Deduplicated files
+files = no
diff --git a/lib/cuckoo/common/cleaners_utils.py b/lib/cuckoo/common/cleaners_utils.py
index 14953df9bf6..f15a7f894a1 100644
--- a/lib/cuckoo/common/cleaners_utils.py
+++ b/lib/cuckoo/common/cleaners_utils.py
@@ -125,6 +125,8 @@ def free_space_monitor(path=False, return_value=False, processing=False, analysi
         cleanup_dict["delete_older_than"] = config.cleaner.analysis
     if config.cleaner.unused_files_in_mongodb:
         cleanup_dict["delete_unused_file_data_in_mongo"] = 1
+    if config.cleaner.get("files"):
+        cleanup_dict["delete_files_items_older_than"] = config.cleaner.get("files")

     need_space, space_available = False, 0
     # Calculate the free disk space in megabytes.
@@ -349,7 +351,6 @@ def cuckoo_clean_failed_tasks():
     # This need to init a console logger handler, because the standard
     # logger (init_logging()) logs to a file which will be deleted.
     create_structure()
-    # ToDo multi status
     tasks_list = db.list_tasks(status=f"{TASK_FAILED_ANALYSIS}|{TASK_FAILED_PROCESSING}|{TASK_FAILED_REPORTING}|{TASK_RECOVERED}")

     # ToDo rewrite for bulk delete
@@ -683,6 +684,77 @@ def cape_clean_tlp():
     delete_bulk_tasks_n_folders(tlp_tasks, False)


+def files_clean_before(timerange: str):
+    """
+    Clean up files in storage/files that are not referenced by any analysis
+    and are older than the specified time range.
+    """
+    older_than = convert_into_time(timerange)
+    files_folder = os.path.join(CUCKOO_ROOT, "storage", "files")
+    analyses_folder = os.path.join(CUCKOO_ROOT, "storage", "analyses")
+
+    if not path_exists(files_folder):
+        return
+
+    # 1. Build set of referenced hashes
+    referenced = set()
+    used_mongo = False
+
+    if is_reporting_db_connected() and repconf.mongodb.enabled and "mongo_find" in globals():
+        try:
+            # Query all _id (SHA256) from files collection
+            cursor = mongo_find("files", {}, {"_id": 1})
+            for doc in cursor:
+                referenced.add(doc["_id"])
+            used_mongo = True
+            log.info("Loaded %d referenced files from MongoDB", len(referenced))
+        except Exception as e:
+            log.error("Failed to query MongoDB for files: %s. Falling back to filesystem scan.", e)
+
+    if not used_mongo and path_exists(analyses_folder):
+        log.info("Scanning analysis folders for file references...")
+        with os.scandir(analyses_folder) as it:
+            for entry in it:
+                if not entry.is_dir():
+                    continue
+                for subdir in ("selfextracted", "files", "CAPE", "procdump"):
+                    check_dir = os.path.join(entry.path, subdir)
+                    if path_exists(check_dir):
+                        with os.scandir(check_dir) as se_it:
+                            for se_entry in se_it:
+                                if se_entry.is_symlink():
+                                    try:
+                                        target = os.readlink(se_entry.path)
+                                        # Check if it points to storage/files
+                                        if os.path.abspath(target).startswith(os.path.abspath(files_folder)):
+                                            referenced.add(os.path.basename(target))
+                                    except OSError:
+                                        pass
+
+    # 2. Iterate storage/files and clean
+    for root, _, filenames in os.walk(files_folder, topdown=False):
+        for sha256 in filenames:
+            if sha256 in referenced:
+                continue
+
+            file_path = os.path.join(root, sha256)
+            try:
+                st_ctime = path_get_date(file_path)
+                # Delete only files that are older than the retention limit
+                if datetime.fromtimestamp(st_ctime) < older_than:
+                    path_delete(file_path)
+            except Exception as e:
+                log.warning("Error checking/deleting file %s: %s", file_path, e)
+
+        # Try to remove empty directories (except the root files_folder)
+        if root != files_folder:
+            try:
+                os.rmdir(root)
+            except OSError:
+                # Directory not empty or other error
+                pass
+
+
 def binaries_clean_before(timerange: str):
     # In case if "delete_bin_copy = off" we might need to clean binaries
     # find storage/binaries/ -name "*" -type f -mtime 5 -delete
@@ -783,11 +855,14 @@ def execute_cleanup(args: dict, init_log=True):
     if args.get("delete_tmp_items_older_than"):
         tmp_clean_before(args["delete_tmp_items_older_than"])

+    if args.get("delete_unused_file_data_in_mongo"):
+        delete_unused_file_data_in_mongo()
+
     if args.get("delete_binaries_items_older_than"):
         binaries_clean_before(args["delete_binaries_items_older_than"])

-    if args.get("delete_unused_file_data_in_mongo"):
-        delete_unused_file_data_in_mongo()
+    if args.get("delete_files_items_older_than"):
+        files_clean_before(args["delete_files_items_older_than"])

     if args.get("cleanup_files_collection_by_id"):
         cleanup_files_collection_by_id(args["cleanup_files_collection_by_id"])
diff --git a/lib/cuckoo/common/integrations/file_extra_info.py b/lib/cuckoo/common/integrations/file_extra_info.py
index 4d164c1ecf5..6af17f5a46f 100644
--- a/lib/cuckoo/common/integrations/file_extra_info.py
+++ b/lib/cuckoo/common/integrations/file_extra_info.py
@@ -45,7 +45,7 @@
     path_read_file,
     path_write_file,
 )
-from lib.cuckoo.common.utils import get_options, is_text_file
+from lib.cuckoo.common.utils import get_files_storage_path, get_options, is_text_file

 try:
     from sflock import unpack
@@ -385,22 +385,41 @@ def _extracted_files_metadata(
             file_info["path"] = dest_path
             file_info["guest_paths"] = [file_info["name"]]
             file_info["name"] = os.path.basename(dest_path)
+            # Define the new central storage for all files (extracted, dropped, etc.)
+            master_file_path = get_files_storage_path(file_info["sha256"])
+            files_storage_dir = os.path.dirname(master_file_path)
+
+            # 1. Ensure file is in central storage
+            if not path_exists(master_file_path):
+                path_mkdir(files_storage_dir, exist_ok=True)
+                shutil.move(full_path, master_file_path)
+            elif path_exists(full_path):
+                # We already have it, delete the temp duplicate
+                path_delete(full_path)
+
+            # 2. Create symlink in analysis folder (or copy if link fails)
             if not path_exists(dest_path):
-                shutil.move(full_path, dest_path)
-            print(
-                json.dumps(
-                    {
-                        "path": os.path.join("files", file_info["sha256"]),
-                        "filepath": file_info["name"],
-                        "pids": [],
-                        "ppids": [],
-                        "metadata": "",
-                        "category": "files",
-                    },
-                    ensure_ascii=False,
-                ),
-                file=f,
-            )
+                try:
+                    os.symlink(master_file_path, dest_path)
+                except OSError:
+                    # Fallback to copy on error
+                    shutil.copy(master_file_path, dest_path)
+
+            # Update files.json for UI/Reporting to correctly reference the symlinked file
+            print(
+                json.dumps(
+                    {
+                        "path": file_info["sha256"],  # Store just the SHA256
+                        "filepath": file_info["name"],
+                        "pids": [],
+                        "ppids": [],
+                        "metadata": "",
+                        "category": "selfextracted",
+                    },
+                    ensure_ascii=False,
+                ),
+                file=f,
+            )

             file_info["data"] = is_text_file(file_info, destination_folder, processing_conf.CAPE.buffer)
             metadata.append(file_info)
diff --git a/lib/cuckoo/common/utils.py b/lib/cuckoo/common/utils.py
index 3f2e67dd1bb..9747832e907 100644
--- a/lib/cuckoo/common/utils.py
+++ b/lib/cuckoo/common/utils.py
@@ -185,6 +185,18 @@ def get_memdump_path(memdump_id, analysis_folder=False):
     )


+def get_files_storage_path(sha256: str) -> str:
+    """
+    Get the path to the storage/files directory for a given SHA256.
+    Uses sharding (e.g., storage/files/ab/cd/abcdef...) to avoid
+    too many files in a single directory.
+    """
+    if not sha256 or len(sha256) < 4:
+        return os.path.join(CUCKOO_ROOT, "storage", "files", sha256)
+
+    return os.path.join(CUCKOO_ROOT, "storage", "files", sha256[:2], sha256[2:4], sha256)
+
+
 def validate_referrer(url):
     if not url:
         return None
diff --git a/lib/cuckoo/common/web_utils.py b/lib/cuckoo/common/web_utils.py
index a8ab2cd0961..99ddecc6795 100644
--- a/lib/cuckoo/common/web_utils.py
+++ b/lib/cuckoo/common/web_utils.py
@@ -25,6 +25,7 @@
 from lib.cuckoo.common.path_utils import path_exists, path_mkdir, path_write_file
 from lib.cuckoo.common.utils import (
     generate_fake_name,
+    get_files_storage_path,
     get_ip_address,
     get_options,
     get_user_filename,
@@ -1118,10 +1119,25 @@ def category_all_files(task_id: str, category: str, base_path: str):
         # analysis = es.search(index=get_analysis_index(), query=get_query_by_info_id(task_id))["hits"]["hits"][0]["_source"]

     if analysis:
+        files = []
         if query_category == "CAPE":
-            return [os.path.join(base_path, block["sha256"]) for block in analysis.get(query_category, {}).get("payloads", [])]
+            for block in analysis.get(query_category, {}).get("payloads", []):
+                # Path in files.json now stores only the SHA256, not a relative path
+                sha256 = block.get("path") or block.get("sha256")
+                if sha256:
+                    p = get_files_storage_path(sha256)
+                    if path_exists(p):
+                        files.append(p)
         else:
-            return [os.path.join(base_path, block["sha256"]) for block in analysis.get(category, [])]
+            for block in analysis.get(category, []):
+                # Path in files.json now stores only the SHA256, not a relative path
+                sha256 = block.get("path") or block.get("sha256")
+                if sha256:
+                    p = get_files_storage_path(sha256)
+                    if path_exists(p):
+                        files.append(p)
+
+        return files


 def validate_task(tid, status=TASK_REPORTED):
diff --git a/modules/processing/CAPE.py b/modules/processing/CAPE.py
index cc4671eed89..354490c5e4e 100644
--- a/modules/processing/CAPE.py
+++ b/modules/processing/CAPE.py
@@ -16,6 +16,7 @@
 import json
 import logging
 import os
+import shutil
 import timeit
 from contextlib import suppress
 from pathlib import Path
@@ -25,12 +26,13 @@
 from lib.cuckoo.common.config import Config
 from lib.cuckoo.common.integrations.file_extra_info import DuplicatesType, static_file_info
 from lib.cuckoo.common.objects import File
-from lib.cuckoo.common.path_utils import path_exists
+from lib.cuckoo.common.path_utils import path_exists, path_mkdir
 from lib.cuckoo.common.replace_patterns_utils import _clean_path
 from lib.cuckoo.common.utils import (
     add_family_detection,
     convert_to_printable_and_truncate,
     get_clamav_consensus,
+    get_files_storage_path,
     make_bytes,
     texttypes,
     wide2str,
@@ -175,6 +177,29 @@ def process_file(self, file_path, append_file, metadata: dict, *, category: str,
         f = File(file_path, metadata.get("metadata", ""))
         sha256 = f.get_sha256()

+        # Deduplicate dropped, procdump, CAPE, package, and procmemory files to storage/files
+        if category in ("dropped", "procdump", "CAPE", "package", "procmemory") and not os.path.islink(file_path):
+            try:
+                master_path = get_files_storage_path(sha256)
+                files_storage_dir = os.path.dirname(master_path)
+
+                if not path_exists(master_path):
+                    path_mkdir(files_storage_dir, exist_ok=True)
+                    # Move file
+                    shutil.move(file_path, master_path)
+                else:
+                    # Already exists, delete duplicate
+                    os.remove(file_path)
+
+                # Link back
+                try:
+                    os.symlink(master_path, file_path)
+                except (OSError, AttributeError):
+                    shutil.copy(master_path, file_path)
+
+            except Exception as e:
+                log.error("Deduplication failed for %s: %s", file_path, e)
+
         if sha256 in duplicated["sha256"]:
             log.debug("Skipping file that has already been processed: %s", sha256)
             return
diff --git a/utils/dist.py b/utils/dist.py
index f319bdc2055..0d734ae9a68 100644
--- a/utils/dist.py
+++ b/utils/dist.py
@@ -41,7 +41,7 @@
 from lib.cuckoo.common.dist_db import ExitNodes, Machine, Node, Task, create_session
 from lib.cuckoo.common.path_utils import path_delete, path_exists, path_get_size, path_mkdir, path_mount_point, path_write_file
 from lib.cuckoo.common.socket_utils import send_socket_command
-from lib.cuckoo.common.utils import get_options
+from lib.cuckoo.common.utils import get_files_storage_path, get_options
 from lib.cuckoo.core.database import (
     TASK_BANNED,
     TASK_DISTRIBUTED,
@@ -304,6 +304,64 @@ def node_get_report_nfs(task_id, worker_name, main_task_id) -> bool:
     return True


+def sync_sharded_files_nfs(worker_name, main_task_id):
+    """
+    Synchronize deduplicated files from worker to master using sharded storage.
+    """
+    analysis_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", str(main_task_id))
+    files_json_path = os.path.join(analysis_path, "files.json")
+
+    if not path_exists(files_json_path):
+        return
+
+    try:
+        with open(files_json_path, "r") as f:
+            for line in f:
+                try:
+                    entry = json.loads(line)
+                    rel_path = entry.get("path")
+                    if not rel_path or "selfextracted" not in rel_path:
+                        continue
+
+                    # Extract SHA256 from path (e.g. selfextracted/SHA256)
+                    sha256 = os.path.basename(rel_path)
+                    if len(sha256) != 64:
+                        continue
+
+                    # Master destination (sharded)
+                    master_dest = get_files_storage_path(sha256)
+
+                    # If missing on master, fetch from worker
+                    if not path_exists(master_dest):
+                        worker_mount = os.path.join(CUCKOO_ROOT, dist_conf.NFS.mount_folder, str(worker_name))
+                        # Construct worker source path (sharded) manually relative to mount
+                        shard_rel = os.path.join("storage", "files", sha256[:2], sha256[2:4], sha256)
+                        worker_src = os.path.join(worker_mount, shard_rel)
+
+                        if path_exists(worker_src):
+                            path_mkdir(os.path.dirname(master_dest), exist_ok=True)
+                            shutil.copy2(worker_src, master_dest)
+
+                    # Ensure the symlink in the analysis folder is correct
+                    link_path = os.path.join(analysis_path, rel_path)
+
+                    # Replace any existing file or stale link with a symlink to the master copy
+                    if path_exists(master_dest):
+                        if os.path.islink(link_path):
+                            # The existing link may point at a worker path;
+                            # re-link so it targets the local sharded storage
+                            os.remove(link_path)
+                        elif path_exists(link_path):
+                            # It's a full file, replace it with a link to save space
+                            path_delete(link_path)
+                        path_mkdir(os.path.dirname(link_path), exist_ok=True)
+                        os.symlink(master_dest, link_path)
+                except (json.JSONDecodeError, OSError) as e:
+                    log.error("Error syncing file for task %s: %s", main_task_id, e)
+    except Exception as e:
+        log.exception("Failed to sync sharded files for task %s: %s", main_task_id, e)
+
+
 def _delete_many(node, ids, nodes, db):
     """
     Deletes multiple tasks from a specified node if the node is not the main server.
@@ -955,6 +1013,8 @@ def fetch_latest_reports_nfs(self):
                     t.main_task_id,
                 )

+            sync_sharded_files_nfs(node.name, t.main_task_id)
+
             # this doesn't exist for some reason
             if path_exists(t.path):
                 sample_sha256 = None
diff --git a/web/analysis/views.py b/web/analysis/views.py
index 3db18dd3250..d2bf914c8a7 100644
--- a/web/analysis/views.py
+++ b/web/analysis/views.py
@@ -33,7 +33,7 @@
 from lib.cuckoo.common.config import Config
 from lib.cuckoo.common.constants import ANALYSIS_BASE_PATH, CUCKOO_ROOT
 from lib.cuckoo.common.path_utils import path_exists, path_get_size, path_mkdir, path_read_file, path_safe
-from lib.cuckoo.common.utils import delete_folder, yara_detected
+from lib.cuckoo.common.utils import delete_folder, get_files_storage_path, yara_detected
 from lib.cuckoo.common.web_utils import category_all_files, my_rate_minutes, my_rate_seconds, perform_search, rateblock, statistics
 from lib.cuckoo.core.database import TASK_PENDING, Database, Task
 from modules.reporting.report_doc import CHUNK_CALL_SIZE
@@ -1826,6 +1826,10 @@ def file(request, category, task_id, dlfile):
         # Self Extracted support folder
         if not path_exists(path):
             path = os.path.join(CUCKOO_ROOT, "storage", "analyses", str(task_id), "selfextracted", file_name)
+
+        if not path_exists(path) and len(file_name) == 64:
+            path = get_files_storage_path(file_name)
+
     elif category in ("droppedzipall", "procdumpzipall", "CAPEzipall"):
         if web_cfg.zipped_download.download_all:
             sub_cat = category.replace("zipall", "")
@@ -1842,6 +1846,10 @@ def file(request, category, task_id, dlfile):
         path = buf
         if not path_exists(path):
             path = os.path.join(CUCKOO_ROOT, "storage", "analyses", str(task_id), "selfextracted", file_name)
+
+        if not path_exists(path) and len(file_name) == 64:
+            path = get_files_storage_path(file_name)
+
     elif category == "networkzip":
         buf = os.path.join(CUCKOO_ROOT, "storage", "analyses", task_id, "network", file_name)
         path = buf
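
For reviewers who want the end state at a glance, here is a minimal standalone sketch of the sharded layout produced by get_files_storage_path() and the move-then-symlink flow added in CAPE.py and file_extra_info.py. It is illustrative only: the storage root and the sharded_path()/dedupe() names are assumptions for the sketch, not code from this patch.

# Illustrative sketch, not part of the patch. STORAGE_ROOT stands in for
# CUCKOO_ROOT/storage; sharded_path() mirrors get_files_storage_path().
import os
import shutil

STORAGE_ROOT = "/opt/CAPEv2/storage"  # assumed location


def sharded_path(sha256: str) -> str:
    # storage/files/ab/cd/<sha256>; very short names fall back to a flat path
    if not sha256 or len(sha256) < 4:
        return os.path.join(STORAGE_ROOT, "files", sha256)
    return os.path.join(STORAGE_ROOT, "files", sha256[:2], sha256[2:4], sha256)


def dedupe(tmp_path: str, sha256: str, dest_path: str) -> None:
    """Move tmp_path into central storage once, leaving only a symlink at dest_path."""
    master = sharded_path(sha256)
    os.makedirs(os.path.dirname(master), exist_ok=True)
    if not os.path.exists(master):
        shutil.move(tmp_path, master)       # first occurrence becomes the master copy
    elif os.path.exists(tmp_path):
        os.remove(tmp_path)                 # duplicate content, drop the temporary file
    if not os.path.exists(dest_path):
        try:
            os.symlink(master, dest_path)   # the analysis folder keeps a link, not a copy
        except OSError:
            shutil.copy(master, dest_path)  # e.g. a filesystem without symlink support
    # files.json then records only the SHA-256 ("path": sha256, "category": "selfextracted"),
    # which readers such as category_all_files() resolve back through the sharded path.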