2 changes: 2 additions & 0 deletions conf/default/cuckoo.conf.default
@@ -233,3 +233,5 @@ analysis = 0
mongo = no
# Clean orphan files in mongodb
unused_files_in_mongodb = no
# Clean orphaned deduplicated files in storage/files
files = no
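# Illustrative example (not part of the defaults): to enable cleanup of
# deduplicated files, set a time range instead of "no". The exact value format
# is whatever convert_into_time() in cleaners_utils accepts; "1d" below is only
# an assumed example.
# files = 1d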
81 changes: 78 additions & 3 deletions lib/cuckoo/common/cleaners_utils.py
@@ -125,6 +125,8 @@ def free_space_monitor(path=False, return_value=False, processing=False, analysi
cleanup_dict["delete_older_than"] = config.cleaner.analysis
if config.cleaner.unused_files_in_mongodb:
cleanup_dict["delete_unused_file_data_in_mongo"] = 1
if config.cleaner.get("files"):
cleanup_dict["delete_files_items_older_than"] = config.cleaner.get("files")

need_space, space_available = False, 0
# Calculate the free disk space in megabytes.
@@ -349,7 +351,6 @@ def cuckoo_clean_failed_tasks():
# This needs to init a console logger handler, because the standard
# logger (init_logging()) logs to a file which will be deleted.
create_structure()

# ToDo multi status
tasks_list = db.list_tasks(status=f"{TASK_FAILED_ANALYSIS}|{TASK_FAILED_PROCESSING}|{TASK_FAILED_REPORTING}|{TASK_RECOVERED}")
# ToDo rewrite for bulk delete
@@ -683,6 +684,77 @@ def cape_clean_tlp():
delete_bulk_tasks_n_folders(tlp_tasks, False)


def files_clean_before(timerange: str):
"""
Clean up files in storage/files that are not referenced by any analysis
and are older than the specified time range.
"""
older_than = convert_into_time(timerange)
files_folder = os.path.join(CUCKOO_ROOT, "storage", "files")
analyses_folder = os.path.join(CUCKOO_ROOT, "storage", "analyses")

if not path_exists(files_folder):
return

# 1. Build set of referenced hashes
referenced = set()
used_mongo = False

if is_reporting_db_connected() and repconf.mongodb.enabled and "mongo_find" in globals():
try:
# Query all _id (SHA256) from files collection
cursor = mongo_find("files", {}, {"_id": 1})
for doc in cursor:
referenced.add(doc["_id"])
used_mongo = True
log.info("Loaded %d referenced files from MongoDB", len(referenced))
except Exception as e:
log.error("Failed to query MongoDB for files: %s. Falling back to filesystem scan.", e)

if not used_mongo and path_exists(analyses_folder):
log.info("Scanning analysis folders for file references...")
with os.scandir(analyses_folder) as it:
for entry in it:
if not entry.is_dir():
continue
for subdir in ("selfextracted", "files", "CAPE", "procdump"):
check_dir = os.path.join(entry.path, subdir)
if path_exists(check_dir):
with os.scandir(check_dir) as se_it:
for se_entry in se_it:
if se_entry.is_symlink():
try:
target = os.readlink(se_entry.path)
# Check if it points to storage/files
if os.path.abspath(target).startswith(os.path.abspath(files_folder)):
referenced.add(os.path.basename(target))
except OSError:
pass

# 2. Iterate storage/files and clean
for root, _, filenames in os.walk(files_folder, topdown=False):
for sha256 in filenames:
if sha256 in referenced:
continue

file_path = os.path.join(root, sha256)
try:
st_ctime = path_get_date(file_path)
# Delete the file only if it is older than the configured cutoff
if datetime.fromtimestamp(st_ctime) < older_than:
path_delete(file_path)
except Exception as e:
log.warning("Error checking/deleting file %s: %s", file_path, e)

# Try to remove empty directories (except the root files_folder)
if root != files_folder:
try:
os.rmdir(root)
except OSError:
# Directory not empty or other error
pass
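# Usage sketch (illustrative): this cleaner is expected to be driven by
# execute_cleanup() below, roughly as
#   execute_cleanup({"delete_files_items_older_than": config.cleaner.get("files")})
# with a value that convert_into_time() can parse.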


def binaries_clean_before(timerange: str):
# In case "delete_bin_copy = off" is set, we might need to clean binaries
# find storage/binaries/ -name "*" -type f -mtime 5 -delete
@@ -783,11 +855,14 @@ def execute_cleanup(args: dict, init_log=True):
if args.get("delete_tmp_items_older_than"):
tmp_clean_before(args["delete_tmp_items_older_than"])

if args.get("delete_unused_file_data_in_mongo"):
delete_unused_file_data_in_mongo()

if args.get("delete_binaries_items_older_than"):
binaries_clean_before(args["delete_binaries_items_older_than"])

if args.get("delete_unused_file_data_in_mongo"):
delete_unused_file_data_in_mongo()
if args.get("delete_files_items_older_than"):
files_clean_before(args["delete_files_items_older_than"])

if args.get("cleanup_files_collection_by_id"):
cleanup_files_collection_by_id(args["cleanup_files_collection_by_id"])
51 changes: 35 additions & 16 deletions lib/cuckoo/common/integrations/file_extra_info.py
@@ -45,7 +45,7 @@
path_read_file,
path_write_file,
)
from lib.cuckoo.common.utils import get_options, is_text_file
from lib.cuckoo.common.utils import get_files_storage_path, get_options, is_text_file

try:
from sflock import unpack
@@ -385,22 +385,41 @@ def _extracted_files_metadata(
file_info["path"] = dest_path
file_info["guest_paths"] = [file_info["name"]]
file_info["name"] = os.path.basename(dest_path)
# Define the new central storage for all files (extracted, dropped, etc.)
master_file_path = get_files_storage_path(file_info["sha256"])
files_storage_dir = os.path.dirname(master_file_path)

# 1. Ensure file is in central storage
if not path_exists(master_file_path):
path_mkdir(files_storage_dir, exist_ok=True)
shutil.move(full_path, master_file_path)
elif path_exists(full_path):
# We already have it, delete the temp duplicate
path_delete(full_path)

# 2. Create symlink in analysis folder (or copy if link fails)
if not path_exists(dest_path):
shutil.move(full_path, dest_path)
print(
json.dumps(
{
"path": os.path.join("files", file_info["sha256"]),
"filepath": file_info["name"],
"pids": [],
"ppids": [],
"metadata": "",
"category": "files",
},
ensure_ascii=False,
),
file=f,
)
try:
os.symlink(master_file_path, dest_path)
except OSError:
# Fallback to copy on error
shutil.copy(master_file_path, dest_path)
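# Resulting layout (illustrative): the master copy lives in the sharded store,
# e.g. storage/files/ab/cd/<sha256>, while the analysis folder keeps only a
# symlink at dest_path (or a plain copy on filesystems without symlink support).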

# Update files.json for UI/Reporting to correctly reference the symlinked file
print(
json.dumps(
{
"path": file_info["sha256"], # Store just the SHA256
"filepath": file_info["name"],
"pids": [],
"ppids": [],
"metadata": "",
"category": "selfextracted",
},
ensure_ascii=False,
),
file=f,
)
file_info["data"] = is_text_file(file_info, destination_folder, processing_conf.CAPE.buffer)
metadata.append(file_info)

12 changes: 12 additions & 0 deletions lib/cuckoo/common/utils.py
@@ -185,6 +185,18 @@ def get_memdump_path(memdump_id, analysis_folder=False):
)


def get_files_storage_path(sha256: str) -> str:
"""
Get the path to the storage/files directory for a given SHA256.
Uses sharding (e.g., storage/files/ab/cd/abcdef...) to avoid
too many files in a single directory.
"""
if not sha256 or len(sha256) < 4:
return os.path.join(CUCKOO_ROOT, "storage", "files", sha256)

return os.path.join(CUCKOO_ROOT, "storage", "files", sha256[:2], sha256[2:4], sha256)


def validate_referrer(url):
if not url:
return None
20 changes: 18 additions & 2 deletions lib/cuckoo/common/web_utils.py
@@ -25,6 +25,7 @@
from lib.cuckoo.common.path_utils import path_exists, path_mkdir, path_write_file
from lib.cuckoo.common.utils import (
generate_fake_name,
get_files_storage_path,
get_ip_address,
get_options,
get_user_filename,
@@ -1118,10 +1119,25 @@ def category_all_files(task_id: str, category: str, base_path: str):
# analysis = es.search(index=get_analysis_index(), query=get_query_by_info_id(task_id))["hits"]["hits"][0]["_source"]

if analysis:
files = []
if query_category == "CAPE":
return [os.path.join(base_path, block["sha256"]) for block in analysis.get(query_category, {}).get("payloads", [])]
for block in analysis.get(query_category, {}).get("payloads", []):
# Path in files.json now stores only the SHA256, not a relative path
sha256 = block.get("path") or block.get("sha256")
if sha256:
p = get_files_storage_path(sha256)
if path_exists(p):
files.append(p)
else:
return [os.path.join(base_path, block["sha256"]) for block in analysis.get(category, [])]
for block in analysis.get(category, []):
# Path in files.json now stores only the SHA256, not a relative path
sha256 = block.get("path") or block.get("sha256")
if sha256:
p = get_files_storage_path(sha256)
if path_exists(p):
files.append(p)

return files


def validate_task(tid, status=TASK_REPORTED):
27 changes: 26 additions & 1 deletion modules/processing/CAPE.py
@@ -16,6 +16,7 @@
import json
import logging
import os
import shutil
import timeit
from contextlib import suppress
from pathlib import Path
@@ -25,12 +26,13 @@
from lib.cuckoo.common.config import Config
from lib.cuckoo.common.integrations.file_extra_info import DuplicatesType, static_file_info
from lib.cuckoo.common.objects import File
from lib.cuckoo.common.path_utils import path_exists
from lib.cuckoo.common.path_utils import path_exists, path_mkdir
from lib.cuckoo.common.replace_patterns_utils import _clean_path
from lib.cuckoo.common.utils import (
add_family_detection,
convert_to_printable_and_truncate,
get_clamav_consensus,
get_files_storage_path,
make_bytes,
texttypes,
wide2str,
@@ -175,6 +177,29 @@ def process_file(self, file_path, append_file, metadata: dict, *, category: str,
f = File(file_path, metadata.get("metadata", ""))
sha256 = f.get_sha256()

# Deduplicate dropped, procdump, CAPE, and package files to storage/files
if category in ("dropped", "procdump", "CAPE", "package", "procmemory") and not os.path.islink(file_path):
try:
master_path = get_files_storage_path(sha256)
files_storage_dir = os.path.dirname(master_path)

if not path_exists(master_path):
path_mkdir(files_storage_dir, exist_ok=True)
# Move file
shutil.move(file_path, master_path)
else:
# Already exists, delete duplicate
os.remove(file_path)

# Link back
try:
os.symlink(master_path, file_path)
except (OSError, AttributeError):
shutil.copy(master_path, file_path)

except Exception as e:
log.error("Deduplication failed for %s: %s", file_path, e)

if sha256 in duplicated["sha256"]:
log.debug("Skipping file that has already been processed: %s", sha256)
return
62 changes: 61 additions & 1 deletion utils/dist.py
@@ -41,7 +41,7 @@
from lib.cuckoo.common.dist_db import ExitNodes, Machine, Node, Task, create_session
from lib.cuckoo.common.path_utils import path_delete, path_exists, path_get_size, path_mkdir, path_mount_point, path_write_file
from lib.cuckoo.common.socket_utils import send_socket_command
from lib.cuckoo.common.utils import get_options
from lib.cuckoo.common.utils import get_files_storage_path, get_options
from lib.cuckoo.core.database import (
TASK_BANNED,
TASK_DISTRIBUTED,
@@ -304,6 +304,64 @@ def node_get_report_nfs(task_id, worker_name, main_task_id) -> bool:
return True


def sync_sharded_files_nfs(worker_name, main_task_id):
"""
Synchronize deduplicated files from worker to master using sharded storage.
"""
analysis_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", str(main_task_id))
files_json_path = os.path.join(analysis_path, "files.json")

if not path_exists(files_json_path):
return

try:
with open(files_json_path, "r") as f:
for line in f:
try:
entry = json.loads(line)
rel_path = entry.get("path")
if not rel_path or "selfextracted" not in rel_path:
continue

# Extract SHA256 from path (e.g. selfextracted/SHA256)
sha256 = os.path.basename(rel_path)
if len(sha256) != 64:
continue

# Master destination (sharded)
master_dest = get_files_storage_path(sha256)

# If missing on master, fetch from worker
if not path_exists(master_dest):
worker_mount = os.path.join(CUCKOO_ROOT, dist_conf.NFS.mount_folder, str(worker_name))
# Construct worker source path (sharded) manually relative to mount
shard_rel = os.path.join("storage", "files", sha256[:2], sha256[2:4], sha256)
worker_src = os.path.join(worker_mount, shard_rel)
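# Illustrative resulting path, assuming the per-worker NFS share is mounted
# under dist_conf.NFS.mount_folder:
#   <CUCKOO_ROOT>/<mount_folder>/<worker_name>/storage/files/<sha256[:2]>/<sha256[2:4]>/<sha256>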

if path_exists(worker_src):
path_mkdir(os.path.dirname(master_dest), exist_ok=True)
shutil.copy2(worker_src, master_dest)

# Ensure symlink in analysis folder is correct
link_path = os.path.join(analysis_path, rel_path)

# Re-link if the entry is a broken link, missing, or a full copy (we want a symlink)
if path_exists(master_dest):
if os.path.islink(link_path):
# It may already point to the right place, but re-linking is simpler and enforces the local storage path
os.remove(link_path)
elif path_exists(link_path):
# It's a file, replace with link to save space
path_delete(link_path)
path_mkdir(os.path.dirname(link_path), exist_ok=True)
os.symlink(master_dest, link_path)
except (json.JSONDecodeError, OSError) as e:
log.error("Error syncing file for task %s: %s", main_task_id, e)
except Exception as e:
log.exception("Failed to sync sharded files for task %s: %s", main_task_id, e)


def _delete_many(node, ids, nodes, db):
"""
Deletes multiple tasks from a specified node if the node is not the main server.
@@ -955,6 +1013,8 @@ def fetch_latest_reports_nfs(self):
t.main_task_id,
)

sync_sharded_files_nfs(node.name, t.main_task_id)

# this doesn't exist for some reason
if path_exists(t.path):
sample_sha256 = None