silnlp/common/clean_projects.py: 247 changes (156 additions, 91 deletions)
@@ -1,6 +1,7 @@
#!/usr/bin/env python3

import argparse
import concurrent.futures
import logging
import shutil
import sys
@@ -11,11 +12,12 @@
from tqdm import tqdm

# --- Global Constants ---
PROJECTS_FOLDER_DEFAULT = "M:/Paratext/projects"
logger = logging.getLogger(__name__)
SETTINGS_FILENAME = "Settings.xml"

# --- Configuration for Deletion/Keep Rules ---
# These are matched against lower-cased versions of the filename, so they must be listed in lower case here.

FILES_TO_DELETE_BY_NAME_CI = {
"allclustercorrections.txt",
@@ -75,8 +77,7 @@
"bookNames.xml",
"canons.xml",
"lexicon.xml",
"TermRenderings.xml",

"termrenderings.xml",
}
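
The rule sets above are consulted with the lower-cased filename, which is why every entry must be listed in lower case. A minimal sketch of the lookup, using the delete set defined in this file and a hypothetical mixed-case filename:

    # Hypothetical candidate filename; FILES_TO_DELETE_BY_NAME_CI is the set defined above.
    name_lower = "AllClusterCorrections.TXT".lower()   # -> "allclustercorrections.txt"
    delete = name_lower in FILES_TO_DELETE_BY_NAME_CI  # True, despite the mixed-case original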

EXTENSIONS_TO_KEEP_CI = {
@@ -93,9 +94,7 @@


def has_settings_file(project_folder: Path) -> bool:
return (project_folder / SETTINGS_FILENAME).is_file() or (
project_folder / SETTINGS_FILENAME.lower()
).is_file()
return (project_folder / SETTINGS_FILENAME).is_file() or (project_folder / SETTINGS_FILENAME.lower()).is_file()
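
Both casings of the settings file are accepted, since projects copied from different systems vary in how the filename is cased. Usage is a plain boolean check (the path here is hypothetical):

    # True if either ABC/Settings.xml or ABC/settings.xml exists.
    is_project = has_settings_file(Path("M:/Paratext/projects/ABC"))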


class ProjectCleaner:
@@ -109,29 +108,24 @@ def __init__(self, project_path: Path, args):
self.files_to_delete = set()
self.folders_to_delete = set()
self.parsing_errors = []
self.log_buffer: list[str] = [] # Buffer to store log messages for this project
self.log_prefix = f"[{self.project_path.name}] "

def _log_info(self, message: str):
full_message = f"{self.log_prefix}{message}"
logger.info(full_message)
if self.args.verbose > 0:
print(full_message)
self.log_buffer.append(full_message)

def _log_action(self, action: str, item_path: Path):
full_message = (
f"{self.log_prefix}{action}: {item_path.relative_to(self.project_path)}"
)
logger.info(full_message)
if self.args.verbose > 0:
print(full_message)
full_message = f"{self.log_prefix}{action}: {item_path.relative_to(self.project_path)}"
self.log_buffer.append(full_message)

def _parse_settings(self):
settings_file_path = self.project_path / SETTINGS_FILENAME
if not settings_file_path.exists():
settings_file_path = self.project_path / SETTINGS_FILENAME.lower()
if not settings_file_path.exists():
warning_msg = f"Warning: {SETTINGS_FILENAME} not found."
if self.args.verbose:
if self.args.verbose > 0: # Condition to buffer this warning
self._log_info(warning_msg)
self.parsing_errors.append(f"{SETTINGS_FILENAME} not found.")
return
@@ -141,35 +135,36 @@ def _parse_settings(self):
project_settings = parser.parse()
self.project_settings = project_settings

full_suffix = project_settings.file_name_suffix.upper()
self.scripture_file_extension = Path(full_suffix).suffix
if not self.scripture_file_extension:
self.scripture_file_extension = ""
# Log raw settings related to file naming now that self.project_settings is assigned.
self._log_info(
f"Determined scripture file extension: {self.scripture_file_extension}"
f"Settings - FileNamePrePart:'{self.project_settings.file_name_prefix}' "
f"PostPart:'{self.project_settings.file_name_suffix}' "
f"BookNameForm:'{self.project_settings.file_name_form}'"
)

if project_settings.biblical_terms_file_name:
terms_file_path = (
self.project_path / project_settings.biblical_terms_file_name
)
if terms_file_path.is_file():
self.biblical_terms_files.add(terms_file_path)
self._log_info(
f"Found BiblicalTermsListSetting file: {terms_file_path.name}"
)
else:
warning_msg = f"Warning: BiblicalTermsListSetting file not found at expected path: {terms_file_path}"
if self.args.verbose:
self._log_info(warning_msg)
self.parsing_errors.append(
f"BiblicalTermsListSetting file not found: {terms_file_path.name}"
)
except Exception as e:
error_msg = f"Error parsing {SETTINGS_FILENAME}: {e}"
if self.args.verbose:
if self.args.verbose > 0: # Condition to buffer this error message
self._log_info(error_msg)
self.parsing_errors.append(error_msg)
# Log that specific settings details could not be retrieved
self._log_info(
f"Settings - Couldn't log naming details (PrePart, PostPart, BookNameForm) due to parsing error: {e}"
)

# The following code correctly uses self.project_settings,
# which will be None if parsing failed, and thus these blocks will be skipped.

if self.project_settings and self.project_settings.biblical_terms_file_name:
terms_file_path = self.project_path / self.project_settings.biblical_terms_file_name
if terms_file_path.is_file():
self.biblical_terms_files.add(terms_file_path)
self._log_info(f"Found BiblicalTermsListSetting file: {terms_file_path.name}")
else:
warning_msg = f"Warning: BiblicalTermsListSetting file not found at expected path: {terms_file_path}"
if self.args.verbose > 0: # Condition to buffer this warning
self._log_info(warning_msg)
self.parsing_errors.append(f"BiblicalTermsListSetting file not found: {terms_file_path.name}")

def analyze_project_contents(self):
self._parse_settings()
@@ -195,32 +190,22 @@ def analyze_project_contents(self):

# Scripture files are identified using ParatextProjectSettings.get_book_id()
if self.project_settings:
for (
item
) in (
self.project_path.iterdir()
): # Scripture files are typically at the project root
for item in self.project_path.iterdir(): # Scripture files are typically at the project root
if item.is_file():
book_id = self.project_settings.get_book_id(item.name)
if book_id is not None:
self.files_to_keep.add(item)
if self.args.verbose > 1:
self._log_info(
f"Kept scripture file (via get_book_id): {item.name}"
)
self._log_info(f"Kept scripture file (via get_book_id): {item.name}")
elif self.args.verbose > 0:
self._log_info(
"Project settings not available; cannot use get_book_id for scripture identification."
)
self._log_info("Project settings not available; cannot use get_book_id for scripture identification.")

for item in all_items_in_project:
if item.is_file() and item.suffix.lower() in EXTENSIONS_TO_KEEP_CI:
self.files_to_keep.add(item)

if self.args.verbose > 1:
self._log_info(
f"Identified {len(self.files_to_keep)} files to keep initially."
)
self._log_info(f"Identified {len(self.files_to_keep)} files to keep initially.")

# --- Pass 2: Identify files to DELETE ---
for item_path in all_items_in_project:
@@ -235,17 +220,12 @@ def analyze_project_contents(self):
if item_name_lower in FILES_TO_DELETE_BY_NAME_CI:
delete_file = True
reason = "specific name"
elif any(
item_path.match(pattern) for pattern in FILES_TO_DELETE_BY_PATTERN
):
elif any(item_path.match(pattern) for pattern in FILES_TO_DELETE_BY_PATTERN):
delete_file = True
reason = "pattern match"
elif any(
sub_str in item_name_lower
for sub_str in FILENAME_SUBSTRINGS_TO_DELETE_CI
):
elif any(sub_str in item_name_lower for sub_str in FILENAME_SUBSTRINGS_TO_DELETE_CI):
delete_file = True
reason = "substring match"
elif item_suffix_lower in EXTENSIONS_TO_DELETE_CI:
delete_file = True
reason = f"extension ({item_suffix_lower})"
@@ -268,9 +248,7 @@ def analyze_project_contents(self):
if delete_file:
self.files_to_delete.add(item_path)
if self.args.verbose > 1:
self._log_info(
f"Marked for deletion ({reason}): {item_path.relative_to(self.project_path)}"
)
self._log_info(f"Marked for deletion ({reason}): {item_path.relative_to(self.project_path)}")

# --- Pass 3: Identify folders to DELETE ---
for item in self.project_path.iterdir():
@@ -320,6 +298,20 @@ def execute_cleanup(self):
self._log_info("Cleanup execution finished.")


# --- Helper for concurrent project cleaning ---
def process_single_project_for_cleaning(
project_path: Path, current_args: argparse.Namespace
) -> tuple[str, list[str], list[str]]:
"""
Creates a ProjectCleaner instance, analyzes, and cleans a single project.
Returns the project name, a list of log messages, and a list of parsing errors.
"""
cleaner = ProjectCleaner(project_path, current_args)
cleaner.analyze_project_contents()
cleaner.execute_cleanup()
return project_path.name, cleaner.log_buffer, cleaner.parsing_errors
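
The helper buffers a project's log lines instead of writing them as it goes, so output from concurrent workers can later be emitted grouped by project and in a deterministic order. A minimal sketch of consuming its return value for one project (the folder name is hypothetical):

    name, logs, errors = process_single_project_for_cleaning(
        Path("M:/Paratext/projects/DEMO"), args
    )
    for line in logs:
        logger.info(line)  # lines already carry the "[ProjectName] " prefix
    for err in errors:
        logger.warning(f"[{name}] Config Error: {err}")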


# --- Main Function ---
def main():
parser = argparse.ArgumentParser(
@@ -344,9 +336,7 @@ def main():
default=0,
help="Increase output verbosity. -v for project-level info, -vv for file-level decisions.",
)
parser.add_argument(
"--log-file", help="Path to a file to log actions and verbose information."
)
parser.add_argument("--log-file", help="Path to a file to log actions and verbose information.")
args = parser.parse_args()

# --- Configure Logging ---
@@ -368,44 +358,73 @@ def main():
file_handler.setLevel(logging.INFO)
logger.addHandler(file_handler)

if args.verbose > 0:
print(f"Starting cleanup process for projects in: {args.projects_root}")
if args.dry_run:
print("DRY RUN mode enabled.")
print(f"Starting cleanup process for projects in: {args.projects_root}")
if args.dry_run:
print("DRY RUN mode enabled.")
logger.info(
f"Starting cleanup process. Projects root: {args.projects_root}. Dry run: {args.dry_run}. Verbose: {args.verbose}."
f"Starting cleanup process for: {args.projects_root}. Dry run: {args.dry_run}. Verbose: {args.verbose}."
)

projects_root_path = Path(args.projects_root)
if not projects_root_path.is_dir():
print(f"Error: Projects root folder not found: {args.projects_root}")
sys.exit(1)

all_folders = [folder for folder in projects_root_path.iterdir() if folder.is_dir()]
found_total_msg = f"Found {len(all_folders)} folders in {args.projects_root}."
# Initial scan for all items to determine directories
initial_items = list(projects_root_path.glob("*"))
all_folders = []
if args.verbose > 0:
print(f"Scanning {len(initial_items)} items in {args.projects_root} to find directories...")

for item in tqdm(initial_items, desc=f"Scanning {args.projects_root}", unit="item", disable=args.verbose > 0):
if item.is_dir():
all_folders.append(item)

max_workers = 10

found_total_msg = f"Found {len(all_folders)} total directories in {args.projects_root}."
logger.info(found_total_msg)
if args.verbose > 0:
print(found_total_msg)

project_folders = []
non_project_folders = []
for folder in tqdm(
all_folders, desc="Scanning folders", unit="folder", disable=args.verbose > 0
):
if has_settings_file(folder):
project_folders.append(folder)
else:
non_project_folders.append(folder)

with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:

# Submit tasks for each folder
future_to_folder = {executor.submit(has_settings_file, folder): folder for folder in all_folders}

# Iterate over completed tasks using tqdm; a small mininterval could be added
# for smoother updates if individual has_settings_file calls are very fast.
for future in tqdm(
concurrent.futures.as_completed(future_to_folder),
total=len(all_folders),
desc="Identifying project folders",
unit="folder",
disable=args.verbose > 0,
):
folder = future_to_folder[future]
try:
is_project = future.result()
if is_project:
project_folders.append(folder)
else:
non_project_folders.append(folder)
except Exception as exc:
logger.error(f"Error checking folder {folder}: {exc}")
if args.verbose > 0:
print(f"Error checking folder {folder}: {exc}")
non_project_folders.append(folder)
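
Consuming futures with as_completed lets the progress bar advance in completion order, as soon as any worker finishes; executor.map would yield results in submission order and could stall the bar behind one slow folder. A self-contained sketch of the same pattern, with illustrative names:

    import concurrent.futures
    from pathlib import Path

    from tqdm import tqdm

    def check(folder: Path) -> bool:
        # Stand-in for has_settings_file.
        return (folder / "Settings.xml").is_file() or (folder / "settings.xml").is_file()

    folders = [p for p in Path(".").iterdir() if p.is_dir()]
    matches = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        future_to_folder = {executor.submit(check, f): f for f in folders}
        for future in tqdm(concurrent.futures.as_completed(future_to_folder), total=len(folders)):
            if future.result():
                matches.append(future_to_folder[future])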

found_msg = f"Found {len(project_folders)} project folders."
logger.info(found_msg)
if args.verbose > 0:
print(found_msg)

if non_project_folders:
non_project_msg = (
f"Found {len(non_project_folders)} non-project folders (will be ignored):"
)
non_project_msg = f"Found {len(non_project_folders)} non-project folders (will be ignored):"
logger.info(non_project_msg)
if args.verbose > 0:
print(non_project_msg)
@@ -422,14 +441,60 @@
print(no_projects_msg)
return

for project_path in tqdm(project_folders, desc="Cleaning projects", unit="project"):
cleaner = ProjectCleaner(project_path, args)
cleaner.analyze_project_contents()
cleaner.execute_cleanup()
if args.verbose > 0:
print(f"--- Finished processing project: {project_path.name} ---")
elif args.verbose == 0:
logger.info(f"Finished processing project: {project_path.name}")
processed_project_data: list[tuple[str, list[str], list[str], Path]] = []

# Concurrently process each project folder for cleaning
# Re-use max_workers from the previous section, or define a new one if desired.
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# Store future to project_path to retrieve the original Path object for robust error messages
future_to_project_path_map = {
executor.submit(process_single_project_for_cleaning, project_path, args): project_path
for project_path in project_folders
}

for future in tqdm(
concurrent.futures.as_completed(future_to_project_path_map),
total=len(project_folders),
desc="Cleaning projects",
unit="project",
disable=args.verbose > 0, # tqdm is disabled if verbose output is on
mininterval=0.01, # More frequent updates, similar to the folder identification step
):
processed_project_path = future_to_project_path_map[future]
try:
project_name, project_logs, project_errors = future.result()
processed_project_data.append((project_name, project_logs, project_errors, processed_project_path))
except Exception as exc:
# Log critical errors during processing immediately, as they might prevent log collection
crit_error_msg = f"Critical error during processing of project {processed_project_path.name}: {exc}"
logger.error(crit_error_msg)
if args.verbose > 0:
print(crit_error_msg)
# Store a placeholder for sorted output
processed_project_data.append(
(processed_project_path.name, [], [f"Critical error: {exc}"], processed_project_path)
)

# Sort all collected data by project name
processed_project_data.sort(key=lambda x: x[0])

# Log the collected and sorted data
for project_name, project_logs, project_parsing_errors, _project_path in processed_project_data:
# Log messages collected by the cleaner
for log_msg_from_buffer in project_logs:
logger.info(log_msg_from_buffer) # Already formatted with [ProjectName] prefix by ProjectCleaner
if args.verbose > 0: # Print to console if verbose
print(log_msg_from_buffer)

# Log parsing errors, ensuring they are associated with the project
if project_parsing_errors:
for err_str in project_parsing_errors:
error_log_message = f"[{project_name}] Config Error: {err_str}"
logger.warning(error_log_message) # Use warning for parsing/config errors
if args.verbose > 0:
print(error_log_message)

logger.info(f"[{project_name}] Processing completed.") # Log overall completion for this project

final_msg = "\nCleanup process completed."
logger.info(final_msg)