import multiprocessing as mp
from functools import partial
import dask.dataframe as dd
import numpy as np
import pandas as pd
import deimos
def threshold(features, by='intensity', threshold=0):
    '''
    Thresholds input :obj:`~pandas.DataFrame` using `by` keyword, keeping only
    rows whose value is strictly greater than `threshold`.

    Parameters
    ----------
    features : :obj:`~pandas.DataFrame`
        Input feature coordinates and intensities.
    by : str
        Variable to threshold by.
    threshold : float
        Threshold value. The comparison is strict, so rows equal to this
        value are dropped.

    Returns
    -------
    :obj:`~pandas.DataFrame`
        Thresholded feature coordinates, with the index reset to start at
        zero.

    '''

    return features.loc[features[by] > threshold, :].reset_index(drop=True)
def collapse(features, keep=['mz', 'drift_time', 'retention_time'], how=np.sum):
    '''
    Collapses input data such that only specified dimensions remain, according
    to the supplied aggregation function.

    Parameters
    ----------
    features : :obj:`~pandas.DataFrame`
        Input feature coordinates and intensities. Must contain an
        `intensity` column, which is the only column aggregated.
    keep : str or list
        Dimensions to keep during collapse operation.
    how : function or str
        Aggregation function for collapse operation, applied to `intensity`
        within each unique combination of `keep`.

    Returns
    -------
    :obj:`~pandas.DataFrame`
        Collapsed feature coordinates and aggregated
        intensities. Columns other than `keep` and `intensity` are dropped.

    '''

    # Grouping keys stay as regular columns (as_index=False) and groups are
    # not re-sorted (sort=False)
    return features.groupby(by=keep,
                            as_index=False,
                            sort=False).agg({'intensity': how})
def locate(features, by=['mz', 'drift_time', 'retention_time'],
           loc=[0, 0, 0], tol=[0, 0, 0], return_index=False):
    '''
    Given a coordinate and tolerances, return a subset of the
    data.

    A row is kept when, for every dimension in `by`, its value lies within
    the closed interval `[loc - tol, loc + tol]`.

    Parameters
    ----------
    features : :obj:`~pandas.DataFrame`
        Input feature coordinates and intensities. May be None, in which case
        None is returned.
    by : str or list
        Dimension(s) by which to subset the data.
    loc : float or list
        Coordinate location.
    tol : float or list
        Tolerance in each dimension.
    return_index : bool
        Return boolean index of subset if True.

    Returns
    -------
    :obj:`~pandas.DataFrame`
        Subset of feature coordinates and intensities, or None if no rows
        match. Original index labels and column dtypes are preserved.
    :obj:`~numpy.array`
        If `return_index` is True, boolean index of subset elements,
        i.e. `features[index]` is the subset. None if `features` is None.

    '''

    # Safely cast to list
    by = deimos.utils.safelist(by)
    loc = deimos.utils.safelist(loc)
    tol = deimos.utils.safelist(tol)

    # Check dims
    deimos.utils.check_length([by, loc, tol])

    # Nothing to search
    if features is None:
        return (None, None) if return_index else None

    # Subset by each dim. Columns are compared one at a time so that the
    # frame is never flattened to a single common dtype.
    idx = np.ones(len(features.index), dtype=bool)
    for name, x, dx in zip(by, loc, tol):
        values = features[name].to_numpy()
        idx &= (values <= x + dx) & (values >= x - dx)

    # Index the frame directly to keep index labels and per-column dtypes;
    # an empty selection is reported as None
    subset = features[idx] if idx.any() else None

    if return_index:
        return subset, idx

    return subset
def locate_asym(features, by=['mz', 'drift_time', 'retention_time'],
                loc=[0, 0, 0], low=[0, 0, 0], high=[0, 0, 0],
                relative=[False, False, False], return_index=False):
    '''
    Given a coordinate and asymmetrical tolerances, return a subset of the
    data.

    Parameters
    ----------
    features : :obj:`~pandas.DataFrame`
        Input feature coordinates and intensities.
    by : str or list
        Dimension(s) by which to subset the data.
    loc : float or list
        Coordinate location.
    low : float or list
        Lower tolerance(s) in each dimension. This offset is added to `loc`,
        so pass a negative value to place the lower bound below `loc`.
    high : float or list
        Upper tolerance(s) in each dimension, added to `loc`.
    relative : bool or list
        Whether to use relative or absolute tolerance per dimension. When
        relative, bounds are `loc * (1 + low)` and `loc * (1 + high)`;
        otherwise `loc + low` and `loc + high`.
    return_index : bool
        Return boolean index of subset if True.

    Returns
    -------
    :obj:`~pandas.DataFrame`
        Subset of feature coordinates and intensities.
    :obj:`~numpy.array`
        If `return_index` is True, boolean index of subset elements,
        i.e. `features[index]` is the subset.

    '''

    # Safely cast to list
    by = deimos.utils.safelist(by)
    loc = deimos.utils.safelist(loc)
    low = deimos.utils.safelist(low)
    high = deimos.utils.safelist(high)
    relative = deimos.utils.safelist(relative)

    # Check dims
    deimos.utils.check_length([by, loc, low, high, relative])

    # Determine bounds
    lb = []
    ub = []
    for x, lower, upper, rel in zip(loc, low, high, relative):
        # Truthiness (not `is True`) so numpy booleans are honored as well
        if rel:
            lb.append(x * (1 + lower))
            ub.append(x * (1 + upper))
        else:
            lb.append(x + lower)
            ub.append(x + upper)

    return deimos.subset.slice(features, by=by, low=lb, high=ub,
                               return_index=return_index)
def slice(features, by=['mz', 'drift_time', 'retention_time'],
          low=[0, 0, 0], high=[0, 0, 0], return_index=False):
    '''
    Given a feature coordinate and bounds, return a subset of the data.

    A row is kept when, for every dimension in `by`, its value lies within
    the closed interval `[low, high]`.

    Parameters
    ----------
    features : :obj:`~pandas.DataFrame`
        Input feature coordinates and intensities. May be None, in which case
        None is returned.
    by : str or list
        Dimensions(s) by which to subset the data
    low : float or list
        Lower bound(s) in each dimension.
    high : float or list
        Upper bound(s) in each dimension.
    return_index : bool
        Return boolean index of subset if True.

    Returns
    -------
    :obj:`~pandas.DataFrame`
        Subset of feature coordinates and intensities, or None if no rows
        match. Original index labels and column dtypes are preserved.
    :obj:`~numpy.array`
        If `return_index` is True, boolean index of subset elements,
        i.e. `features[index]` is the subset. None if `features` is None.

    '''

    # Safely cast to list
    by = deimos.utils.safelist(by)
    low = deimos.utils.safelist(low)
    high = deimos.utils.safelist(high)

    # Check dims
    deimos.utils.check_length([by, low, high])

    # Nothing to search
    if features is None:
        return (None, None) if return_index else None

    # Subset by each dim. Columns are compared one at a time so that the
    # frame is never flattened to a single common dtype.
    idx = np.ones(len(features.index), dtype=bool)
    for name, lb, ub in zip(by, low, high):
        values = features[name].to_numpy()
        idx &= (values <= ub) & (values >= lb)

    # Index the frame directly to keep index labels and per-column dtypes;
    # an empty selection is reported as None
    subset = features[idx] if idx.any() else None

    if return_index:
        return subset, idx

    return subset
class Partitions:
    '''
    Generator object that will lazily build and return each partition.

    Attributes
    ----------
    features : :obj:`~pandas.DataFrame`
        Input feature coordinates and intensities.
    split_on : str
        Dimension to partition the data.
    size : int
        Target partition size.
    overlap : float
        Amount of overlap between partitions to ameliorate edge effects.
    bounds : list
        Inclusive `[low, high]` bounds of each partition along `split_on`,
        including the overlap region shared with the previous partition.
    fbounds : list
        "Functional" bounds used to trim per-partition results so that
        neighboring partitions meet in the middle of their overlap region.

    '''

    def __init__(self, features, split_on='mz', size=1000, overlap=0.05):
        '''
        Initialize :obj:`~deimos.subset.Partitions` instance.

        Parameters
        ----------
        features : :obj:`~pandas.DataFrame`
            Input feature coordinates and intensities.
        split_on : str
            Dimension to partition the data.
        size : int
            Target partition size.
        overlap : float
            Amount of overlap between partitions to ameliorate edge effects.

        '''

        self.features = features
        self.split_on = split_on
        self.size = size
        self.overlap = overlap

        self._compute_splits()

    def _compute_splits(self):
        '''
        Determines data splits for partitioning.

        Raises
        ------
        ValueError
            If `features` has no values in `split_on`.

        '''

        # Unique to split on
        idx = np.unique(self.features[self.split_on].values)

        if len(idx) == 0:
            raise ValueError('cannot partition: no values found in '
                             '"{}".'.format(self.split_on))

        # Number of partitions (explicit int for `array_split`)
        partitions = int(np.ceil(len(idx) / self.size))

        # Determine partition bounds
        bounds = [[x.min(), x.max()] for x in np.array_split(idx, partitions)]

        # Extend the lower edge of each later partition back into the
        # previous one by `overlap`
        for i in range(1, len(bounds)):
            bounds[i][0] = bounds[i - 1][1] - self.overlap

        if self.overlap > 0 and len(bounds) > 1:
            # Functional bounds: pull every interior edge in by half the
            # overlap so neighbors meet mid-overlap. The outer edges of the
            # first and last partitions are left untouched.
            half = self.overlap / 2
            last = len(bounds) - 1

            fbounds = []
            for i, (a, b) in enumerate(bounds):
                if i > 0:
                    a = a + half
                if i < last:
                    b = b - half
                fbounds.append([a, b])
        else:
            fbounds = bounds

        self.bounds = bounds
        self.fbounds = fbounds

    def __iter__(self):
        '''
        Yields each partition.

        Yields
        ------
        :obj:`~pandas.DataFrame`
            Partition of feature coordinates and intensities.

        '''

        # NOTE: `slice` here is this module's function, not the builtin
        for a, b in self.bounds:
            yield slice(self.features, by=self.split_on, low=a, high=b)

    def map(self, func, processes=1, **kwargs):
        '''
        Maps `func` to each partition, then returns the combined result,
        accounting for overlap regions.

        Parameters
        ----------
        func : function
            Function to apply to partitions.
        processes : int
            Number of parallel processes. If less than 2, a serial mapping is
            applied.
        kwargs
            Keyword arguments passed to `func`.

        Returns
        -------
        :obj:`~pandas.DataFrame`
            Combined result of `func` applied to partitions.

        '''

        # Serial
        if processes < 2:
            result = [func(x, **kwargs) for x in self]

        # Parallel
        else:
            with mp.Pool(processes=processes) as pool:
                result = list(pool.imap(partial(func, **kwargs), self))

        # Reconcile overlap
        result = [slice(res, by=self.split_on, low=a, high=b)
                  for res, (a, b) in zip(result, self.fbounds)]

        # Combine partitions
        return pd.concat(result).reset_index(drop=True)

    def zipmap(self, func, b, processes=1, **kwargs):
        '''
        Maps `func` to each partition pair resulting from the zip operation of
        `self` and `b`, then returns the combined result, accounting for
        overlap regions.

        Parameters
        ----------
        func : function
            Function to apply to zipped partitions. Must accept and return two
            :obj:`~pandas.DataFrame` instances.
        b : :obj:`~pandas.DataFrame`
            Input feature coordinates and intensities.
        processes : int
            Number of parallel processes. If less than 2, a serial mapping is
            applied.
        kwargs
            Keyword arguments passed to `func`.

        Returns
        -------
        a, b : :obj:`~pandas.DataFrame`
            Result of `func` applied to paired partitions.

        '''

        # Partition other dataset using the bounds computed from `self`
        partitions = (slice(b, by=self.split_on, low=a, high=b_)
                      for a, b_ in self.bounds)

        # Serial
        if processes < 2:
            result = [func(a, b_, **kwargs) for a, b_ in zip(self, partitions)]

        # Parallel
        else:
            with mp.Pool(processes=processes) as pool:
                result = list(pool.starmap(partial(func, **kwargs),
                                           zip(self, partitions)))

        result = {'a': [x[0] for x in result], 'b': [x[1] for x in result]}

        # Reconcile overlap on the first frame, keeping the boolean index
        tmp = [slice(res, by=self.split_on, low=a, high=b_,
                     return_index=True)
               for res, (a, b_) in zip(result['a'], self.fbounds)]

        result['a'] = [x[0] for x in tmp]
        idx = [x[1] for x in tmp]

        # Apply the same row selection to the paired second frame; skip
        # pairs where either the index or the frame itself is missing
        result['b'] = [frame.iloc[mask, :]
                       if mask is not None and frame is not None else None
                       for frame, mask in zip(result['b'], idx)]

        # Combine partitions
        result['a'] = pd.concat(result['a'])
        result['b'] = pd.concat(result['b'])

        return result['a'], result['b']
class MultiSamplePartitions:
    '''
    Generator object that will lazily build and return each partition constructed
    from multiple samples.

    Attributes
    ----------
    features : :obj:`~pandas.DataFrame` or :obj:`~dask.dataframe.DataFrame`
        Input feature coordinates and intensities.
    split_on : str
        Dimension to partition the data.
    size : int
        Target partition size.
    tol : float
        Largest allowed distance between unique `split_on` observations.
    dask : bool
        Whether `features` is a :obj:`~dask.dataframe.DataFrame`.
    bounds : :obj:`~numpy.array`
        Inclusive `[low, high]` bounds of each partition along `split_on`.
    counter : int
        Position of the next partition to be returned during iteration.

    '''

    def __init__(self, features, split_on='mz', size=500, tol=25E-6):
        '''
        Initialize :obj:`~deimos.subset.MultiSamplePartitions` instance.

        Parameters
        ----------
        features : :obj:`~pandas.DataFrame` or :obj:`~dask.dataframe.DataFrame`
            Input feature coordinates and intensities.
        split_on : str
            Dimension to partition the data.
        size : int
            Target partition size.
        tol : float
            Largest allowed distance between unique `split_on` observations.

        '''

        self.features = features
        self.split_on = split_on
        self.size = size
        self.tol = tol

        self.dask = isinstance(features, dd.DataFrame)

        self._compute_splits()

    def _compute_splits(self):
        '''
        Determines data splits for partitioning.

        Unique `split_on` values are walked in ascending order and appended
        to the current bin while either the bin's row count stays within
        `size`, or the relative gap to the previous value is within `tol`.

        '''

        self.counter = 0

        # Row count per unique `split_on` value, in ascending order
        if self.dask:
            idx = self.features.groupby(
                by=self.split_on).size().compute().sort_index()
        else:
            idx = self.features.groupby(by=self.split_on).size().sort_index()

        counts = idx.values
        idx = idx.index

        # Relative gap between consecutive unique values
        dxs = np.diff(idx) / idx[:-1]

        bins = []
        current_count = counts[0]
        current_bin = [idx[0]]
        self._counts = []

        for i, dx in enumerate(dxs, start=1):
            # Values closer than `tol` are never split apart, even if that
            # pushes the bin past the target size
            if (current_count + counts[i] <= self.size) or (dx <= self.tol):
                current_bin.append(idx[i])
                current_count += counts[i]
            else:
                bins.append(np.array(current_bin))
                self._counts.append(current_count)

                current_bin = [idx[i]]
                current_count = counts[i]

        # Add last unadded bin
        bins.append(np.array(current_bin))
        self._counts.append(current_count)

        self.bounds = np.array([[x.min(), x.max()] for x in bins])

    def __iter__(self):
        '''
        Restart iteration from the first partition and return the iterator.

        '''

        # Rewind so the object can be iterated (or mapped) more than once
        self.counter = 0

        return self

    def __next__(self):
        '''
        Return the next partition.

        Returns
        -------
        :obj:`~pandas.DataFrame` or None
            Rows within the next partition's bounds, or None when the
            partition holds one row or fewer.

        Raises
        ------
        StopIteration
            When all partitions have been returned.

        '''

        if self.counter >= len(self.bounds):
            raise StopIteration

        lower, upper = self.bounds[self.counter]
        q = '({} >= {}) & ({} <= {})'.format(self.split_on,
                                             lower,
                                             self.split_on,
                                             upper)

        if self.dask:
            subset = self.features.query(q).compute()
        else:
            subset = self.features.query(q)

        self.counter += 1

        if len(subset.index) > 1:
            return subset

        return None

    def map(self, func, processes=1, **kwargs):
        '''
        Maps `func` to each partition, then returns the combined result.

        Parameters
        ----------
        func : function
            Function to apply to partitions. Receives None for partitions
            holding one row or fewer.
        processes : int
            Number of parallel processes. If less than 2, a serial mapping is
            applied.
        kwargs
            Keyword arguments passed to `func`.

        Returns
        -------
        :obj:`~pandas.DataFrame`
            Combined result of `func` applied to partitions, with an added
            `partition_idx` column identifying the source partition.

        '''

        # Serial
        if processes < 2:
            result = [func(x, **kwargs) for x in self]

        # Parallel
        else:
            with mp.Pool(processes=processes) as pool:
                result = list(pool.imap(partial(func, **kwargs), self))

        # Add partition index
        for i, res in enumerate(result):
            if res is not None:
                res['partition_idx'] = i

        # Combine partitions
        return pd.concat(result, ignore_index=True)
def partition(features, split_on='mz', size=1000, overlap=0.05):
    '''
    Partitions data along a given dimension.

    Parameters
    ----------
    features : :obj:`~pandas.DataFrame`
        Input feature coordinates and intensities.
    split_on : str
        Dimension to partition the data.
    size : int
        Target partition size.
    overlap : float
        Amount of overlap between partitions to ameliorate edge effects.

    Returns
    -------
    :obj:`~deimos.subset.Partitions`
        A generator object that will lazily build and return each partition.

    '''

    return Partitions(features, split_on, size, overlap)
def multi_sample_partition(features, split_on='mz', size=500, tol=25E-6):
    '''
    Partitions data along a given dimension. For use with features across
    multiple samples, e.g. in alignment.

    Parameters
    ----------
    features : :obj:`~pandas.DataFrame` or :obj:`~dask.dataframe.DataFrame`
        Input feature coordinates and intensities.
    split_on : str
        Dimension to partition the data.
    size : int
        Target partition size.
    tol : float
        Largest allowed distance between unique `split_on` observations.

    Returns
    -------
    :obj:`~deimos.subset.MultiSamplePartitions`
        A generator object that will lazily build and return each partition.

    '''

    return MultiSamplePartitions(features, split_on, size, tol)