Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions admin/test/scripts/make_test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,17 @@ def get_last_commit_info(file_path):
try:
# Get the last commit hash
git_log = subprocess.check_output(['git', 'log', '-n', '1', '--pretty=format:%H|%an|%ae|%ad|%s', '--', file_path],
universal_newlines=True)
commit_hash, author_name, author_email, commit_date, commit_message = git_log.strip().split('|')
universal_newlines=True).strip()
if not git_log:
# File is not yet in git history
return {
'hash': 'Uncommitted',
'author_name': 'N/A',
'author_email': 'N/A',
'date': datetime.now().isoformat(),
'message': 'File not yet committed to git'
}
commit_hash, author_name, author_email, commit_date, commit_message = git_log.split('|')

# Convert the commit date to ISO format
commit_date = datetime.strptime(commit_date, '%a %b %d %H:%M:%S %Y %z').isoformat()
Expand Down
143 changes: 140 additions & 3 deletions admin/test/scripts/test_module.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
"""
This script provides a testing framework for iLEAPP artifact modules.
It allows for running individual artifacts or entire modules against pre-defined
test cases (stored as .zip files containing forensic evidence) and comparing
the output against expected results.

Design Goals:
- Standalone Execution: Can be run from the command line for manual testing.
- IDE Integration: Designed to work as a task within the VS Code IDE for a
streamlined developer experience.
- CI/CD Ready: Structured to be eventually integrated into continuous integration
pipelines for automated regression testing.
"""

import sys
import os
import zipfile
Expand All @@ -20,14 +34,40 @@


def mock_logdevinfo(message):
"""
Mocks the logdevinfo function to print to the console.

Args:
message (str): The message to log.
"""
print(f"[LOGDEVINFO] {message}")


def mock_logfunc(message):
"""
Mocks the logfunc function to print to the console.

Args:
message (str): The message to log.
"""
print(f"[LOGFUNC] {message}")


def process_artifact(zip_path, module_name, artifact_name, artifact_data, target_os_version=None):
"""
Processes a specific artifact from a given zip file.

Args:
zip_path (Path): Path to the zip file containing test data.
module_name (str): Name of the artifact module.
artifact_name (str): Name of the artifact function to test.
artifact_data (dict): Metadata about the artifact from the test case.
target_os_version (str, optional): OS version to mock for the test.

Returns:
tuple: (data_headers, data_list, run_time, last_commit_info,
check_in_media_call_count, check_in_media_embedded_call_count)
"""
module = importlib.import_module(f'scripts.artifacts.{module_name}')

# Get the function to test
Expand Down Expand Up @@ -201,22 +241,60 @@ def mocked_lava_get_full_media_info(media_ref_id):


def calculate_data_size(data_list):
"""
Calculates the total size of data in a list of rows.

Args:
data_list (list): A list of rows, where each row is a list or tuple of items.

Returns:
int: Total size in bytes when encoded as UTF-8.
"""
return sum(len(str(item).encode('utf-8')) for row in data_list for item in row)


def load_test_cases(module_name):
"""
Loads test cases for a module from its JSON metadata file.

Args:
module_name (str): Name of the module.

Returns:
dict: Test cases data.
"""
cases_file = Path(f'admin/test/cases/testdata.{module_name}.json')
with open(cases_file, 'r', encoding='utf-8') as f:
return json.load(f)


def get_artifact_names(module_name, test_cases):
"""
Retrieves all artifact names defined in the test cases for a module.

Args:
module_name (str): Name of the module.
test_cases (dict): Test cases data.

Returns:
list: List of artifact names.
"""
artifact_names = set()
for case in test_cases.values():
artifact_names.update(case['artifacts'].keys())
return list(artifact_names)


def select_case(test_cases):
"""
Prompts the user to select a test case from the available ones.

Args:
test_cases (dict): Test cases data.

Returns:
str: The selected case name, or 'all' for all cases.
"""
sorted_cases = sorted(test_cases.keys())
valid_cases = []
invalid_cases = []
Expand Down Expand Up @@ -269,7 +347,18 @@ def select_case(test_cases):
print("\nExiting...")
sys.exit(0)


def select_artifact(artifact_names, test_cases):
"""
Prompts the user to select an artifact to test.

Args:
artifact_names (list): List of all artifact names in the module.
test_cases (dict): Test cases data.

Returns:
str: The selected artifact name, or 'all' for all artifacts.
"""
artifacts_with_data = []
artifacts_without_data = []
sorted_artifacts = sorted(artifact_names)
Expand Down Expand Up @@ -319,12 +408,33 @@ def select_artifact(artifact_names, test_cases):
print("\nExiting...")
sys.exit(0)


def convert_to_unix_time(value):
"""
Converts a datetime or date object to a Unix timestamp.

Args:
value: The object to convert.

Returns:
int or object: Unix timestamp if input was datetime/date, otherwise original value.
"""
if isinstance(value, (datetime, date)):
return int(value.timestamp())
return value


def process_data(headers, data):
"""
Processes artifact output data for comparison, handling datetime conversions.

Args:
headers (list): Artifact output headers.
data (list): Artifact output data rows.

Returns:
tuple: (processed_headers, processed_data)
"""
datetime_indices = [i for i, header in enumerate(headers) if isinstance(header, tuple)
and header[1].lower() in ['datetime', 'date']]

Expand All @@ -344,7 +454,16 @@ def process_data(headers, data):

return processed_headers, processed_data


def main(module_name, artifact_name=None, case_number=None):
"""
Main entry point for testing module artifacts.

Args:
module_name (str): Name of the module to test.
artifact_name (str, optional): Name of the artifact to test.
case_number (str, optional): Case number to test.
"""
try:
test_cases = load_test_cases(module_name)
artifact_names = get_artifact_names(module_name, test_cases)
Expand Down Expand Up @@ -445,14 +564,32 @@ def main(module_name, artifact_name=None, case_number=None):
print("\nExiting...")
sys.exit(0)


def get_last_commit_info(file_path):
"""
Retrieves the last git commit information for a given file.

Args:
file_path (str): Path to the file.

Returns:
dict: Dictionary containing commit hash, author, date, and message.
"""
try:
# Get the last commit hash
git_log = subprocess.check_output(
['git', 'log', '-n', '1', '--pretty=format:%H|%an|%ae|%ad|%s', '--', file_path],
universal_newlines=True)
commit_hash, author_name, author_email, commit_date, commit_message = \
git_log.strip().split('|')
universal_newlines=True).strip()
if not git_log:
# File is not yet in git history
return {
'hash': 'Uncommitted',
'author_name': 'N/A',
'author_email': 'N/A',
'date': datetime.now().isoformat(),
'message': 'File not yet committed to git'
}
commit_hash, author_name, author_email, commit_date, commit_message = git_log.split('|')

# Convert the commit date to ISO format
commit_date = datetime.strptime(
Expand Down
Loading