From 612f157265dae3523dc2280138e243ef8197b16d Mon Sep 17 00:00:00 2001 From: "Jason K. Moore" Date: Wed, 25 Mar 2015 23:33:18 -0700 Subject: [PATCH 1/9] Basic inverse dynamics function working in Python. --- gaitanalysis/gait.py | 277 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 277 insertions(+) diff --git a/gaitanalysis/gait.py b/gaitanalysis/gait.py index ba33101..47ccf22 100644 --- a/gaitanalysis/gait.py +++ b/gaitanalysis/gait.py @@ -3,6 +3,8 @@ # standard library import os +from collections import namedtuple +import warnings # external libraries import numpy as np @@ -1036,3 +1038,278 @@ def plot_gait_cycles(gait_cycles, *col_names, **kwargs): ax.set_xlabel('Percent of Gait Cycle [%]') return axes + + +def lower_extremity_2d_inverse_dynamics(time, + marker_positions, + marker_velocities, + marker_accelerations, + force_plate_values, + g=9.81): + """Returns the 2D inverse dynamics of a single lower limb. + + Parameters + ========== + time: array_like, shape(N,) + Time stamps for the marker and force plate data in seconds. + marker_positions: array_like, shape(N, 12) + The X and Y coordinates of the six markers in meters given as + alternating columns: [X0, Y0, X1, Y1, ..., X5, Y5]. + marker_velocities: array_like, shape(N, 12) + The rate of change of the X and Y coordinates of the six markers in + meters per second. + marker_accelerations: array_like, shape(N, 12) + The rate of change of the X and Y velocities of the six markers in + meters per second per second. + force_plate_values: array_like, shape(N, 3) + Normalized loads applied to foot [Fx, Fy, Mz] in N/kg (normalized to + body mass). + g : float + Acceleration due to gravity in meters per second per second. + + Returns + ======= + joint_angles: ndarray, shape(N, 3) + Joint angles in three joints ankle, knee, hip in radians. + joint_angular_rates: ndarray, shape(N, 3) + Angular velocities in three joints (rad/s) + joint_moments: ndarray, shape(N, 3) + Moments in three joints, (Nm per kg body mass) + joint_forces: (Nsamples x 6) + Forces (Fx,Fy) in three joints, (N per kg body mass) + + Notes + ===== + + Coordinate system: + X is forward (direction of walking), Y is up + + Markers: + 0: Shoulder + 1: Greater trochanter + 2: Lateral epicondyle of knee + 3: Lateral malleolus + 4: Heel (placed at same height as marker 6) + 5: Head of 5th metatarsal + + Joints: + hip, knee, ankle + sign convention for angles and moments: hip flexion, knee flexion, + ankle plantarflexion are positive + + References + ========== + + Method: + Winter, DA (2005) Biomechanics of Human Movement. + + + """ + + num_markers = 6 + num_coordinates = 2 * num_markers + num_force_plate_channels = 3 + num_segments = 4 + num_samples = time.shape[0] + + # define the body segments + # column 1: proximal marker + # column 2: distal marker + # column 3: joint marker (at proximal end) + # column 4: mass as fraction of body mass (from Winter book) + # column 5: center of mass as fraction of length (from Winter book) + # column 6: radius of gyration as fraction of length (from Winter book) + # column 7: +1(-1) if positive angle/moment in prox, joint corresponds + # to counterclockwise(clockwise) rotation of segment + + Segment = namedtuple('Segment', ['name', + 'num', + 'prox_marker_idx', + 'dist_marker_idx', + 'prox_joint_marker_idx', + 'normalized_mass', + 'mass_center_fraction', + 'radius_of_gyration_fraction', + 'sign']) + + segments = [Segment('torso', 0, 0, 1, np.nan, np.nan, np.nan, np.nan, np.nan), + Segment('thigh', 1, 1, 2, 1, 0.1000, 0.433, 0.323, 1), + Segment('shank', 2, 2, 3, 2, 0.0465, 0.433, 0.302, -1), + Segment('foot', 3, 4, 5, 3, 0.0145, 0.500, 0.475, -1)] + + if time.shape[0] != marker_positions.shape[0]: + msg = ('The number of samples in marker data is not the same as ' + 'number of time stamps.') + raise ValueError(msg) + + if time.shape[0] != marker_velocities.shape[0]: + msg = ('The number of samples in marker data is not the same as ' + 'number of time stamps.') + raise ValueError(msg) + + if time.shape[0] != marker_accelerations.shape[0]: + msg = ('The number of samples in marker data is not the same as ' + 'number of time stamps.') + raise ValueError(msg) + + if time.shape[0] != force_plate_values.shape[0]: + msg = ('The number of samples in force plate data is not the same as ' + 'number of time stamps.') + raise ValueError(msg) + + if marker_positions.shape[1] != num_coordinates: + msg = 'The number of columns in mocap data is not correct.' + raise ValueError(msg) + + if force_plate_values.shape[1] != num_force_plate_channels: + msg = 'The number of columns in force plate data is not correct.' + raise ValueError(msg) + + seg_lengths = np.zeros(num_segments) + + seg_com_x_pos = np.zeros((num_samples, num_segments)) + seg_com_y_pos = np.zeros((num_samples, num_segments)) + seg_com_x_acc = np.zeros((num_samples, num_segments)) + seg_com_y_acc = np.zeros((num_samples, num_segments)) + + seg_theta = np.zeros((num_samples, num_segments)) + seg_omega = np.zeros((num_samples, num_segments)) + seg_alpha = np.zeros((num_samples, num_segments)) + + for i, segment in enumerate(segments): + + prox_x_idx = 2 * segment.prox_marker_idx + prox_y_idx = prox_x_idx + 2 + + prox_pos = marker_positions[:, prox_x_idx:prox_y_idx] + prox_vel = marker_velocities[:, prox_x_idx:prox_y_idx] + prox_acc = marker_accelerations[:, prox_x_idx:prox_y_idx] + + dist_x_idx = 2 * segment.dist_marker_idx + dist_y_idx = dist_x_idx + 2 + + dist_pos = marker_positions[:, dist_x_idx:dist_y_idx] + dist_vel = marker_velocities[:, dist_x_idx:dist_y_idx] + dist_acc = marker_accelerations[:, dist_x_idx:dist_y_idx] + + # vector R points from proximal to distal marker + R_pos = dist_pos - prox_pos + R_vel = dist_vel - prox_vel + R_acc = dist_acc - prox_acc + + # calculate segment center of mass position and segment orientation + # angle, and 1st and 2nd derivatives + + seg_com_pos = prox_pos + segment.mass_center_fraction * R_pos + + seg_com_x_pos[:, i] = seg_com_pos[:, 0] + seg_com_y_pos[:, i] = seg_com_pos[:, 1] + + seg_com_acc = prox_acc + segment.mass_center_fraction * R_acc + + seg_com_x_acc[:, i] = seg_com_acc[:, 0] + seg_com_y_acc[:, i] = seg_com_acc[:, 1] + + # orientation of the vector R, unwrap removes -pi to pi discontinuities + seg_theta[:, i] = np.unwrap(np.arctan2(R_pos[:, 1], R_pos[:, 0])) + + # analytical time derivative of segment angle + seg_omega[:, i] = ((R_pos[:, 0] * R_vel[:, 1] - + R_pos[:, 1] * R_vel[:, 0]) / + (R_pos[:, 1]**2 + R_pos[:, 0]**2)) + + # analytical time derivative of segment angular velocity + seg_alpha[:, i] = ((R_pos[:, 0] * R_acc[:, 1] - R_pos[:, 1] * + R_acc[:, 0]) / (R_pos[:, 1]**2 + R_pos[:, 0]**2) + - 2.0 * (R_pos[:, 0] * R_vel[:, 1] - R_pos[:, 1] + * R_vel[:, 0]) * + (R_pos[:, 1] * R_vel[:, 1] + R_pos[:, 0] * + R_vel[:, 0]) / (R_pos[:, 1]**2 + R_pos[:, 0]**2)**2) + + seg_length = np.sqrt(R_pos[:, 0]**2 + R_pos[:, 1]**2) + seg_lengths[i] = seg_length.mean() + + if np.max(np.abs(seg_length - seg_lengths[i])) > 0.1: + msg = ('Error detected while processing segment {}\n' + 'Segment length changed by more than 0.1 meters') + with warnings.catch_warnings(): + warnings.simplefilter('always') + warnings.warn(msg.format(i), Warning) + + joint_angles = np.zeros((num_samples, num_segments - 1)) + joint_angular_rates = np.zeros((num_samples, num_segments - 1)) + joint_moments = np.zeros((num_samples, num_segments - 1)) + joint_forces = np.zeros((num_samples, 2 * (num_segments - 1))) + + # do the inverse dynamics, starting at the foot, but not for the torso + # segment + for segment in reversed(segments[1:]): + + i = segment.num + + # compute vectors P and D from center of mass to distal and proximal + # joint + + prox_joint_x_idx = 2 * segment.prox_joint_marker_idx + prox_joint_y_idx = prox_joint_x_idx - 1 + + Px = marker_positions[:, prox_joint_x_idx] - seg_com_x_pos[:, i] + Py = marker_positions[:, prox_joint_y_idx] - seg_com_y_pos[:, i] + + if segment.name == 'foot': + # for the last segment, distal joint is the force plate data, + # applied to foot at global origin + + Dx = -seg_com_x_pos[:, i] + Dy = -seg_com_y_pos[:, i] + + dist_force_x = force_plate_values[:, 0] + dist_force_y = force_plate_values[:, 1] + dist_moment = force_plate_values[:, 2] + else: + # marker for distal joint of this segment (=proximal joint of + # next segment) + + dist_joint_x_idx = 2 * segments[i - 1].prox_joint_marker_idx + dist_joint_y_idx = dist_joint_x_idx - 1 + + Dx = marker_positions[:, dist_joint_x_idx] - seg_com_x_pos[:, i] + Dy = marker_positions[:, dist_joint_y_idx] - seg_com_y_pos[:, i] + + # loads at the distal joint are the opposite of the proximal + # loads in the previous segment + dist_force_x = -prox_force_x + dist_force_y = -prox_force_y + dist_moment = -prox_moment + + mass = segment.normalized_mass + radius_of_gyration = segment.radius_of_gyration_fraction * seg_lengths[i] + inertia = mass * radius_of_gyration**2 + + # solve force and moment at proximal joint from the Newton-Euler + # equations + prox_force_x = mass * seg_com_x_acc[:, i] - dist_force_x + prox_force_y = (mass * seg_com_y_acc[:, i] - + dist_force_y + mass * g) + prox_moment = (inertia * seg_alpha[:, i] - dist_moment + - (Dx * dist_force_y - Dy * dist_force_x) - + (Px * prox_force_y - Py * prox_force_x)) + + # and store proximal joint motions and loads in the output variables + + # joint index (1, 2, or 3) for the proximal joint of segment i + j = i - 1 + + joint_angles[:, j] = segment.sign * (seg_theta[:, i] - + seg_theta[:, i - 1]) + + joint_angular_rates[:, j] = segment.sign * (seg_omega[:, i] - + seg_omega[:, i - 1]) + + joint_moments[:, j] = segment.sign * prox_moment + + joint_forces[:, 2 * j - 1] = prox_force_x + + joint_forces[:, 2 * j] = prox_force_y + + return joint_angles, joint_angular_rates, joint_moments, joint_forces From cf0437ce6f3d8974bec98c49f403c117dafb580f Mon Sep 17 00:00:00 2001 From: "Jason K. Moore" Date: Fri, 27 Mar 2015 01:01:01 -0700 Subject: [PATCH 2/9] The Python inverse dynamics algorithm now gives the same results as the Octave one. --- gaitanalysis/gait.py | 124 ++++++++++++++++---------------- gaitanalysis/tests/test_gait.py | 99 ++++++++++++++++++++----- 2 files changed, 145 insertions(+), 78 deletions(-) diff --git a/gaitanalysis/gait.py b/gaitanalysis/gait.py index 47ccf22..f7c750b 100644 --- a/gaitanalysis/gait.py +++ b/gaitanalysis/gait.py @@ -1040,11 +1040,8 @@ def plot_gait_cycles(gait_cycles, *col_names, **kwargs): return axes -def lower_extremity_2d_inverse_dynamics(time, - marker_positions, - marker_velocities, - marker_accelerations, - force_plate_values, +def lower_extremity_2d_inverse_dynamics(time, marker_pos, marker_vel, + marker_acc, force_plate_values, g=9.81): """Returns the 2D inverse dynamics of a single lower limb. @@ -1052,13 +1049,13 @@ def lower_extremity_2d_inverse_dynamics(time, ========== time: array_like, shape(N,) Time stamps for the marker and force plate data in seconds. - marker_positions: array_like, shape(N, 12) + marker_pos: array_like, shape(N, 12) The X and Y coordinates of the six markers in meters given as alternating columns: [X0, Y0, X1, Y1, ..., X5, Y5]. - marker_velocities: array_like, shape(N, 12) + marker_vel: array_like, shape(N, 12) The rate of change of the X and Y coordinates of the six markers in meters per second. - marker_accelerations: array_like, shape(N, 12) + marker_acc: array_like, shape(N, 12) The rate of change of the X and Y velocities of the six markers in meters per second per second. force_plate_values: array_like, shape(N, 3) @@ -1070,13 +1067,14 @@ def lower_extremity_2d_inverse_dynamics(time, Returns ======= joint_angles: ndarray, shape(N, 3) - Joint angles in three joints ankle, knee, hip in radians. + Joint angles in three joints: hip, knee, ankle in radians. joint_angular_rates: ndarray, shape(N, 3) - Angular velocities in three joints (rad/s) - joint_moments: ndarray, shape(N, 3) - Moments in three joints, (Nm per kg body mass) + Angular velocities in three joints: hip, knee, ankle in radians per + second. + joint_torques: ndarray, shape(N, 3) + Torques in three joints: hip, knee, ankle in Nm per kg body mass. joint_forces: (Nsamples x 6) - Forces (Fx,Fy) in three joints, (N per kg body mass) + Forces (Fx, Fy) in three joints, (N per kg body mass) Notes ===== @@ -1106,21 +1104,23 @@ def lower_extremity_2d_inverse_dynamics(time, """ + # TODO : Remove the time variable, it is not needed. + num_markers = 6 num_coordinates = 2 * num_markers num_force_plate_channels = 3 num_segments = 4 num_samples = time.shape[0] - # define the body segments - # column 1: proximal marker - # column 2: distal marker - # column 3: joint marker (at proximal end) - # column 4: mass as fraction of body mass (from Winter book) - # column 5: center of mass as fraction of length (from Winter book) - # column 6: radius of gyration as fraction of length (from Winter book) - # column 7: +1(-1) if positive angle/moment in prox, joint corresponds - # to counterclockwise(clockwise) rotation of segment + # define the body segments with these properties: + # - proximal marker + # - distal marker + # - joint marker (at proximal end) + # - mass as fraction of body mass (from Winter book) + # - center of mass as fraction of length (from Winter book) + # - radius of gyration as fraction of length (from Winter book) + # - +1(-1) if positive angle/moment in prox, joint corresponds to + # counterclockwise(clockwise) rotation of segment Segment = namedtuple('Segment', ['name', 'num', @@ -1137,17 +1137,17 @@ def lower_extremity_2d_inverse_dynamics(time, Segment('shank', 2, 2, 3, 2, 0.0465, 0.433, 0.302, -1), Segment('foot', 3, 4, 5, 3, 0.0145, 0.500, 0.475, -1)] - if time.shape[0] != marker_positions.shape[0]: + if time.shape[0] != marker_pos.shape[0]: msg = ('The number of samples in marker data is not the same as ' 'number of time stamps.') raise ValueError(msg) - if time.shape[0] != marker_velocities.shape[0]: + if time.shape[0] != marker_vel.shape[0]: msg = ('The number of samples in marker data is not the same as ' 'number of time stamps.') raise ValueError(msg) - if time.shape[0] != marker_accelerations.shape[0]: + if time.shape[0] != marker_acc.shape[0]: msg = ('The number of samples in marker data is not the same as ' 'number of time stamps.') raise ValueError(msg) @@ -1157,7 +1157,7 @@ def lower_extremity_2d_inverse_dynamics(time, 'number of time stamps.') raise ValueError(msg) - if marker_positions.shape[1] != num_coordinates: + if marker_pos.shape[1] != num_coordinates: msg = 'The number of columns in mocap data is not correct.' raise ValueError(msg) @@ -1179,18 +1179,18 @@ def lower_extremity_2d_inverse_dynamics(time, for i, segment in enumerate(segments): prox_x_idx = 2 * segment.prox_marker_idx - prox_y_idx = prox_x_idx + 2 + prox_y_idx = prox_x_idx + 1 - prox_pos = marker_positions[:, prox_x_idx:prox_y_idx] - prox_vel = marker_velocities[:, prox_x_idx:prox_y_idx] - prox_acc = marker_accelerations[:, prox_x_idx:prox_y_idx] + prox_pos = marker_pos[:, prox_x_idx:prox_y_idx + 1] + prox_vel = marker_vel[:, prox_x_idx:prox_y_idx + 1] + prox_acc = marker_acc[:, prox_x_idx:prox_y_idx + 1] dist_x_idx = 2 * segment.dist_marker_idx - dist_y_idx = dist_x_idx + 2 + dist_y_idx = dist_x_idx + 1 - dist_pos = marker_positions[:, dist_x_idx:dist_y_idx] - dist_vel = marker_velocities[:, dist_x_idx:dist_y_idx] - dist_acc = marker_accelerations[:, dist_x_idx:dist_y_idx] + dist_pos = marker_pos[:, dist_x_idx:dist_y_idx + 1] + dist_vel = marker_vel[:, dist_x_idx:dist_y_idx + 1] + dist_acc = marker_acc[:, dist_x_idx:dist_y_idx + 1] # vector R points from proximal to distal marker R_pos = dist_pos - prox_pos @@ -1219,12 +1219,12 @@ def lower_extremity_2d_inverse_dynamics(time, (R_pos[:, 1]**2 + R_pos[:, 0]**2)) # analytical time derivative of segment angular velocity - seg_alpha[:, i] = ((R_pos[:, 0] * R_acc[:, 1] - R_pos[:, 1] * - R_acc[:, 0]) / (R_pos[:, 1]**2 + R_pos[:, 0]**2) - - 2.0 * (R_pos[:, 0] * R_vel[:, 1] - R_pos[:, 1] - * R_vel[:, 0]) * - (R_pos[:, 1] * R_vel[:, 1] + R_pos[:, 0] * - R_vel[:, 0]) / (R_pos[:, 1]**2 + R_pos[:, 0]**2)**2) + a_0 = R_pos[:, 0] * R_acc[:, 1] - R_pos[:, 1] * R_acc[:, 0] + a_1 = R_pos[:, 0] * R_vel[:, 1] - R_pos[:, 1] * R_vel[:, 0] + a_2 = R_pos[:, 1] * R_vel[:, 1] + R_pos[:, 0] * R_vel[:, 0] + a_3 = R_pos[:, 1]**2 + R_pos[:, 0]**2 + + seg_alpha[:, i] = a_0 / a_3 - 2.0 * a_1 * a_2 / a_3**2 seg_length = np.sqrt(R_pos[:, 0]**2 + R_pos[:, 1]**2) seg_lengths[i] = seg_length.mean() @@ -1251,10 +1251,10 @@ def lower_extremity_2d_inverse_dynamics(time, # joint prox_joint_x_idx = 2 * segment.prox_joint_marker_idx - prox_joint_y_idx = prox_joint_x_idx - 1 + prox_joint_y_idx = prox_joint_x_idx + 1 - Px = marker_positions[:, prox_joint_x_idx] - seg_com_x_pos[:, i] - Py = marker_positions[:, prox_joint_y_idx] - seg_com_y_pos[:, i] + Px = marker_pos[:, prox_joint_x_idx] - seg_com_x_pos[:, i] + Py = marker_pos[:, prox_joint_y_idx] - seg_com_y_pos[:, i] if segment.name == 'foot': # for the last segment, distal joint is the force plate data, @@ -1267,14 +1267,13 @@ def lower_extremity_2d_inverse_dynamics(time, dist_force_y = force_plate_values[:, 1] dist_moment = force_plate_values[:, 2] else: - # marker for distal joint of this segment (=proximal joint of - # next segment) - - dist_joint_x_idx = 2 * segments[i - 1].prox_joint_marker_idx - dist_joint_y_idx = dist_joint_x_idx - 1 + # The marker at the distal joint of this segment is the same as + # the marker at the proximal joint of next the segment. + dist_joint_x_idx = 2 * segments[i + 1].prox_joint_marker_idx + dist_joint_y_idx = dist_joint_x_idx + 1 - Dx = marker_positions[:, dist_joint_x_idx] - seg_com_x_pos[:, i] - Dy = marker_positions[:, dist_joint_y_idx] - seg_com_y_pos[:, i] + Dx = marker_pos[:, dist_joint_x_idx] - seg_com_x_pos[:, i] + Dy = marker_pos[:, dist_joint_y_idx] - seg_com_y_pos[:, i] # loads at the distal joint are the opposite of the proximal # loads in the previous segment @@ -1282,22 +1281,23 @@ def lower_extremity_2d_inverse_dynamics(time, dist_force_y = -prox_force_y dist_moment = -prox_moment + # solve force and moment at proximal joint from the Newton-Euler + # equations mass = segment.normalized_mass + + prox_force_x = mass * seg_com_x_acc[:, i] - dist_force_x + prox_force_y = mass * seg_com_y_acc[:, i] - dist_force_y + mass * g + radius_of_gyration = segment.radius_of_gyration_fraction * seg_lengths[i] inertia = mass * radius_of_gyration**2 - # solve force and moment at proximal joint from the Newton-Euler - # equations - prox_force_x = mass * seg_com_x_acc[:, i] - dist_force_x - prox_force_y = (mass * seg_com_y_acc[:, i] - - dist_force_y + mass * g) - prox_moment = (inertia * seg_alpha[:, i] - dist_moment - - (Dx * dist_force_y - Dy * dist_force_x) - - (Px * prox_force_y - Py * prox_force_x)) + prox_moment = (inertia * seg_alpha[:, i] - dist_moment + - (Dx * dist_force_y - Dy * dist_force_x) + - (Px * prox_force_y - Py * prox_force_x)) # and store proximal joint motions and loads in the output variables - # joint index (1, 2, or 3) for the proximal joint of segment i + # joint index (hip, knee, ankle) for the proximal joint of segment i j = i - 1 joint_angles[:, j] = segment.sign * (seg_theta[:, i] - @@ -1308,8 +1308,8 @@ def lower_extremity_2d_inverse_dynamics(time, joint_moments[:, j] = segment.sign * prox_moment - joint_forces[:, 2 * j - 1] = prox_force_x + joint_forces[:, 2 * j] = prox_force_x - joint_forces[:, 2 * j] = prox_force_y + joint_forces[:, 2 * j - 1] = prox_force_y - return joint_angles, joint_angular_rates, joint_moments, joint_forces + return joint_angles, joint_angular_rates, joint_moments, joint_forces diff --git a/gaitanalysis/tests/test_gait.py b/gaitanalysis/tests/test_gait.py index 648cd33..8880087 100644 --- a/gaitanalysis/tests/test_gait.py +++ b/gaitanalysis/tests/test_gait.py @@ -10,18 +10,14 @@ import pandas from pandas.util.testing import assert_frame_equal from nose.tools import assert_raises +from oct2py import octave +from dtk.process import (time_vector, butterworth, derivative, + coefficient_of_determination) # local -from ..gait import find_constant_speed, interpolate, GaitData -from dtk.process import time_vector - -# debugging -try: - from IPython.core.debugger import Tracer -except ImportError: - pass -else: - set_trace = Tracer() +from ..gait import (find_constant_speed, interpolate, GaitData, + lower_extremity_2d_inverse_dynamics) +from ..motek import spline_interpolate_over_missing def test_find_constant_speed(): @@ -218,8 +214,8 @@ def test_grf_landmarks(self, plot=False): def test_plot_landmarks(self): gait_data = GaitData(self.data_frame) gait_data.grf_landmarks('Right Vertical GRF', - 'Left Vertical GRF', - threshold=self.threshold) + 'Left Vertical GRF', + threshold=self.threshold) side = 'right' col_names = ['Right Vertical GRF','Right Knee Angle','Right Knee Moment'] time = gait_data.data.index.values.astype(float) @@ -297,8 +293,8 @@ def test_plot_gait_cycles(self): gait_data = GaitData(self.data_frame) gait_data.grf_landmarks('Right Vertical GRF', - 'Left Vertical GRF', - threshold=self.threshold) + 'Left Vertical GRF', + threshold=self.threshold) gait_data.split_at('right') assert_raises(ValueError, gait_data.plot_gait_cycles) @@ -307,8 +303,8 @@ def test_save_load(self): gait_data = GaitData(self.data_frame) gait_data.grf_landmarks('Right Vertical GRF', - 'Left Vertical GRF', - threshold=self.threshold) + 'Left Vertical GRF', + threshold=self.threshold) gait_data.split_at('right') gait_data.save('some_data.h5') @@ -336,3 +332,74 @@ def teardown(self): pass else: os.remove('some_data.h5') + + +def test_lower_extremity_2d_inverse_dynamics(): + + # This test ensures that the new fast Python version of the inverse + # dynamics algorithm gives the same results as the Octave version. + + this_files_dir = os.path.split(__file__)[0] + rel_path_to_m_dir = os.path.join(this_files_dir, '..', 'octave', + '2d_inverse_dynamics') + m_file_directory = os.path.abspath(rel_path_to_m_dir) + m_test_directory = os.path.join(m_file_directory, 'test') + data_file_path = os.path.join(m_test_directory, 'rawdata.txt') + + data = pandas.read_csv(data_file_path, sep="\t", na_values="NaN ", + dtype=float) + + time = data['% TimeStamp'].values + sample_rate = 1.0 / np.mean(np.diff(time)) + + filter_freq = 6.0 + + # interpolate + data = spline_interpolate_over_missing(data, '% TimeStamp') + + # filter data (note this filters the TimeStamp too) + data[data.columns] = butterworth(data.values, filter_freq, sample_rate, + axis=0) + + # differentiate marker data twice + marker_labels = ['RSHO.PosX', + 'RSHO.PosY', + 'RGTRO.PosX', + 'RGTRO.PosY', + 'RLEK.PosX', + 'RLEK.PosY', + 'RLM.PosX', + 'RLM.PosY', + 'RHEE.PosX', + 'RHEE.PosY', + 'RMT5.PosX', + 'RMT5.PosY'] + + marker_pos = data[marker_labels].values + marker_vel = derivative(time, marker_pos, method='combination') + marker_acc = derivative(time, marker_vel, method='combination') + + force_plate_labels = ['FP2.ForX', + 'FP2.ForY', + 'FP2.MomZ'] + + force_plate_values = data[force_plate_labels].values + + res = lower_extremity_2d_inverse_dynamics(time, marker_pos, marker_vel, + marker_acc, + force_plate_values) + + octave.addpath(m_file_directory) + + options = {'freq': filter_freq} + + angles, velocities, moments, forces = \ + octave.leg2d(time.reshape((len(time), 1)), marker_pos, + force_plate_values, options) + + expected_res = (angles, velocities, moments, forces) + + for array, expected_array in zip(res, expected_res): + # NOTE : This is the best check I was able to come up with to + # compare the time series. + assert coefficient_of_determination(expected_array, array) > 0.99 From dd7a62fe8c067675f4de4c3e881748c33a59f639 Mon Sep 17 00:00:00 2001 From: "Jason K. Moore" Date: Fri, 27 Mar 2015 10:42:59 -0700 Subject: [PATCH 3/9] Added a low_pass_filter method to GaitData. --- gaitanalysis/gait.py | 36 +++++++++++++++++++++++++++++++++ gaitanalysis/tests/test_gait.py | 20 ++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/gaitanalysis/gait.py b/gaitanalysis/gait.py index f7c750b..755875d 100644 --- a/gaitanalysis/gait.py +++ b/gaitanalysis/gait.py @@ -716,6 +716,42 @@ def time_derivative(self, col_names, new_col_names=None): self.data[col_name].values, method='combination') + def low_pass_filter(self, col_names, cutoff, new_col_names=None, + order=2): + """Low pass filters the specified columns with a Butterworth filter. + + Parameters + ========== + col_names : list of strings + The column names for the time series which should be numerically + time differentiated. + cutoff : float + The desired low pass cutoff frequency in Hertz. + new_col_names : list of strings, optional + The desired new column name(s) for the filtered series. If None, + then a default name of `Filtered ` will be + used. + order : int + The order of the Butterworth filter. + + """ + + if new_col_names is None: + new_col_names = ['Filtered {}'.format(c) for c in col_names] + + time = self.data.index.values.astype(float) + sample_rate = 1.0 / np.mean(np.diff(time)) + + filtered_data = process.butterworth(self.data[col_names].values, + cutoff, sample_rate, + order=order, axis=0) + + # TODO : Ideally these could be added to the DataFrame in one + # command. + + for i, col in enumerate(new_col_names): + self.data[col] = filtered_data[:, i] + def save(self, filename): """Saves data to disk via HDF5 (PyTables). diff --git a/gaitanalysis/tests/test_gait.py b/gaitanalysis/tests/test_gait.py index 8880087..ea67575 100644 --- a/gaitanalysis/tests/test_gait.py +++ b/gaitanalysis/tests/test_gait.py @@ -299,6 +299,26 @@ def test_plot_gait_cycles(self): assert_raises(ValueError, gait_data.plot_gait_cycles) + def test_low_pass_filter(self): + + gait_data = GaitData(self.data_frame) + + cols = ['Right Knee Angle', 'Right Knee Moment'] + + gait_data.low_pass_filter(cols, 10.0) + + fil_cols = ['Filtered {}'.format(c) for c in cols] + + for c in fil_cols: + assert c in gait_data.data.columns + + gait_data.low_pass_filter(cols, 10.0, new_col_names=['a', 'b']) + + for c in ['a', 'b']: + assert c in gait_data.data.columns + + gait_data.low_pass_filter(cols, 10.0, order=3) + def test_save_load(self): gait_data = GaitData(self.data_frame) From 587beabc3449a8f0928f01270a82a19e9408c7a6 Mon Sep 17 00:00:00 2001 From: "Jason K. Moore" Date: Fri, 27 Mar 2015 11:16:32 -0700 Subject: [PATCH 4/9] Replaced Octave inverse dyn with Python one in GaitData. --- gaitanalysis/gait.py | 43 +++++++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/gaitanalysis/gait.py b/gaitanalysis/gait.py index 755875d..cf66d20 100644 --- a/gaitanalysis/gait.py +++ b/gaitanalysis/gait.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 -*- # standard library -import os +from time import time as clock from collections import namedtuple import warnings @@ -12,7 +12,6 @@ import matplotlib.pyplot as plt import pandas from dtk import process -from oct2py import octave # local from .utils import _percent_formatter @@ -209,16 +208,9 @@ def inverse_dynamics_2d(self, left_leg_markers, right_leg_markers, the inverse dynamics. You should pass in unfiltered data. """ - this_files_dir = os.path.split(__file__)[0] - m_file_directory = os.path.abspath(os.path.join(this_files_dir, - 'octave', - '2d_inverse_dynamics')) - octave.addpath(m_file_directory) - - options = {'freq': low_pass_cutoff} time = self.data.index.values.astype(float) - time = time.reshape((len(time), 1)) # octave wants a column vector + sample_rate = 1.0 / np.mean(np.diff(time)) marker_sets = [left_leg_markers, right_leg_markers] force_sets = [left_leg_forces, right_leg_forces] @@ -232,18 +224,25 @@ def inverse_dynamics_2d(self, left_leg_markers, right_leg_markers, for side_label, markers, forces in zip(side_labels, marker_sets, force_sets): - marker_array = self.data[markers].values.copy() - normalized_force_array = \ - self.data[forces].values.copy() / body_mass - - # oct2py doesn't allow multiple outputs to be stored in a tuple - # like python, so you have to output each variable - # independently - angles, velocities, moments, forces = \ - octave.leg2d(time, marker_array, normalized_force_array, - options) - - dynamics = angles, velocities, moments, forces + t0 = clock() + marker_pos = self.data[markers].values.copy() + marker_pos = process.butterworth(marker_pos, low_pass_cutoff, + sample_rate, axis=0) + marker_vel = process.derivative(time, marker_pos, + method='combination') + marker_acc = process.derivative(time, marker_vel, + method='combination') + force_array = \ + process.butterworth(self.data[forces].values.copy(), + low_pass_cutoff, sample_rate, axis=0) + normalized_force_array = force_array / body_mass + + dynamics = lower_extremity_2d_inverse_dynamics(time, marker_pos, + marker_vel, + marker_acc, + normalized_force_array) + t1 = clock() + print(t1 - t0) fours = zip(dynamics, dynamic_labels, scale_factors) From b972b6d3dd3b2d35c22ae42b3af92e375dc2597f Mon Sep 17 00:00:00 2001 From: "Jason K. Moore" Date: Fri, 27 Mar 2015 12:14:39 -0700 Subject: [PATCH 5/9] Created a _leg2d method to mimic the Octave call. --- gaitanalysis/gait.py | 73 +++++++++++++++++++++++++------------------- 1 file changed, 41 insertions(+), 32 deletions(-) diff --git a/gaitanalysis/gait.py b/gaitanalysis/gait.py index cf66d20..2d596ab 100644 --- a/gaitanalysis/gait.py +++ b/gaitanalysis/gait.py @@ -2,13 +2,13 @@ # -*- coding: utf-8 -*- # standard library -from time import time as clock from collections import namedtuple import warnings # external libraries import numpy as np from scipy.integrate import simps +from scipy.signal import firwin, filtfilt import matplotlib.pyplot as plt import pandas from dtk import process @@ -147,6 +147,29 @@ def __init__(self, data): f.close() self.load(data) + def _leg2d(self, time, marker_values, normalized_force_plate_values, + cutoff, sample_rate): + """This method effectively does the same thing that the Octave + routine does.""" + + marker_pos = process.butterworth(marker_values, cutoff, sample_rate, + axis=0) + + marker_vel = process.derivative(time, marker_pos, + method='combination') + + marker_acc = process.derivative(time, marker_vel, + method='combination') + + force_array = process.butterworth(normalized_force_plate_values, + cutoff, sample_rate, axis=0) + + inv_dyn = lower_extremity_2d_inverse_dynamics + + dynamics = inv_dyn(time, marker_pos, marker_vel, marker_acc, + force_array) + return dynamics + def inverse_dynamics_2d(self, left_leg_markers, right_leg_markers, left_leg_forces, right_leg_forces, body_mass, low_pass_cutoff): @@ -224,25 +247,11 @@ def inverse_dynamics_2d(self, left_leg_markers, right_leg_markers, for side_label, markers, forces in zip(side_labels, marker_sets, force_sets): - t0 = clock() - marker_pos = self.data[markers].values.copy() - marker_pos = process.butterworth(marker_pos, low_pass_cutoff, - sample_rate, axis=0) - marker_vel = process.derivative(time, marker_pos, - method='combination') - marker_acc = process.derivative(time, marker_vel, - method='combination') - force_array = \ - process.butterworth(self.data[forces].values.copy(), - low_pass_cutoff, sample_rate, axis=0) - normalized_force_array = force_array / body_mass - - dynamics = lower_extremity_2d_inverse_dynamics(time, marker_pos, - marker_vel, - marker_acc, - normalized_force_array) - t1 = clock() - print(t1 - t0) + marker_vals = self.data[markers].values.copy() + force_vals = self.data[forces].values.copy() / body_mass + + dynamics = self._leg2d(time, marker_vals, force_vals, + low_pass_cutoff, sample_rate) fours = zip(dynamics, dynamic_labels, scale_factors) @@ -355,12 +364,11 @@ def nearest_index(array, val): else: raise ValueError('min_time out of range.') - if method is not 'accel' and method is not 'force': raise ValueError('{} is not a valid method'.format(method)) - func = {'force' : gait_landmarks_from_grf, - 'accel' : gait_landmarks_from_accel} + func = {'force': gait_landmarks_from_grf, + 'accel': gait_landmarks_from_accel} right_strikes, left_strikes, right_offs, left_offs = \ func[method](time[self.min_idx:self.max_idx], @@ -399,7 +407,6 @@ def nearest_index(array, val): return right_strikes, left_strikes, right_offs, left_offs - def plot_landmarks(self, col_names, side, event='both', index=0, window=None, num_cycles_to_plot=None, curve_kwargs=None, heel_kwargs=None, @@ -642,7 +649,7 @@ def split_at(self, side, section='both', num_samples=None, gait_cycle_stats = {'Number of Samples': [], 'Stride Duration': [], 'Stride Frequency': [], - } + } if belt_speed_column is not None: gait_cycle_stats['Stride Length'] = [] @@ -880,6 +887,7 @@ def death_times(ordinate): return right_foot_strikes, left_foot_strikes, right_toe_offs, left_toe_offs + def gait_landmarks_from_accel(time, right_accel, left_accel, threshold=0.33, **kwargs): """ Obtain right and left foot strikes from the time series data of accelerometers placed on the heel. @@ -913,18 +921,16 @@ def gait_landmarks_from_accel(time, right_accel, left_accel, threshold=0.33, **k # ---------------- def filter(data): - from scipy.signal import blackman, firwin, filtfilt a = np.array([1]) # 10 Hz highpass - n = 127; # filter order - Wn = 10 / (sample_rate/2) # cut-off frequency - window = blackman(n) + n = 127 # filter order + Wn = 10.0 / (sample_rate / 2) # cut-off frequency b = firwin(n, Wn, window='blackman', pass_zero=False) data = filtfilt(b, a, data) - data = abs(data) # rectify signal + data = abs(data) # rectify signal # 5 Hz lowpass Wn = 5 / (sample_rate/2) @@ -942,7 +948,7 @@ def peak_detection(x): peaks = [] for i, spike in enumerate(ddx < 0): - if spike == True: + if spike: peaks.append(i) peaks = peaks[::2] @@ -1157,6 +1163,8 @@ def lower_extremity_2d_inverse_dynamics(time, marker_pos, marker_vel, # - +1(-1) if positive angle/moment in prox, joint corresponds to # counterclockwise(clockwise) rotation of segment + # TODO : Create a way for the user to pass in the body segment values. + Segment = namedtuple('Segment', ['name', 'num', 'prox_marker_idx', @@ -1323,7 +1331,8 @@ def lower_extremity_2d_inverse_dynamics(time, marker_pos, marker_vel, prox_force_x = mass * seg_com_x_acc[:, i] - dist_force_x prox_force_y = mass * seg_com_y_acc[:, i] - dist_force_y + mass * g - radius_of_gyration = segment.radius_of_gyration_fraction * seg_lengths[i] + radius_of_gyration = (segment.radius_of_gyration_fraction * + seg_lengths[i]) inertia = mass * radius_of_gyration**2 prox_moment = (inertia * seg_alpha[:, i] - dist_moment From a95e0b6417a08427d1dfebe185f7881e44220d7b Mon Sep 17 00:00:00 2001 From: "Jason K. Moore" Date: Fri, 27 Mar 2015 14:43:22 -0700 Subject: [PATCH 6/9] Filter the marker velocities before computing accelerations. --- gaitanalysis/gait.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/gaitanalysis/gait.py b/gaitanalysis/gait.py index 2d596ab..0d1587f 100644 --- a/gaitanalysis/gait.py +++ b/gaitanalysis/gait.py @@ -158,6 +158,9 @@ def _leg2d(self, time, marker_values, normalized_force_plate_values, marker_vel = process.derivative(time, marker_pos, method='combination') + marker_vel = process.butterworth(marker_vel, cutoff, sample_rate, + axis=0) + marker_acc = process.derivative(time, marker_vel, method='combination') From 12c63d182e8f41e45a6d6be851dfa95b6fa9dc25 Mon Sep 17 00:00:00 2001 From: "Jason K. Moore" Date: Tue, 31 Mar 2015 16:11:59 -0700 Subject: [PATCH 7/9] Differentiate all data before filtering it. --- gaitanalysis/gait.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/gaitanalysis/gait.py b/gaitanalysis/gait.py index 0d1587f..3eb9b5f 100644 --- a/gaitanalysis/gait.py +++ b/gaitanalysis/gait.py @@ -147,30 +147,32 @@ def __init__(self, data): f.close() self.load(data) - def _leg2d(self, time, marker_values, normalized_force_plate_values, + def _leg2d(self, time, marker_pos, normalized_force_plate_values, cutoff, sample_rate): """This method effectively does the same thing that the Octave routine does.""" - marker_pos = process.butterworth(marker_values, cutoff, sample_rate, - axis=0) - + # differentiate to get the marker velocities and accelerations marker_vel = process.derivative(time, marker_pos, method='combination') - - marker_vel = process.butterworth(marker_vel, cutoff, sample_rate, - axis=0) - marker_acc = process.derivative(time, marker_vel, method='combination') + # filter all the input data with the same filter + marker_pos = process.butterworth(marker_pos, cutoff, sample_rate, + axis=0) + marker_vel = process.butterworth(marker_vel, cutoff, sample_rate, + axis=0) + marker_acc = process.butterworth(marker_acc, cutoff, sample_rate, + axis=0) force_array = process.butterworth(normalized_force_plate_values, cutoff, sample_rate, axis=0) + # compute the inverse dynamics inv_dyn = lower_extremity_2d_inverse_dynamics - dynamics = inv_dyn(time, marker_pos, marker_vel, marker_acc, force_array) + return dynamics def inverse_dynamics_2d(self, left_leg_markers, right_leg_markers, From f6831f504dc721e0e9d61ee74ac2ef1aad90c553 Mon Sep 17 00:00:00 2001 From: "Jason K. Moore" Date: Tue, 31 Mar 2015 16:35:55 -0700 Subject: [PATCH 8/9] Made the inverse dynamics test stricter. --- gaitanalysis/tests/test_gait.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/gaitanalysis/tests/test_gait.py b/gaitanalysis/tests/test_gait.py index ea67575..cc2a009 100644 --- a/gaitanalysis/tests/test_gait.py +++ b/gaitanalysis/tests/test_gait.py @@ -11,8 +11,7 @@ from pandas.util.testing import assert_frame_equal from nose.tools import assert_raises from oct2py import octave -from dtk.process import (time_vector, butterworth, derivative, - coefficient_of_determination) +from dtk.process import time_vector, butterworth, derivative # local from ..gait import (find_constant_speed, interpolate, GaitData, @@ -420,6 +419,4 @@ def test_lower_extremity_2d_inverse_dynamics(): expected_res = (angles, velocities, moments, forces) for array, expected_array in zip(res, expected_res): - # NOTE : This is the best check I was able to come up with to - # compare the time series. - assert coefficient_of_determination(expected_array, array) > 0.99 + testing.assert_allclose(array, expected_array) From 9e80dfdcfe0a14b44e0ebcbadb6e9e827d215c3c Mon Sep 17 00:00:00 2001 From: "Jason K. Moore" Date: Fri, 3 Apr 2015 16:22:47 -0700 Subject: [PATCH 9/9] Speeds up the data frame interpolation function. The gait/interpolate function is only used in the GaitData.split_at() method and the way I orginally coded it using Pandas paradigms was very slow. This change speeds things up by about 40X. I had to change the test because it no longer supports nans in the DataFrame. Being that it is only used in the split_at method this seemed ok. So far I've never had nans in the data frame by the time it gets to the split at method. GaitData isn't built to handle nans, they should be fixed before creating a GaitData object. --- gaitanalysis/gait.py | 26 ++++++++++---------------- gaitanalysis/tests/test_gait.py | 12 ++++++------ 2 files changed, 16 insertions(+), 22 deletions(-) diff --git a/gaitanalysis/gait.py b/gaitanalysis/gait.py index 3eb9b5f..37e16fb 100644 --- a/gaitanalysis/gait.py +++ b/gaitanalysis/gait.py @@ -8,6 +8,7 @@ # external libraries import numpy as np from scipy.integrate import simps +from scipy.interpolate import interp1d from scipy.signal import firwin, filtfilt import matplotlib.pyplot as plt import pandas @@ -80,7 +81,7 @@ def find_constant_speed(time, speed, plot=False, filter_cutoff=1.0): def interpolate(data_frame, time): - """Returns a data frame with a index based on the provided time + """Returns a new data frame with a index based on the provided time array and linear interpolation. Parameters @@ -100,21 +101,14 @@ def interpolate(data_frame, time): """ - total_index = np.sort(np.hstack((data_frame.index.values, time))) - reindexed_data_frame = data_frame.reindex(total_index) - interpolated_data_frame = \ - reindexed_data_frame.apply(pandas.Series.interpolate, - method='values').loc[time] - - # If the first or last value of a series is NA then the interpolate - # function leaves it as an NA value, so use backfill to take care of - # those. - interpolated_data_frame = \ - interpolated_data_frame.fillna(method='backfill') - # Because the time vector may have matching indices as the original - # index (i.e. always the zero indice), drop any duplicates so the len() - # stays consistent - return interpolated_data_frame.drop_duplicates() + column_names = data_frame.columns + old_time = data_frame.index.values + vals = data_frame.values + + f = interp1d(old_time, vals, axis=0) + new_vals = f(time) + + return pandas.DataFrame(new_vals, index=time, columns=column_names) class GaitData(object): diff --git a/gaitanalysis/tests/test_gait.py b/gaitanalysis/tests/test_gait.py index cc2a009..f2b9538 100644 --- a/gaitanalysis/tests/test_gait.py +++ b/gaitanalysis/tests/test_gait.py @@ -34,10 +34,10 @@ def test_find_constant_speed(): def test_interpolate(): - df = pandas.DataFrame({'a': [np.nan, 3.0, 5.0, 7.0], - 'b': [5.0, np.nan, 9.0, 11.0], + df = pandas.DataFrame({'a': [2.0, 3.0, 5.0, 7.0], + 'b': [5.0, 8.0, 9.0, 11.0], 'c': [2.0, 4.0, 6.0, 8.0], - 'd': [0.5, 1.0, 1.5, np.nan]}, + 'd': [0.5, 1.0, 1.5, 2.5]}, index=[0.0, 2.0, 4.0, 6.0]) time = [0.0, 1.0, 3.0, 5.0] @@ -47,10 +47,10 @@ def test_interpolate(): # NOTE : pandas.Series.interpolate does not extrapolate (because # np.interp doesn't. - df_expected = pandas.DataFrame({'a': [4.0, 4.0, 4.0, 6.0], - 'b': [5.0, 6.0, 8.0, 10.0], + df_expected = pandas.DataFrame({'a': [2.0, 2.5, 4.0, 6.0], + 'b': [5.0, 6.5, 8.5, 10.0], 'c': [2.0, 3.0, 5.0, 7.0], - 'd': [0.5, 0.75, 1.25, 1.5]}, + 'd': [0.5, 0.75, 1.25, 2.0]}, index=time) testing.assert_allclose(interpolated.values, df_expected.values)