import numpy as np
import pandas as pd
from scipy.interpolate import UnivariateSpline
from scipy.spatial import KDTree
import deimos
[docs]def cosine(a, b):
'''
Cosine distance (1 - similarity) between two arrays.
Parameters
----------
a, b : :obj:`~numpy.array`
N-dimensional arrays.
Returns
-------
float
Cosine distance.
'''
a_ = a.flatten()
b_ = b.flatten()
return 1 - np.dot(a_, b_) / np.sqrt(a_.dot(a_) * b_.dot(b_))
[docs]def get_1D_profiles(features, dims=['mz', 'drift_time', 'retention_time']):
'''
Extract 1D profile for each of the indicated dimension(s).
Parameters
----------
features : :obj:`~pandas.DataFrame`
Input feature coordinates and intensities.
dims : str or list
Dimensions considered in generating 1D profile(s).
Returns
-------
:obj:`dict` of :obj:`~scipy.interpolate.UnivariateSpline`
Dictionary indexed by dimension containing univariate
splines for each 1D profile.
'''
# Safely cast to list
dims = deimos.utils.safelist(dims)
profiles = {}
for dim in dims:
# Collapse to 1D profile
profile = deimos.collapse(features, keep=dim).sort_values(
by=dim, ignore_index=True)
# Interpolate spline
x = profile[dim].values
y = profile['intensity'].values
# Fit univariate spline
try:
uspline = UnivariateSpline(x, y, s=0, ext=1)
except:
def uspline(x): return np.zeros_like(x)
profiles[dim] = uspline
return profiles
[docs]def offset_correction_model(dt_ms2, mz_ms2, mz_ms1, ce=0,
params=[1.02067031, -0.02062323, 0.00176694]):
# Cast params as array
params = np.array(params).reshape(-1, 1)
# Convert collision energy to array
ce = np.ones_like(dt_ms2) * np.log(ce)
# Create constant vector
const = np.ones_like(dt_ms2)
# Sqrt
mu_ms1 = np.sqrt(mz_ms1)
mu_ms2 = np.sqrt(mz_ms2)
# Ratio
mu_ratio = mu_ms2 / mu_ms1
# Create dependent array
x = np.stack((const, mu_ratio, ce), axis=1)
# Predict
y = np.dot(x, params).flatten() * dt_ms2
return y
[docs]class MS2Deconvolution:
'''
Performs MS2 deconvolution by correlating non-m/z separation dimension
profiles and scoring the agreement between precursor and fragment.
'''
def __init__(self, ms1_features, ms1_data, ms2_features, ms2_data):
'''
Initializes :obj:`~deimos.calibration.ArrivalTimeCalibration` object.
Parameters
----------
ms1_features : :obj:`~pandas.DataFrame`
MS1 peak locations and intensities.
ms1_data : :obj:`~pandas.DataFrame`
Complete MS1 data.
ms2_features : :obj:`~pandas.DataFrame`
MS2 peak locations and intensities.
ms2_data : :obj:`~pandas.DataFrame`
Complete MS1 data.
'''
self.ms1_features = ms1_features
self.ms1_data = ms1_data
self.ms2_features = ms2_features
self.ms2_data = ms2_data
[docs] def construct_putative_pairs(self, dims=['drift_time', 'retention_time'],
low=[-0.12, -0.1], high=[1.4, 0.1], ce=None,
model=offset_correction_model,
require_ms1_greater_than_ms2=True,
error_tolerance=0.12):
'''
Determine each possible MS1:MS2 pair within specified tolerances. If
considering drift time, apply a model to correct for drift time
offset such that MS2 can be downselected by respective error in drift
time (i.e. a poor model correction suggests an incorrect MS1:MS2 pair).
Parameters
----------
dims : str or list
Dimension(s) by which to match MS1:MS2.
low : float or list
Lower bound(s) in each dimension.
high : float or list
Upper bound(s) in each dimension.
ce : float
Collision energy of MS2 collection.
model : function
Model used to correct for drift time offset between MS1 precursor
and MS2 fragment. If omitted, a default model is used.
require_ms1_greater_than_ms2 : bool
Signal whether precursor intensity must be greater than fragment
intensity for putative assignments.
error_tolerance : float
Acceptable difference between precursor and fragment drift times
following correction by offset model.
Returns
-------
:obj:`~pandas.DataFrame`
All MS1:MS2 pairings and associated agreement scores per requested dimension.
'''
# Safely cast to list
dims = deimos.utils.safelist(dims)
low = deimos.utils.safelist(low)
high = deimos.utils.safelist(high)
# Check dims
deimos.utils.check_length([dims, low, high])
# Check collision energy is supplied
if ce is None:
raise ValueError("Collision energy must be specified.")
# Match vectors
v_ms1 = self.ms1_features[dims].values
v_ms2 = self.ms2_features[dims].values
# Normalize dims for query
for i, dim in enumerate(dims):
# Cast range as radius
tol = (high[i] - low[i]) / 2
# Offset to center search
v_ms2[:, i] = v_ms2[:, i] + tol + low[i]
# Normalize
v_ms1[:, i] = v_ms1[:, i] / tol
v_ms2[:, i] = v_ms2[:, i] / tol
# Create k-d trees
ms1_tree = KDTree(v_ms1)
ms2_tree = KDTree(v_ms2)
# Query
sdm = ms1_tree.sparse_distance_matrix(
ms2_tree, 1, p=np.inf, output_type='coo_matrix')
# Pairs within tolerance
ms1_matches = self.ms1_features.loc[sdm.row, :].reset_index()
ms2_matches = self.ms2_features.loc[sdm.col, :].reset_index()
# Correct drift time
if 'drift_time' in dims:
ms2_matches['drift_time_raw'] = ms2_matches['drift_time'].values
ms2_matches['drift_time'] = model(ms2_matches['drift_time'].values,
ms2_matches['mz'].values,
ms1_matches['mz'].values, ce=ce)
# Rename columns
ms1_matches.columns = [x + '_ms1' for x in ms1_matches.columns]
ms2_matches.columns = [x + '_ms2' for x in ms2_matches.columns]
# Construct MS1:MS2 data frame
decon_pairs = pd.concat((ms1_matches,
ms2_matches), axis=1)
# Compute offset correction error
if 'drift_time' in dims:
decon_pairs['drift_time_error'] = np.abs(
decon_pairs['drift_time_ms1'] - decon_pairs['drift_time_ms2'])
# MS1 intensity greater than MS2 intensity
if require_ms1_greater_than_ms2 is True:
decon_pairs = decon_pairs.loc[decon_pairs['intensity_ms1']
>= decon_pairs['intensity_ms2'], :]
# Error tolerance
if 'drift_time' in dims:
decon_pairs = decon_pairs.loc[decon_pairs['drift_time_error']
<= error_tolerance, :]
# Remove empty groups
decon_pairs = decon_pairs.groupby(
by='index_ms1', as_index=False).filter(lambda x: len(x) > 0)
# Sort by index
decon_pairs = decon_pairs.sort_values(
by=['index_ms1', 'index_ms2']).reset_index(drop=True)
self.decon_pairs = decon_pairs
return self.decon_pairs
[docs] def apply(self, dims=['drift_time', 'retention_time'], resolution=[0.01, 0.01]):
'''
Perform deconvolution according to mathed features and their
extracted profiles.
Parameters
----------
dims : : str or list
Dimensions(s) for which to calculate MS1:MS2 correspondence
by 1D profile agreement (i.e. non-m/z).
resolution : float or list
Resolution applied to per-dimension profile interpolations.
Returns
-------
:obj:`~pandas.DataFrame`
All MS1:MS2 pairings and associated agreement scores.
'''
# Safely cast to list
dims = deimos.utils.safelist(dims)
resolution = deimos.utils.safelist(resolution)
# Check dims
deimos.utils.check_length([dims, resolution])
# Ensure m/z not in dims
if 'mz' in dims:
raise ValueError('MS1:MS2 similarity does not consider m/z.')
# Recast as dictionaries indexed by dimension
self.profile_resolution = {k: v for k, v in zip(dims, resolution)}
# Extracted ions
ms1_xis = self.profiler(self.ms1_features, self.ms1_data)
ms2_xis = self.profiler(self.ms2_features, self.ms2_data)
# Container for profile similarity scores
scores = {dim: [] for dim in dims}
# Enumerate MS1 featres
for name, grp in self.decon_pairs.groupby(by=['index_ms1'], as_index=False):
# MS1 feature index
idx_i = int(name[0])
# Extracted ion
ms1_xi = ms1_xis[idx_i]
# Build MS1 profile
ms1_profiles = get_1D_profiles(ms1_xi, dims=dims)
# Shared interpolation axis
newx = {}
for dim in dims:
# Determine upper and lower bounds
if self.profile_relative[dim] is True:
lb = min(grp[dim + "_ms1"].min(), grp[dim +
"_ms2"].min()) * (1 + self.profile_low[dim])
ub = max(grp[dim + "_ms1"].max(), grp[dim +
"_ms2"].max()) * (1 + self.profile_high[dim])
else:
lb = min(grp[dim + "_ms1"].min(), grp[dim +
"_ms2"].min()) + self.profile_low[dim]
ub = max(grp[dim + "_ms1"].max(), grp[dim +
"_ms2"].max()) + self.profile_high[dim]
# Determine shared x-axis
newx[dim] = np.arange(lb, ub, self.profile_resolution[dim])
# Evaluate MS1 profile
ms1_profiles = {dim: ms1_profiles[dim](newx[dim]) for dim in dims}
# Enumerate MS2 features
for j, row in grp.reset_index().iterrows():
# MS2 feature index
idx_j = int(row['index_ms2'])
# Extracted ion
ms2_xi = ms2_xis[idx_j].copy()
if 'drift_time' in dims:
# Determine offset
offset = row['drift_time_ms2'] - row['drift_time_raw_ms2']
# Apply ofset
ms2_xi['drift_time'] += offset
# Build MS2 profile
ms2_profiles = get_1D_profiles(ms2_xi, dims=dims)
# Evaluate MS2 profile
ms2_profiles = {dim: ms2_profiles[dim](
newx[dim]) for dim in dims}
# Compute similarity
for dim in dims:
scores[dim].append(
1 - cosine(ms1_profiles[dim], ms2_profiles[dim]))
# Append score columns
for dim in dims:
self.decon_pairs[dim + '_score'] = np.array(scores[dim])
return self.decon_pairs