Source code for deimos.isotopes

import numpy as np
import pandas as pd
import scipy

import deimos


[docs]def OrderedSet(x): return list({k: None for k in x})
[docs]def detect(features, dims=['mz', 'drift_time', 'retention_time'], tol=[0.1, 0.2, 0.3], delta=1.003355, max_isotopes=4, max_charge=1, max_error=50E-6): ''' Perform isotope detection according to expected patterning. Parameters ---------- features : :obj:`~pandas.DataFrame` Input feature coordinates and intensities. dims : str or list Dimensions to perform isotope detection in. tol : float or list Tolerance in each dimension to be considered a match. delta : float Expected spacing between isotopes (e.g. C_13=1.003355). max_isotopes : int Maximum number of isotopes to search for per parent feature. max_charge : int Maximum charge to search for per parent feature. max_error : float Maximum relative error between search pattern and putative isotopic feature. Returns ------- :obj:`~pandas.DataFrame` Features grouped by isotopic pattern. ''' # Safely cast to list dims = deimos.utils.safelist(dims) tol = deimos.utils.safelist(tol) # Check dims deimos.utils.check_length([dims, tol]) # Isolate mz dimension mz_idx = dims.index('mz') else_idx = [i for i, j in enumerate(dims) if i != mz_idx] isotopes = [] idx = [] # Tolerance in other dimensions for i in else_idx: arr = features[dims[i]].values.reshape((-1, 1)) dist = scipy.spatial.distance.cdist(arr, arr) # Less than tolerance idx.append(dist <= tol[i]) # Stack truth arrays idx = np.prod(np.dstack(idx), axis=-1) # Half matrix idx = np.tril(idx, k=-1) # Isotopic distances arr = features[dims[mz_idx]].values.reshape((-1, 1)) d = scipy.spatial.distance.cdist(arr, arr) d = np.multiply(d, idx) # Enumerate putative spacings for charge in range(1, max_charge + 1): for mult in range(1, max_isotopes + 1): dx_i = mult * (delta / charge) r, c = np.where((d > dx_i - tol[mz_idx]) & (d < dx_i + tol[mz_idx])) a = features.iloc[c, :] b = features.iloc[r, :] z = charge * np.ones(len(a)) m = mult * np.ones(len(a)) dx_i = dx_i * np.ones(len(a)) isotopes.append(pd.DataFrame(np.vstack((a['mz'].values, a['intensity'].values, z, m, dx_i, b['mz'].values, b['intensity'].values, a.index.values, b.index.values)).T, columns=['mz', 'intensity', 'charge', 'multiple', 'dx', 'mz_iso', 'intensity_iso', 'idx', 'idx_iso'])) # Combine isotopes = pd.concat(isotopes, axis=0, ignore_index=True) # Stats isotopes['error'] = np.abs( (isotopes['mz_iso'] - isotopes['mz']) - isotopes['dx']) / isotopes['mz'] isotopes['decay'] = isotopes['intensity_iso'] / isotopes['intensity'] # Cull non-decreasing isotopes = isotopes.loc[isotopes['intensity'] > isotopes['intensity_iso'], :] # Cull high error isotopes = isotopes.loc[isotopes['error'] < max_error, :] # Cull children isotopes = isotopes.loc[~isotopes['idx'].isin(isotopes['idx_iso']), :] # Group by parent grouped = isotopes.groupby(by=['mz', 'charge', 'idx', 'intensity'], as_index=False).agg(OrderedSet) grouped['n'] = [len(x) for x in grouped['multiple'].values] # grouped['n_sum'] = [sum(x) for x in grouped['multiple'].values] # grouped['check'] = np.abs(grouped['n'] * (grouped['n'] + 1) / 2 - grouped['n_sum']) return grouped.sort_values(by=['intensity', 'n'], ascending=False).reset_index(drop=True)