From a5044cf6409b38ca836380b5d0c7c7cb0f0e4c88 Mon Sep 17 00:00:00 2001 From: Cheng Penghui Date: Fri, 17 Oct 2025 16:39:50 +0800 Subject: [PATCH 1/2] Split markers augmentation to lower and upper limb --- Pose2Sim/kinematics.py | 408 +++++++++++++++++++++++++++++++-- Pose2Sim/markerAugmentation.py | 133 ++++++++++- 2 files changed, 514 insertions(+), 27 deletions(-) diff --git a/Pose2Sim/kinematics.py b/Pose2Sim/kinematics.py index 47b5d997..779e21df 100644 --- a/Pose2Sim/kinematics.py +++ b/Pose2Sim/kinematics.py @@ -35,6 +35,7 @@ ## INIT import os import sys +import copy from pathlib import Path import numpy as np np.set_printoptions(legacy='1.21') # otherwise prints np.float64(3.0) rather than 3.0 @@ -294,6 +295,26 @@ def dict_segment_ratio(scaling_root, unscaled_model, Q_coords_scaling, markers, # segment_pairs = get_kpt_pairs_from_tree(eval(pose_model)) segment_pairs = get_kpt_pairs_from_scaling(scaling_root) + + # Filter out segment pairs where markers are missing in TRC + # This is crucial for partial augmentation (e.g., upper-only or lower-only) + valid_segment_pairs = [] + skipped_pairs = [] + for (pt1, pt2) in segment_pairs: + if pt1 in markers and pt2 in markers: + valid_segment_pairs.append((pt1, pt2)) + else: + skipped_pairs.append((pt1, pt2)) + + if skipped_pairs: + logging.info(f"Skipped {len(skipped_pairs)} segment pairs due to missing markers in TRC:") + logging.info(f" First few: {skipped_pairs[:3]}{'...' if len(skipped_pairs) > 3 else ''}") + + if not valid_segment_pairs: + raise ValueError("No valid segment pairs found in TRC file. Cannot perform scaling.") + + # Use only valid segment pairs for scaling + segment_pairs = valid_segment_pairs # Get median segment lengths from Q_coords_scaling. Trimmed mean works better than mean or median trc_segment_lengths = np.array([euclidean_distance(Q_coords_scaling.iloc[:,markers.index(pt1)*3:markers.index(pt1)*3+3], @@ -304,19 +325,58 @@ def dict_segment_ratio(scaling_root, unscaled_model, Q_coords_scaling, markers, trc_segment_lengths = np.array([trimmed_mean(arr, trimmed_extrema_percent=trimmed_extrema_percent) for arr in trc_segment_lengths]) # Get model segment lengths - model_markers = [marker for marker in markers if marker in [m.getName() for m in unscaled_model.getMarkerSet()]] - model_markers_locs = [unscaled_model.getMarkerSet().get(marker).getLocationInGround(unscaled_model.getWorkingState()).to_numpy() for marker in model_markers] - model_segment_lengths = np.array([euclidean_distance(model_markers_locs[model_markers.index(pt1)], - model_markers_locs[model_markers.index(pt2)]) - for (pt1,pt2) in segment_pairs]) + # Build a mapping of marker name to model location + model_marker_locs_dict = {} + for marker_name in markers: + # Check if this marker exists in the model + try: + marker_loc = unscaled_model.getMarkerSet().get(marker_name).getLocationInGround(unscaled_model.getWorkingState()).to_numpy() + model_marker_locs_dict[marker_name] = marker_loc + except: + # If marker not found in model, skip it (e.g., virtual markers that don't exist in HALPE_26 model) + logging.debug(f"Marker {marker_name} not found in model - will skip segments using this marker") + continue + + # Calculate model segment lengths using the available markers + model_segment_lengths = [] + valid_pairs_for_model = [] + for (pt1, pt2) in segment_pairs: + if pt1 in model_marker_locs_dict and pt2 in model_marker_locs_dict: + length = euclidean_distance(model_marker_locs_dict[pt1], model_marker_locs_dict[pt2]) + model_segment_lengths.append(length) + 
valid_pairs_for_model.append((pt1, pt2)) + else: + logging.debug(f"Skipping segment pair ({pt1}, {pt2}) - one or both markers not found in model") + + model_segment_lengths = np.array(model_segment_lengths) + + # Update segment_pairs to only include valid pairs + if len(valid_pairs_for_model) < len(segment_pairs): + logging.info(f"Reduced segment pairs from {len(segment_pairs)} to {len(valid_pairs_for_model)} due to missing markers in model") + segment_pairs = valid_pairs_for_model + # Also need to recalculate TRC segment lengths for matching pairs + trc_segment_lengths = np.array([euclidean_distance(Q_coords_scaling.iloc[:,markers.index(pt1)*3:markers.index(pt1)*3+3], + Q_coords_scaling.iloc[:,markers.index(pt2)*3:markers.index(pt2)*3+3]) + for (pt1,pt2) in segment_pairs]) + trc_segment_lengths = np.array([trimmed_mean(arr, trimmed_extrema_percent=trimmed_extrema_percent) for arr in trc_segment_lengths]) # Calculate ratio for each segment segment_ratios = trc_segment_lengths / model_segment_lengths segment_markers_dict = dict_segment_marker_pairs(scaling_root, right_left_symmetry=right_left_symmetry) - segment_ratio_dict_temp = segment_markers_dict.copy() + + # Filter segment_markers_dict to only include valid marker pairs + # This prevents trying to calculate ratios for segments with missing markers + filtered_segment_markers_dict = {} + for key, pairs in segment_markers_dict.items(): + # Only keep pairs that are in our valid_segment_pairs + valid_pairs = [pair for pair in pairs if pair in segment_pairs] + if valid_pairs: + filtered_segment_markers_dict[key] = valid_pairs + + segment_ratio_dict_temp = filtered_segment_markers_dict.copy() segment_ratio_dict_temp.update({key: np.mean([segment_ratios[segment_pairs.index(k)] - for k in segment_markers_dict[key]]) - for key in segment_markers_dict.keys()}) + for k in filtered_segment_markers_dict[key]]) + for key in filtered_segment_markers_dict.keys()}) # Merge X, Y, Z ratios into single key segment_ratio_dict={} xyz_keys = list(set([key[:-2] for key in segment_ratio_dict_temp.keys()])) @@ -380,6 +440,166 @@ def update_scale_values(scaling_root, segment_ratio_dict): scale_set.append(new_scale) +def create_hybrid_scaling_setup(trc_file, osim_setup_dir, trc_markers_list, has_upper_virtual, has_lower_virtual): + ''' + Create a hybrid scaling setup for partial augmentation scenarios. + + Key principle: For scaling, always use ORIGINAL markers (HALPE_26) to maintain consistency. + Virtual markers will be utilized in IK stage with higher weights. + + This function only disables measurements that use markers not present in TRC. 
+ + INPUTS: + - trc_file (Path): The path to the TRC file + - osim_setup_dir (Path): OpenSim setup directory + - trc_markers_list (list): List of markers in TRC file + - has_upper_virtual (bool): Whether upper limb virtual markers are present + - has_lower_virtual (bool): Whether lower limb virtual markers are present + + OUTPUTS: + - hybrid_scaling_path (Path): Path to the created hybrid scaling setup XML file + ''' + + # Start from HALPE_26 as base (uses ONLY original markers for scaling) + halpe_scaling_path = osim_setup_dir / 'Scaling_Setup_Pose2Sim_Halpe26.xml' + base_tree = etree.parse(halpe_scaling_path) + base_root = base_tree.getroot() + + logging.info(" Creating hybrid scaling setup based on HALPE_26 (using original markers only)...") + + # Get all Measurement elements + measurements = base_root.findall('.//Measurement') + + # Track disabled measurements + disabled_measurements = [] + + for measurement in measurements: + measurement_name = measurement.get('name') + marker_pairs = measurement.findall('.//MarkerPair') + + all_pairs_valid = True + missing_markers = [] + + for pair in marker_pairs: + markers_elem = pair.find('markers') + markers_text = markers_elem.text.strip().split() + + if len(markers_text) == 2: + pt1, pt2 = markers_text + + # Check if markers exist in TRC (should be original HALPE_26 markers) + if pt1 not in trc_markers_list: + all_pairs_valid = False + missing_markers.append(pt1) + if pt2 not in trc_markers_list: + all_pairs_valid = False + missing_markers.append(pt2) + + # Disable measurement if any marker is missing + if not all_pairs_valid: + apply_elem = measurement.find('apply') + if apply_elem is not None: + apply_elem.text = 'false' + disabled_measurements.append(f"{measurement_name} (missing: {', '.join(set(missing_markers))})") + + # Log changes + if disabled_measurements: + logging.info(f" Disabled {len(disabled_measurements)} measurements due to missing markers:") + for dm in disabled_measurements[:3]: + logging.info(f" - {dm}") + if len(disabled_measurements) > 3: + logging.info(f" ... and {len(disabled_measurements) - 3} more") + else: + logging.info(" All HALPE_26 markers present in TRC - no measurements disabled") + + # Save hybrid scaling setup + hybrid_scaling_path = trc_file.parent / f"{trc_file.stem}_hybrid_scaling_setup.xml" + etree.indent(base_tree, space='\t', level=0) + base_tree.write(str(hybrid_scaling_path), pretty_print=True, xml_declaration=True, encoding='utf-8') + + logging.info(f" Created hybrid scaling setup: {hybrid_scaling_path.name}") + logging.info(" Note: Virtual markers will be used in IK stage with higher weights") + + return hybrid_scaling_path + + +def create_hybrid_marker_set(trc_file, pose_model, osim_setup_dir): + ''' + Create a hybrid marker set that combines available LSTM virtual markers with original pose markers. + This allows partial augmentation (e.g., only upper or lower limb) to work with kinematics. 
+ + INPUTS: + - trc_file (Path): The path to the TRC file + - pose_model (str): The pose model name + - osim_setup_dir (Path): OpenSim setup directory + + OUTPUTS: + - hybrid_markers_path (Path): Path to the created hybrid marker set XML file + ''' + + # Read markers from TRC file + _, _, _, trc_markers, _ = read_trc(trc_file) + + # Get original pose model marker set as template + # For LSTM mode, we need to use the underlying original model (HALPE_26) + original_pose_model = 'HALPE_26' if pose_model == 'LSTM' else pose_model + try: + original_markers_path = get_markers_path(original_pose_model, osim_setup_dir) + except: + logging.warning(f"Could not find marker set for {original_pose_model}, using HALPE_26") + original_markers_path = osim_setup_dir / 'Markers_Halpe26.xml' + + # Get LSTM marker set for virtual markers + lstm_markers_path = osim_setup_dir / 'Markers_LSTM.xml' + + # Parse marker sets + original_tree = etree.parse(original_markers_path) + original_root = original_tree.getroot() + + lstm_tree = etree.parse(lstm_markers_path) + lstm_root = lstm_tree.getroot() + + # Create hybrid marker set based on original template (deep copy) + hybrid_root = copy.deepcopy(original_root) + + # Get the objects container where markers are stored + hybrid_objects = hybrid_root.find('.//objects') + + # Get all LSTM virtual markers + lstm_markers_dict = {} + for marker in lstm_root.findall('.//Marker'): + marker_name = marker.get('name') + lstm_markers_dict[marker_name] = marker + + # Get existing marker names in the hybrid set (from original) + existing_marker_names = {m.get('name') for m in hybrid_objects.findall('Marker')} + + # Add LSTM virtual markers that exist in TRC but not yet in hybrid set + added_virtual_markers = [] + for trc_marker in trc_markers: + # If this is a virtual marker (_study) that exists in TRC and LSTM set, but not in hybrid set + if '_study' in trc_marker and trc_marker in lstm_markers_dict and trc_marker not in existing_marker_names: + hybrid_objects.append(copy.deepcopy(lstm_markers_dict[trc_marker])) + added_virtual_markers.append(trc_marker) + + # Create hybrid marker file path + hybrid_markers_path = trc_file.parent / f"{trc_file.stem}_hybrid_markers.xml" + + # Write hybrid marker set + hybrid_tree = etree.ElementTree(hybrid_root) + etree.indent(hybrid_tree, space='\t', level=0) + hybrid_tree.write(str(hybrid_markers_path), pretty_print=True, xml_declaration=True, encoding='utf-8') + + total_markers = len(existing_marker_names) + len(added_virtual_markers) + logging.info(f"Created hybrid marker set: {total_markers} markers total") + logging.info(f" - Original markers: {len(existing_marker_names)}") + logging.info(f" - Added virtual markers: {len(added_virtual_markers)}") + if added_virtual_markers: + logging.info(f" - Virtual markers added: {', '.join(added_virtual_markers[:5])}{'...' 
if len(added_virtual_markers) > 5 else ''}") + + return hybrid_markers_path + + def perform_scaling(trc_file, pose_model, kinematics_dir, osim_setup_dir, use_simple_model=False, right_left_symmetry=True, subject_height=1.75, subject_mass=70, remove_scaling_setup=True, fastest_frames_to_remove_percent=0.1,close_to_zero_speed_m=0.2, large_hip_knee_angles=45, trimmed_extrema_percent=0.5): @@ -415,8 +635,71 @@ def perform_scaling(trc_file, pose_model, kinematics_dir, osim_setup_dir, if not unscaled_model_path: raise ValueError(f"Unscaled OpenSim model not found at: {unscaled_model_path}") unscaled_model = opensim.Model(str(unscaled_model_path)) + + # Check if this is a hybrid augmentation scenario (partial LSTM markers) + # Read TRC to check available markers + _, _, _, trc_markers_list, _ = read_trc(trc_file) + + # Define standard LSTM virtual markers for lower and upper limbs + # Note: TRC files ALWAYS contain original markers + virtual markers (if augmented) + # We need to check if ALL expected virtual markers are present + lstm_lower_markers = [ + 'r.ASIS_study', 'L.ASIS_study', 'r.PSIS_study', + 'L.PSIS_study', 'r_knee_study', 'r_mknee_study', + 'r_ankle_study', 'r_mankle_study', 'r_toe_study', + 'r_5meta_study', 'r_calc_study', 'L_knee_study', + 'L_mknee_study', 'L_ankle_study', 'L_mankle_study', + 'L_toe_study', 'L_calc_study', 'L_5meta_study', + 'r_shoulder_study', 'L_shoulder_study', 'C7_study', + 'r_thigh1_study', 'r_thigh2_study', 'r_thigh3_study', + 'L_thigh1_study', 'L_thigh2_study', 'L_thigh3_study', + 'r_sh1_study', 'r_sh2_study', 'r_sh3_study', 'L_sh1_study', + 'L_sh2_study', 'L_sh3_study', 'RHJC_study', 'LHJC_study'] + + lstm_upper_markers = ["r_lelbow_study", "r_melbow_study", "r_lwrist_study", + "r_mwrist_study", "L_lelbow_study", "L_melbow_study", + "L_lwrist_study", "L_mwrist_study"] + + # Check which virtual markers actually exist in TRC + lower_markers_present = [m for m in lstm_lower_markers if m in trc_markers_list] + upper_markers_present = [m for m in lstm_upper_markers if m in trc_markers_list] + + # Calculate coverage percentage + lower_coverage = len(lower_markers_present) / len(lstm_lower_markers) if lstm_lower_markers else 0 + upper_coverage = len(upper_markers_present) / len(lstm_upper_markers) if lstm_upper_markers else 0 + + # Partial augmentation: not all virtual markers are present + # Full augmentation: both lower AND upper are complete + has_full_lower = lower_coverage >= 0.95 + has_full_upper = upper_coverage >= 0.95 + + # Full augmentation: both complete OR both empty + is_full_augmentation = (has_full_lower and has_full_upper) + + # Partial augmentation: anything else (one complete, one empty OR incomplete coverage) + is_partial_augmentation = not is_full_augmentation + + if is_partial_augmentation: + logging.info(f"Detected partial marker augmentation. 
Creating hybrid marker set and scaling setup...") + logging.info(f" Lower limb virtual markers: {len(lower_markers_present)}/{len(lstm_lower_markers)} ({lower_coverage*100:.1f}%)") + logging.info(f" Upper limb virtual markers: {len(upper_markers_present)}/{len(lstm_upper_markers)} ({upper_coverage*100:.1f}%)") + + # Create hybrid marker set (HALPE_26 original markers + available virtual markers) + markers_path = create_hybrid_marker_set(trc_file, pose_model, osim_setup_dir) + + # Create hybrid scaling setup that intelligently uses virtual markers where available + hybrid_scaling_path = create_hybrid_scaling_setup( + trc_file, osim_setup_dir, trc_markers_list, + has_full_upper, has_full_lower + ) + scaling_pose_model = None # Will use hybrid_scaling_path directly + else: + # Standard marker set selection (full augmentation or no augmentation) + markers_path = get_markers_path(pose_model, osim_setup_dir) + scaling_pose_model = pose_model + hybrid_scaling_path = None + # Add markers to model - markers_path = get_markers_path(pose_model, osim_setup_dir) markerset = opensim.MarkerSet(str(markers_path)) unscaled_model.set_MarkerSet(markerset) # Initialize and save model with markers @@ -425,7 +708,13 @@ def perform_scaling(trc_file, pose_model, kinematics_dir, osim_setup_dir, unscaled_model.printToXML(scaled_model_path) # Load scaling setup - scaling_path = get_scaling_setup(pose_model, osim_setup_dir) + if hybrid_scaling_path is not None: + # Use the hybrid scaling setup for partial augmentation + scaling_path = hybrid_scaling_path + else: + # Use standard scaling setup + scaling_path = get_scaling_setup(scaling_pose_model, osim_setup_dir) + scaling_tree = etree.parse(scaling_path) scaling_root = scaling_tree.getroot() scaling_path_temp = str(kinematics_dir / (trc_file.stem + '_scaling_setup.xml')) @@ -486,19 +775,106 @@ def perform_IK(trc_file, kinematics_dir, osim_setup_dir, pose_model, remove_IK_s ''' try: - # Retrieve data - ik_path = get_IK_Setup(pose_model, osim_setup_dir) - ik_path_temp = str(kinematics_dir / (trc_file.stem + '_ik_setup.xml')) + # Check if this was a partial augmentation case scaled_model_path = (kinematics_dir / (trc_file.stem + '.osim')).resolve() + + # Read TRC to check if this is partial augmentation + _, _, _, trc_markers_list, _ = read_trc(trc_file) + + # Define LSTM virtual markers + lstm_lower_markers = [ + 'r.ASIS_study', 'L.ASIS_study', 'r.PSIS_study', + 'L.PSIS_study', 'r_knee_study', 'r_mknee_study', + 'r_ankle_study', 'r_mankle_study', 'r_toe_study', + 'r_5meta_study', 'r_calc_study', 'L_knee_study', + 'L_mknee_study', 'L_ankle_study', 'L_mankle_study', + 'L_toe_study', 'L_calc_study', 'L_5meta_study', + 'r_shoulder_study', 'L_shoulder_study', 'C7_study', + 'r_thigh1_study', 'r_thigh2_study', 'r_thigh3_study', + 'L_thigh1_study', 'L_thigh2_study', 'L_thigh3_study', + 'r_sh1_study', 'r_sh2_study', 'r_sh3_study', 'L_sh1_study', + 'L_sh2_study', 'L_sh3_study', 'RHJC_study', 'LHJC_study'] + + lstm_upper_markers = ["r_lelbow_study", "r_melbow_study", "r_lwrist_study", + "r_mwrist_study", "L_lelbow_study", "L_melbow_study", + "L_lwrist_study", "L_mwrist_study"] + + # Check coverage + lower_markers_present = [m for m in lstm_lower_markers if m in trc_markers_list] + upper_markers_present = [m for m in lstm_upper_markers if m in trc_markers_list] + lower_coverage = len(lower_markers_present) / len(lstm_lower_markers) if lstm_lower_markers else 0 + upper_coverage = len(upper_markers_present) / len(lstm_upper_markers) if lstm_upper_markers else 0 + + has_full_lower = 
lower_coverage >= 0.95 + has_full_upper = upper_coverage >= 0.95 + is_partial_augmentation = not (has_full_lower and has_full_upper) + + # For partial augmentation with upper limb virtual markers, create hybrid IK setup + if is_partial_augmentation and has_full_upper and pose_model == 'LSTM': + logging.info(" Creating hybrid IK setup for partial upper limb augmentation...") + # Start from HALPE_26 IK setup as base + halpe_ik_path = get_IK_Setup('HALPE_26', osim_setup_dir) + ik_tree = etree.parse(halpe_ik_path) + ik_root = ik_tree.getroot() + + # Define weight adjustments for virtual markers (higher weight for more accurate virtual markers) + virtual_marker_weights = { + 'r_melbow_study': 2.0, # Virtual elbow markers get higher weight + 'L_melbow_study': 2.0, + 'r_mwrist_study': 2.0, # Virtual wrist markers get higher weight + 'L_mwrist_study': 2.0, + 'r_lelbow_study': 1.5, + 'L_lelbow_study': 1.5, + 'r_lwrist_study': 1.5, + 'L_lwrist_study': 1.5 + } + + # Lower weights for original markers that have virtual equivalents + original_marker_weights = { + 'RElbow': 0.5, # Lower weight since we have r_melbow_study + 'LElbow': 0.5, + 'RWrist': 0.5, # Lower weight since we have r_mwrist_study + 'LWrist': 0.5 + } + + # Get IKTaskSet + ik_task_set = ik_root.find('.//IKTaskSet/objects') + if ik_task_set is not None: + # Update existing marker weights + for task in ik_task_set.findall('IKMarkerTask'): + marker_name = task.get('name') + weight_elem = task.find('weight') + if marker_name in original_marker_weights and weight_elem is not None: + weight_elem.text = str(original_marker_weights[marker_name]) + + # Add virtual marker tasks + for virtual_marker, weight in virtual_marker_weights.items(): + if virtual_marker in trc_markers_list: + # Create new IKMarkerTask for virtual marker + new_task = etree.Element('IKMarkerTask', name=virtual_marker) + apply_elem = etree.SubElement(new_task, 'apply') + apply_elem.text = 'true' + weight_elem = etree.SubElement(new_task, 'weight') + weight_elem.text = str(weight) + ik_task_set.append(new_task) + + # Save temporary hybrid IK setup + ik_path_temp = str(kinematics_dir / (trc_file.stem + '_ik_setup.xml')) + else: + # Standard IK setup + ik_path = get_IK_Setup(pose_model, osim_setup_dir) + ik_tree = etree.parse(ik_path) + ik_root = ik_tree.getroot() + ik_path_temp = str(kinematics_dir / (trc_file.stem + '_ik_setup.xml')) + + # Common IK setup updates output_motion_file = Path(kinematics_dir, trc_file.stem + '.mot').resolve() + if not trc_file.exists(): raise FileNotFoundError(f"TRC file does not exist: {trc_file}") _, _, time_col, _, _ = read_trc(trc_file) start_time, end_time = time_col.iloc[0], time_col.iloc[-1] - # Update IK setup file - ik_tree = etree.parse(ik_path) - ik_root = ik_tree.getroot() ik_root.find('.//model_file').text = str(scaled_model_path) ik_root.find('.//time_range').text = f'{start_time} {end_time}' ik_root.find('.//output_motion_file').text = str(output_motion_file) diff --git a/Pose2Sim/markerAugmentation.py b/Pose2Sim/markerAugmentation.py index aaa520a9..30917bd8 100644 --- a/Pose2Sim/markerAugmentation.py +++ b/Pose2Sim/markerAugmentation.py @@ -114,10 +114,15 @@ def augment_markers_all(config_dict): pose_3d_dir = os.path.realpath(os.path.join(project_dir, 'pose-3d')) feet_on_floor = config_dict.get('markerAugmentation').get('feet_on_floor') make_c3d = config_dict.get('markerAugmentation').get('make_c3d') + shoulder_constraint_weight = config_dict.get('markerAugmentation').get('shoulder_constraint_weight', 0.3) frame_range = 
config_dict.get('project').get('frame_range') subject_height = config_dict.get('project').get('participant_height') subject_mass = config_dict.get('project').get('participant_mass') + # Get body segment augmentation control parameters + augment_upper_limb = config_dict.get('markerAugmentation').get('augment_upper_limb', True) + augment_lower_limb = config_dict.get('markerAugmentation').get('augment_lower_limb', True) + fastest_frames_to_remove_percent = config_dict.get('kinematics').get('fastest_frames_to_remove_percent') close_to_zero_speed = config_dict.get('kinematics').get('close_to_zero_speed_m') large_hip_knee_angles = config_dict.get('kinematics').get('large_hip_knee_angles') @@ -203,10 +208,38 @@ def augment_markers_all(config_dict): # Upper body augmenterModelType_upper = '{}_upper'.format(augmenter_model) feature_markers_upper, response_markers_upper = getMarkers_upperExtremity_noPelvis2() - augmenterModelType_all = [augmenterModelType_lower, augmenterModelType_upper] - feature_markers_all = [feature_markers_lower, feature_markers_upper] - response_markers_all = [response_markers_lower, response_markers_upper] - logging.info(f'Using Stanford {augmenterModelName} {augmenter_model} augmenter model. Feet are {"" if feet_on_floor else "not "}vertically offset to be at floor level.\n') + + # Build augmenter list based on configuration + augmenterModelType_all = [] + feature_markers_all = [] + response_markers_all = [] + + if augment_lower_limb: + augmenterModelType_all.append(augmenterModelType_lower) + feature_markers_all.append(feature_markers_lower) + response_markers_all.append(response_markers_lower) + + if augment_upper_limb: + augmenterModelType_all.append(augmenterModelType_upper) + feature_markers_all.append(feature_markers_upper) + response_markers_all.append(response_markers_upper) + + # Log which augmenters are being used + augmentation_parts = [] + if augment_lower_limb: + augmentation_parts.append('lower limb') + if augment_upper_limb: + augmentation_parts.append('upper limb') + + if len(augmentation_parts) == 0: + logging.warning(f'Both augment_upper_limb and augment_lower_limb are set to false. Skipping marker augmentation for {os.path.basename(trc_file)}.') + continue + + # Warn if partial augmentation when use_augmentation is true in kinematics + if len(augmentation_parts) < 2: + logging.warning(f'\nPartial marker augmentation detected (only {" and ".join(augmentation_parts)}).') + + logging.info(f'Using Stanford {augmenterModelName} {augmenter_model} augmenter model for {" and ".join(augmentation_parts)}. Feet are {"" if feet_on_floor else "not "}vertically offset to be at floor level.\n') # %% Process data. # Import TRC file @@ -230,7 +263,9 @@ def augment_markers_all(config_dict): header[2] = '\t'.join(part if i != 7 else str(frame_nb)+'\n' for i, part in enumerate(header[2].split('\t'))) # Verify that all feature markers are present in the TRC file. - feature_markers_joined = set(feature_markers_all[0]+feature_markers_all[1]) + feature_markers_joined = set() + for fm in feature_markers_all: + feature_markers_joined.update(fm) missing_markers = list(feature_markers_joined - set(markers)) if len(missing_markers) > 0: raise ValueError(f'Marker augmentation requires {missing_markers} markers and they are not present in the TRC file.') @@ -293,24 +328,100 @@ def augment_markers_all(config_dict): # %% Add response markers to trc_data and update markers and header. 
- trc_data_response = pd.DataFrame(unnorm2_outputs, columns=[m for m in response_markers for _ in range(3)]) + if shoulder_constraint_weight < 1.0: + # Apply constraints to shoulder markers (weighted blending) + shoulder_markers = ['r_shoulder_study', 'L_shoulder_study', 'C7_study', + 'r_sh1_study', 'r_sh2_study', 'r_sh3_study', + 'L_sh1_study', 'L_sh2_study', 'L_sh3_study'] + + # Create a mapping from shoulder virtual markers to original pose markers + shoulder_mapping = { + 'r_shoulder_study': 'RShoulder', + 'L_shoulder_study': 'LShoulder', + 'C7_study': None, # Will use midpoint of shoulders + 'r_sh1_study': 'RShoulder', + 'r_sh2_study': 'RShoulder', + 'r_sh3_study': 'RShoulder', + 'L_sh1_study': 'LShoulder', + 'L_sh2_study': 'LShoulder', + 'L_sh3_study': 'LShoulder' + } + + # Apply constraints to each shoulder marker + constrained_outputs = copy.deepcopy(unnorm2_outputs) + for i, marker_name in enumerate(response_markers): + if marker_name in shoulder_markers: + original_marker = shoulder_mapping[marker_name] + + if marker_name == 'C7_study': + # C7: use midpoint of two shoulders as reference + if 'RShoulder' in markers and 'LShoulder' in markers: + r_idx = markers.index('RShoulder') + l_idx = markers.index('LShoulder') + r_shoulder_pos = trc_data.iloc[:, r_idx*3:(r_idx+1)*3].values + l_shoulder_pos = trc_data.iloc[:, l_idx*3:(l_idx+1)*3].values + original_pos = (r_shoulder_pos + l_shoulder_pos) / 2 + else: + original_pos = unnorm2_outputs[:, 3*i:3*(i+1)] # fallback to LSTM + elif original_marker and original_marker in markers: + # Other shoulder markers: use corresponding original marker + orig_idx = markers.index(original_marker) + original_pos = trc_data.iloc[:, orig_idx*3:(orig_idx+1)*3].values + else: + # If original marker not found, use LSTM output + original_pos = unnorm2_outputs[:, 3*i:3*(i+1)] + + # Weighted blending: constrained = α × LSTM + (1-α) × original + lstm_pos = unnorm2_outputs[:, 3*i:3*(i+1)] + constrained_outputs[:, 3*i:3*(i+1)] = ( + shoulder_constraint_weight * lstm_pos + + (1 - shoulder_constraint_weight) * original_pos + ) + + trc_data_response = pd.DataFrame(constrained_outputs, columns=[m for m in response_markers for _ in range(3)]) + response_markers_to_add = response_markers + logging.info(f"Applying shoulder constraints with weight={shoulder_constraint_weight:.2f}") + else: + # No constraints, use LSTM output directly + trc_data_response = pd.DataFrame(unnorm2_outputs, columns=[m for m in response_markers for _ in range(3)]) + response_markers_to_add = response_markers + trc_data = pd.concat([trc_data, trc_data_response], axis=1) - - markers += response_markers + markers += response_markers_to_add header[2] = '\t'.join(part if i != 3 else str(len(markers)) for i, part in enumerate(header[2].split('\t'))) - response_markers_str = '\t\t\t'.join(response_markers) + response_markers_str = '\t\t\t'.join(response_markers_to_add) header[3] = header[3].replace('\t\t\t\n', f'\t\t\t{response_markers_str}\t\t\t\n') header[4] = ['\t\t'+'\t'.join([f'X{i+1}\tY{i+1}\tZ{i+1}' for i in range(len(markers))]) + '\t\n'][0] # %% Extract minimum y-position across response markers. This is used # to align feet and floor when visualizing. 
response_markers_conc = [m for resp in response_markers_all for m in resp] - min_y_pos = trc_data[response_markers_conc].iloc[:,1::3].min().min() + + # Only apply feet_on_floor if we have lower limb augmentation + # Otherwise, min_y_pos from upper limb markers (elbows, wrists) would be incorrect + has_foot_markers = any('ankle' in m.lower() or 'toe' in m.lower() or 'heel' in m.lower() or 'calc' in m.lower() + for m in response_markers_conc) + + if has_foot_markers: + min_y_pos = trc_data[response_markers_conc].iloc[:,1::3].min().min() + else: + # For partial augmentation without foot markers, use original marker foot positions + foot_markers_original = [m for m in markers if 'Ankle' in m or 'Toe' in m or 'Heel' in m] + if foot_markers_original: + min_y_pos = trc_data[foot_markers_original].iloc[:,1::3].min().min() + logging.info(f"Using original foot markers for ground reference (partial augmentation without lower limb)") + else: + min_y_pos = 0 + logging.warning(f"No foot markers found, skipping feet_on_floor adjustment") # %% If offset if feet_on_floor: - trc_data.iloc[:,1::3] = trc_data.iloc[:,1::3] - (min_y_pos-0.01) + if has_foot_markers or foot_markers_original: + trc_data.iloc[:,1::3] = trc_data.iloc[:,1::3] - (min_y_pos-0.01) + logging.info(f"Applied feet_on_floor adjustment: offset = {min_y_pos-0.01:.4f} m") + else: + logging.warning(f"Skipping feet_on_floor adjustment: no foot markers available") # %% Return augmented .trc file with open(trc_path_out, 'w') as trc_o: From 484375eb0b80e18520ae58569275fca2d3d03397 Mon Sep 17 00:00:00 2001 From: Cheng Penghui Date: Fri, 17 Oct 2025 16:43:17 +0800 Subject: [PATCH 2/2] default configure --- Pose2Sim/markerAugmentation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Pose2Sim/markerAugmentation.py b/Pose2Sim/markerAugmentation.py index 30917bd8..b97f5d7c 100644 --- a/Pose2Sim/markerAugmentation.py +++ b/Pose2Sim/markerAugmentation.py @@ -114,7 +114,7 @@ def augment_markers_all(config_dict): pose_3d_dir = os.path.realpath(os.path.join(project_dir, 'pose-3d')) feet_on_floor = config_dict.get('markerAugmentation').get('feet_on_floor') make_c3d = config_dict.get('markerAugmentation').get('make_c3d') - shoulder_constraint_weight = config_dict.get('markerAugmentation').get('shoulder_constraint_weight', 0.3) + shoulder_constraint_weight = config_dict.get('markerAugmentation').get('shoulder_constraint_weight', 1.0) frame_range = config_dict.get('project').get('frame_range') subject_height = config_dict.get('project').get('participant_height') subject_mass = config_dict.get('project').get('participant_mass')
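For reference, the two patches expose three markerAugmentation settings: augment_lower_limb and augment_upper_limb (both defaulting to True) select which Stanford augmenter halves run, and shoulder_constraint_weight (0.3 in PATCH 1/2, raised to 1.0 in PATCH 2/2) controls how strongly the LSTM shoulder virtual markers are blended toward the original pose markers. Below is a minimal sketch of that blending step, assuming (n_frames, 3) numpy arrays for a single marker; the helper name blend_shoulder_marker is illustrative only and does not exist in Pose2Sim.

import numpy as np

def blend_shoulder_marker(lstm_pos, original_pos, shoulder_constraint_weight=1.0):
    # Weighted blend applied to shoulder virtual markers when the weight is below 1.0:
    #   constrained = w * LSTM + (1 - w) * original
    # lstm_pos and original_pos are (n_frames, 3) arrays for one marker.
    w = shoulder_constraint_weight
    return w * lstm_pos + (1.0 - w) * original_pos

# With the PATCH 2/2 default (w = 1.0) the LSTM output is kept unchanged;
# w = 0.3 (the PATCH 1/2 value) pulls the shoulder virtual markers most of the
# way toward the corresponding original markers (RShoulder/LShoulder), and
# C7_study is blended toward the midpoint of the two original shoulder markers.

Partial augmentation is controlled independently of this weight: disabling augment_upper_limb or augment_lower_limb in the markerAugmentation configuration skips the corresponding augmenter, and kinematics.py then detects the incomplete set of *_study markers and builds the hybrid marker set, scaling setup, and IK setup introduced in PATCH 1/2.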