diff --git a/src/fastmdanalysis/analysis/dihedrals.py b/src/fastmdanalysis/analysis/dihedrals.py index eca2762..fa40b02 100644 --- a/src/fastmdanalysis/analysis/dihedrals.py +++ b/src/fastmdanalysis/analysis/dihedrals.py @@ -13,11 +13,12 @@ from __future__ import annotations from typing import Dict, Optional, Sequence, Union, List +from pathlib import Path import logging import numpy as np import mdtraj as md -from scipy.stats import circmean +from scipy.stats import circmean, circstd import matplotlib matplotlib.use("Agg") @@ -127,21 +128,25 @@ def run(self) -> Dict[str, np.ndarray]: else: self.residue_indices = full_res_indices - # Circular mean per residue + # Circular mean and std per residue n_residues = angles.shape[1] avg_angles = np.zeros(n_residues) + std_angles = np.zeros(n_residues) for i in range(n_residues): res_angles = angles[:, i] # Remove NaN values (missing angles) valid_angles = res_angles[~np.isnan(res_angles)] if len(valid_angles) > 0: avg_angles[i] = circmean(valid_angles, high=np.pi, low=-np.pi) + std_angles[i] = circstd(valid_angles, high=np.pi, low=-np.pi) else: avg_angles[i] = np.nan + std_angles[i] = np.nan # Convert units if self.units == "degrees": avg_angles = np.degrees(avg_angles) + std_angles = np.degrees(std_angles) self.data = avg_angles.reshape(-1, 1) # If residues were provided, self.data is already filtered. @@ -149,6 +154,7 @@ def run(self) -> Dict[str, np.ndarray]: "phi_avg": self.data, "phi_avg_filtered": self.data, "phi_residues": self.residue_indices, + "phi_std": std_angles.reshape(-1, 1), } # Save data @@ -172,6 +178,7 @@ def plot( self, data: Optional[Union[Sequence[float], np.ndarray]] = None, *, + std_data: Optional[Union[Sequence[float], np.ndarray]] = None, residues: Optional[Union[int, Sequence[int]]] = None, highlight_residues: Optional[Union[int, Sequence[int]]] = None, figsize=(12, 6), @@ -201,10 +208,15 @@ def plot( """ if data is None: data = self.data + if std_data is None: + std_data = self.results.get("phi_std") if data is None: raise AnalysisError("No phi data available to plot.") y = np.asarray(data, dtype=float).flatten() + yerr = None + if std_data is not None: + yerr = np.asarray(std_data, dtype=float).flatten() # X-axis should reflect residue indices of the computed data (not 0..N-1) if self.residue_indices is not None and len(self.residue_indices) == len(y): @@ -222,7 +234,16 @@ def plot( # Plot fig, ax = plt.subplots(figsize=figsize) - ax.bar(x, y, color=color, **kwargs) + ax.errorbar( + x, + y, + yerr=yerr, + fmt="o", + color=color, + ecolor=kwargs.pop("ecolor", "gray"), + capsize=kwargs.pop("capsize", 3), + **kwargs, + ) ax.set_title(title) ax.set_xlabel(xlabel) @@ -237,7 +258,7 @@ def plot( for res in highlight_residues: if res in x: idx = np.where(x == res)[0][0] - ax.bar([x[idx]], [y[idx]], color="red", alpha=0.7) + ax.scatter([x[idx]], [y[idx]], color="red", alpha=0.7, zorder=3) apply_slide_style(ax, x_values=x, y_values=y) fig.tight_layout() @@ -309,22 +330,27 @@ def run(self) -> Dict[str, np.ndarray]: n_residues = angles.shape[1] avg_angles = np.zeros(n_residues) + std_angles = np.zeros(n_residues) for i in range(n_residues): res_angles = angles[:, i] valid_angles = res_angles[~np.isnan(res_angles)] if len(valid_angles) > 0: avg_angles[i] = circmean(valid_angles, high=np.pi, low=-np.pi) + std_angles[i] = circstd(valid_angles, high=np.pi, low=-np.pi) else: avg_angles[i] = np.nan + std_angles[i] = np.nan if self.units == "degrees": avg_angles = np.degrees(avg_angles) + std_angles = np.degrees(std_angles) self.data = avg_angles.reshape(-1, 1) self.results = { "psi_avg": self.data, "psi_avg_filtered": self.data, "psi_residues": self.residue_indices, + "psi_std": std_angles.reshape(-1, 1), } self._save_data(self.data, "psi_avg", header=f"psi_avg_{self.units}") @@ -344,10 +370,15 @@ def plot(self, **kwargs) -> str: if "data" not in kwargs: kwargs["data"] = self.data + if "std_data" not in kwargs: + kwargs["std_data"] = self.results.get("psi_std") if kwargs["data"] is None: raise AnalysisError("No psi data available to plot.") y = np.asarray(kwargs["data"], dtype=float).flatten() + yerr = None + if kwargs.get("std_data") is not None: + yerr = np.asarray(kwargs["std_data"], dtype=float).flatten() if self.residue_indices is not None and len(self.residue_indices) == len(y): x = self.residue_indices.astype(int) else: @@ -363,7 +394,33 @@ def plot(self, **kwargs) -> str: y = y[mask] fig, ax = plt.subplots(figsize=kwargs.get("figsize", (12, 6))) - ax.bar(x, y, color=kwargs.get("color", "green"), **{k: v for k, v in kwargs.items() if k not in ["data", "residues", "figsize", "title", "xlabel", "ylabel", "color", "highlight_residues"]}) + ax.errorbar( + x, + y, + yerr=yerr, + fmt="o", + color=kwargs.get("color", "green"), + ecolor=kwargs.get("ecolor", "gray"), + capsize=kwargs.get("capsize", 3), + **{ + k: v + for k, v in kwargs.items() + if k + not in [ + "data", + "std_data", + "residues", + "figsize", + "title", + "xlabel", + "ylabel", + "color", + "highlight_residues", + "ecolor", + "capsize", + ] + }, + ) ax.set_title(kwargs.get("title", "Average Psi Angles per Residue")) ax.set_xlabel(kwargs.get("xlabel", "Residue Index")) @@ -377,7 +434,7 @@ def plot(self, **kwargs) -> str: for res in highlight_residues: if res in x: idx = np.where(x == res)[0][0] - ax.bar([x[idx]], [y[idx]], color="red", alpha=0.7) + ax.scatter([x[idx]], [y[idx]], color="red", alpha=0.7, zorder=3) apply_slide_style(ax, x_values=x, y_values=y) fig.tight_layout() @@ -448,22 +505,27 @@ def run(self) -> Dict[str, np.ndarray]: n_residues = angles.shape[1] avg_angles = np.zeros(n_residues) + std_angles = np.zeros(n_residues) for i in range(n_residues): res_angles = angles[:, i] valid_angles = res_angles[~np.isnan(res_angles)] if len(valid_angles) > 0: avg_angles[i] = circmean(valid_angles, high=np.pi, low=-np.pi) + std_angles[i] = circstd(valid_angles, high=np.pi, low=-np.pi) else: avg_angles[i] = np.nan + std_angles[i] = np.nan if self.units == "degrees": avg_angles = np.degrees(avg_angles) + std_angles = np.degrees(std_angles) self.data = avg_angles.reshape(-1, 1) self.results = { "omega_avg": self.data, "omega_avg_filtered": self.data, "omega_residues": self.residue_indices, + "omega_std": std_angles.reshape(-1, 1), } self._save_data(self.data, "omega_avg", header=f"omega_avg_{self.units}") @@ -482,10 +544,15 @@ def plot(self, **kwargs) -> str: if "data" not in kwargs: kwargs["data"] = self.data + if "std_data" not in kwargs: + kwargs["std_data"] = self.results.get("omega_std") if kwargs["data"] is None: raise AnalysisError("No omega data available to plot.") y = np.asarray(kwargs["data"], dtype=float).flatten() + yerr = None + if kwargs.get("std_data") is not None: + yerr = np.asarray(kwargs["std_data"], dtype=float).flatten() if self.residue_indices is not None and len(self.residue_indices) == len(y): x = self.residue_indices.astype(int) else: @@ -501,7 +568,33 @@ def plot(self, **kwargs) -> str: y = y[mask] fig, ax = plt.subplots(figsize=kwargs.get("figsize", (12, 6))) - ax.bar(x, y, color=kwargs.get("color", "orange"), **{k: v for k, v in kwargs.items() if k not in ["data", "residues", "figsize", "title", "xlabel", "ylabel", "color", "highlight_residues"]}) + ax.errorbar( + x, + y, + yerr=yerr, + fmt="o", + color=kwargs.get("color", "orange"), + ecolor=kwargs.get("ecolor", "gray"), + capsize=kwargs.get("capsize", 3), + **{ + k: v + for k, v in kwargs.items() + if k + not in [ + "data", + "std_data", + "residues", + "figsize", + "title", + "xlabel", + "ylabel", + "color", + "highlight_residues", + "ecolor", + "capsize", + ] + }, + ) ax.set_title(kwargs.get("title", "Average Omega Angles per Residue")) ax.set_xlabel(kwargs.get("xlabel", "Residue Index")) @@ -515,7 +608,7 @@ def plot(self, **kwargs) -> str: for res in highlight_residues: if res in x: idx = np.where(x == res)[0][0] - ax.bar([x[idx]], [y[idx]], color="red", alpha=0.7) + ax.scatter([x[idx]], [y[idx]], color="red", alpha=0.7, zorder=3) apply_slide_style(ax, x_values=x, y_values=y) fig.tight_layout() @@ -618,6 +711,8 @@ def plot_ramachandran( self, phi_data: Optional[np.ndarray] = None, psi_data: Optional[np.ndarray] = None, + phi_std: Optional[np.ndarray] = None, + psi_std: Optional[np.ndarray] = None, residues: Optional[Union[int, Sequence[int]]] = None, figsize=(8, 8), title: str = "Ramachandran Plot (Average Angles)", @@ -630,12 +725,17 @@ def plot_ramachandran( phi_data = self.results["phi_avg"].flatten() if psi_data is None and "psi_avg" in self.results: psi_data = self.results["psi_avg"].flatten() + if phi_std is None and "phi_std" in self.results: + phi_std = self.results["phi_std"].flatten() + if psi_std is None and "psi_std" in self.results: + psi_std = self.results["psi_std"].flatten() if phi_data is None or psi_data is None: raise AnalysisError("Phi and psi data required for Ramachandran plot") x = phi_data y = psi_data + # Use residue indices when the per-angle analyses were residue-filtered res_indices = self.results.get("phi_residues") if res_indices is None: @@ -643,30 +743,94 @@ def plot_ramachandran( else: res_indices = np.asarray(res_indices, dtype=int) - # Filter residues + if residues is None: + residues = self.residues + res_list = None if residues is not None: if isinstance(residues, int): residues = [residues] - mask = np.isin(res_indices, residues) + res_list = list(residues) + mask = np.isin(res_indices, res_list) x = x[mask] y = y[mask] res_indices = res_indices[mask] + if phi_std is not None: + phi_std = phi_std[mask] + if psi_std is not None: + psi_std = psi_std[mask] fig, ax = plt.subplots(figsize=figsize) - scatter = ax.scatter(x, y, c=res_indices, cmap="viridis", **kwargs) + cmap = plt.get_cmap("viridis") + norm = plt.Normalize(vmin=res_indices.min(), vmax=res_indices.max()) if len(res_indices) else None + + for i, res in enumerate(res_indices): + color = cmap(norm(res)) if norm is not None else "blue" + ax.errorbar( + x[i], + y[i], + xerr=None if phi_std is None else phi_std[i], + yerr=None if psi_std is None else psi_std[i], + fmt="o", + color=color, + ecolor=color, + capsize=3, + **{k: v for k, v in kwargs.items() if k not in ["capsize"]}, + ) + ax.set_title(title) ax.set_xlabel(f"Phi ({self.units})") ax.set_ylabel(f"Psi ({self.units})") ax.grid(True, alpha=0.3) - # Add colorbar - cbar = plt.colorbar(scatter, ax=ax) - cbar.set_label("Residue Index") + if len(res_indices): + mappable = matplotlib.cm.ScalarMappable(norm=norm, cmap=cmap) + cbar = plt.colorbar(mappable, ax=ax) + cbar.set_label("Residue Index") fig.tight_layout() outpath = self._save_plot(fig, "ramachandran") plt.close(fig) + # If residues are selected, also generate per-residue frame-level plots + if res_list: + try: + _, phi_angles = md.compute_phi(self.traj) + _, psi_angles = md.compute_psi(self.traj) + if phi_angles.size and psi_angles.size: + phi_angles = phi_angles[:, res_list] + psi_angles = psi_angles[:, res_list] + + if self.units == "degrees": + phi_angles = np.degrees(phi_angles) + psi_angles = np.degrees(psi_angles) + + per_residue: Dict[int, Path] = {} + for idx, res in enumerate(res_list): + fig_res, ax_res = plt.subplots(figsize=figsize) + ax_res.scatter( + phi_angles[:, idx], + psi_angles[:, idx], + color=kwargs.get("color", "blue"), + alpha=kwargs.get("alpha", 0.7), + ) + ax_res.set_title(f"Ramachandran Plot — Residue {res}") + ax_res.set_xlabel(f"Phi ({self.units})") + ax_res.set_ylabel(f"Psi ({self.units})") + ax_res.grid(True, alpha=0.3) + fig_res.tight_layout() + + per_path = self._save_plot( + fig_res, + "ramachandran", + filename=f"ramachandran_res{res}", + ) + plt.close(fig_res) + per_residue[res] = per_path + + self.results["ramachandran_per_residue"] = per_residue + except Exception as exc: + logger.warning("Per-residue Ramachandran plots failed: %s", exc) + return outpath def plot(self, **kwargs) -> str: