import numpy as np
import pandas as pd
from typing import Union, Optional
from . import logger
from .distance import Distance
from .symbols import ONE2ONE, ONE2MANY, MANY2ONE, NOVEL, REMAIN, UNASSIGN
from .plot import SEP1, _identify_relation_groups
import os
import pickle
def _subset_and_normalize(D1byD2: pd.DataFrame, D2byD1: pd.DataFrame, relation: pd.DataFrame, row_normalize: bool = True) -> tuple:
"""
For internal use. Subset the two matrices and row normalize to a sum of 1.
"""
if relation.shape[0] > 0:
flag1 = ~D1byD2.index.isin(relation.D1.unique())
flag2 = ~D1byD2.columns.isin(relation.D2.unique())
D1byD2 = D1byD2.loc[flag1, flag2]
D2byD1 = D2byD1.loc[flag2, flag1]
if not D1byD2.empty and row_normalize:
D1byD2 = D1byD2 / D1byD2.sum(axis = 1).values[:, np.newaxis]
D2byD1 = D2byD1 / D2byD1.sum(axis = 1).values[:, np.newaxis]
return D1byD2, D2byD1
def _pairwise_align(D1byD2: pd.DataFrame, D2byD1: pd.DataFrame, check: bool = True, row_normalize: bool = True, minimum_unique_percent: float = 0.5, minimum_divide_percent: float = 0.1, maximum_novel_percent: float = 0.05) -> pd.DataFrame:
"""
For internal use. Align cell types from two datasets based on two confusion matrices.
"""
if check and (not np.array_equal(D1byD2.index, D2byD1.columns) or not np.array_equal(D1byD2.columns, D2byD1.index)):
raise ValueError(
f"🛑 Matrices are not provided correctly")
relation = pd.DataFrame(columns = ['D1', 'relation', 'D2'])
#rows all 0
rownames = D1byD2.index[D1byD2.sum(axis = 1) == 0]
if len(rownames) > 0:
relation = pd.concat([relation, pd.DataFrame({'D1': rownames, 'relation': [ONE2ONE]*len(rownames), 'D2': [NOVEL]*len(rownames)})])
rownames = D2byD1.index[D2byD1.sum(axis = 1) == 0]
if len(rownames) > 0:
relation = pd.concat([relation, pd.DataFrame({'D1': [NOVEL]*len(rownames), 'relation': [ONE2ONE]*len(rownames), 'D2': rownames})])
#subset and normalize
D1byD2, D2byD1 = _subset_and_normalize(D1byD2, D2byD1, relation, row_normalize)
#D1 novel
colnames = D2byD1.columns[D2byD1.max(axis = 0) < maximum_novel_percent]
if len(colnames) > 0:
relation = pd.concat([relation, pd.DataFrame({'D1': colnames, 'relation': [ONE2ONE]*len(colnames), 'D2': [NOVEL]*len(colnames)})])
#D2 novel
colnames = D1byD2.columns[D1byD2.max(axis = 0) < maximum_novel_percent]
if len(colnames) > 0:
relation = pd.concat([relation, pd.DataFrame({'D1': [NOVEL]*len(colnames), 'relation': [ONE2ONE]*len(colnames), 'D2': colnames})])
D1byD2, D2byD1 = _subset_and_normalize(D1byD2, D2byD1, relation, row_normalize)
#one2one
indices = (D1byD2 > minimum_unique_percent) & (D2byD1.T > minimum_unique_percent)
one2many_indices = (D1byD2 > minimum_divide_percent) & (D2byD1.T > minimum_unique_percent)
one2many_indices = one2many_indices & (one2many_indices.sum(axis = 1) >= 2).values[:, np.newaxis]
many2one_indices = (D2byD1 > minimum_divide_percent) & (D1byD2.T > minimum_unique_percent)
many2one_indices = many2one_indices & (many2one_indices.sum(axis = 1) >= 2).values[:, np.newaxis]
row_index, col_index = np.where(indices & (indices.sum(axis = 1) == 1).values[:, np.newaxis] & (indices.sum(axis = 0) == 1).values & (~one2many_indices) & (~many2one_indices.T))
while len(row_index) > 0:
relation = pd.concat([relation, pd.DataFrame({'D1': D1byD2.index[row_index], 'relation': [ONE2ONE]*len(row_index), 'D2': D1byD2.columns[col_index]})])
D1byD2, D2byD1 = _subset_and_normalize(D1byD2, D2byD1, relation, row_normalize)
if D1byD2.empty:
break
indices = (D1byD2 > minimum_unique_percent) & (D2byD1.T > minimum_unique_percent)
one2many_indices = (D1byD2 > minimum_divide_percent) & (D2byD1.T > minimum_unique_percent)
one2many_indices = one2many_indices & (one2many_indices.sum(axis = 1) >= 2).values[:, np.newaxis]
many2one_indices = (D2byD1 > minimum_divide_percent) & (D1byD2.T > minimum_unique_percent)
many2one_indices = many2one_indices & (many2one_indices.sum(axis = 1) >= 2).values[:, np.newaxis]
row_index, col_index = np.where(indices & (indices.sum(axis = 1) == 1).values[:, np.newaxis] & (indices.sum(axis = 0) == 1).values & (~one2many_indices) & (~many2one_indices.T))
#one2many
if not D1byD2.empty:
indices = (D1byD2 > minimum_divide_percent) & (D2byD1.T > minimum_unique_percent)
row_index, col_index = np.where(indices & (indices.sum(axis = 1) >= 2).values[:, np.newaxis])
while len(row_index) > 0:
relation = pd.concat([relation, pd.DataFrame({'D1': D1byD2.index[row_index], 'relation': [ONE2MANY]*len(row_index), 'D2': D1byD2.columns[col_index]})])
D1byD2, D2byD1 = _subset_and_normalize(D1byD2, D2byD1, relation, row_normalize)
if D1byD2.empty:
break
indices = (D1byD2 > minimum_divide_percent) & (D2byD1.T > minimum_unique_percent)
row_index, col_index = np.where(indices & (indices.sum(axis = 1) >= 2).values[:, np.newaxis])
#many2one
if not D1byD2.empty:
indices = (D2byD1 > minimum_divide_percent) & (D1byD2.T > minimum_unique_percent)
row_index, col_index = np.where(indices & (indices.sum(axis = 1) >= 2).values[:, np.newaxis])
while len(row_index) > 0:
relation = pd.concat([relation, pd.DataFrame({'D1': D2byD1.columns[col_index], 'relation': [MANY2ONE]*len(col_index), 'D2': D2byD1.index[row_index]})])
D1byD2, D2byD1 = _subset_and_normalize(D1byD2, D2byD1, relation, row_normalize)
if D1byD2.empty:
break
indices = (D2byD1 > minimum_divide_percent) & (D1byD2.T > minimum_unique_percent)
row_index, col_index = np.where(indices & (indices.sum(axis = 1) >= 2).values[:, np.newaxis])
#remaining
if D1byD2.shape[0] > 0:
relation = pd.concat([relation, pd.DataFrame({'D1': D1byD2.index, 'relation': [ONE2ONE]*len(D1byD2.index), 'D2': [REMAIN]*len(D1byD2.index)})])
if D1byD2.shape[1] > 0:
relation = pd.concat([relation, pd.DataFrame({'D1': [REMAIN]*len(D1byD2.columns), 'relation': [ONE2ONE]*len(D1byD2.columns), 'D2': D1byD2.columns})])
return relation
def _reorder_dataset(sim_df: pd.DataFrame) -> np.ndarray:
"""
For internal use. Reorder datasets based on their pairwise similarities.
"""
stack_datasets = sim_df.iloc[sim_df.similarity.argmax(), [0, 1]].tolist()
sim_df = pd.concat([sim_df, sim_df[['D2', 'D1', 'similarity']].rename(columns = {'D2':'D1', 'D1':'D2'})])
remain_datasets = list(np.setdiff1d(np.unique(sim_df.D1), stack_datasets))
while len(remain_datasets) >= 2:
scores = [sim_df.loc[(sim_df.D1 == x) & sim_df.D2.isin(stack_datasets), 'similarity'].sum() for x in remain_datasets]
stack_datasets.append(remain_datasets.pop(scores.index(max(scores))))
stack_datasets.append(remain_datasets.pop())
return np.array(stack_datasets, dtype = object)
[docs]
class DistanceAlignment():
"""
Class that performs cell type label harmonization across datasets.
Parameters
----------
base_distance
A :class:`~cellhint.distance.Distance` object.
check
Whether to check the supplied `base_distance` is correctly provided.
(Default: `True`)
dataset_order
Order of datasets to be aligned. By default, the order is the same as that in the base distance matrix.
row_normalize
Whether to row normalize the confusion matrix to a sum of 1 in each iteration.
(Default: `True`)
minimum_unique_percent
The minimum cell assignment fraction to claim a cell type as uniquely matched to a cell type from the other dataset.
(Default: `0.5`)
minimum_divide_percent
The minimum cell assignment fraction to claim a cell type as divisible into two or more cell types from the other dataset.
(Default: `0.1`)
maximum_novel_percent
The maximum cell assignment fraction to claim a cell type as novel to a given dataset.
(Default: `0.05`)
Attributes
----------
base_distance
The :class:`~cellhint.distance.Distance` object.
dataset_order
Order of datasets to be aligned.
row_normalize
Whether to row normalize the confusion matrix to a sum of 1 in each iteration.
minimum_unique_percent
The minimum cell assignment fraction to claim a cell type as uniquely matched to a cell type from the other dataset.
minimum_divide_percent
The minimum cell assignment fraction to claim a cell type as divisible into two or more cell types from the other dataset.
maximum_novel_percent
The maximum cell assignment fraction to claim a cell type as novel to a given dataset.
relation
A :class:`~pandas.DataFrame` representing the harmonization result.
aligned_datasets
List of datasets that are already harmonized.
groups
Cell type groups (high-hierarchy cell types) categorizing the rows of `.relation`.
reannotation
A :class:`~pandas.DataFrame` representing the reannotated cell types.
minimum_unique_percents
List of `minimum_unique_percent` values which are used along harmonization iterations in order to get the best alignment.
This attribute is obtained through the :meth:`~cellhint.align.DistanceAlignment.best_align` method.
minimum_divide_percents
List of `minimum_divide_percent` values which are used along harmonization iterations in order to get the best alignment.
This attribute is obtained through the :meth:`~cellhint.align.DistanceAlignment.best_align` method.
"""
def __init__(self, base_distance: Distance, check: bool = True, dataset_order: Optional[Union[list, tuple, np.ndarray, pd.Series, pd.Index]] = None,
row_normalize: bool = True, minimum_unique_percent: float = 0.5, minimum_divide_percent: float = 0.1, maximum_novel_percent: float = 0.05):
if check:
if not isinstance(base_distance, Distance) or not base_distance.symmetric():
raise ValueError(
f"🛑 Please provide a symmetric `Distance` object")
if not hasattr(base_distance, 'assignment'):
raise AttributeError(
f"🛑 No `.assignment` attribute in the `base_distance`. Apply the `.assign` method first")
self.base_distance = base_distance
if dataset_order is None:
dataset_order = np.unique(self.base_distance.cell_type.dataset)
else:
dataset_order = np.array(dataset_order, dtype = object)
if not np.array_equal(np.sort(dataset_order), np.unique(self.base_distance.cell_type.dataset)):
raise ValueError(
f"🛑 Please provide a comprehensive order of datasets with correct names")
self.dataset_order = dataset_order
self.row_normalize = row_normalize
self.minimum_unique_percent = minimum_unique_percent
self.minimum_divide_percent = minimum_divide_percent
self.maximum_novel_percent = maximum_novel_percent
[docs]
def reorder_dataset(self, weights: Union[list, tuple, np.ndarray, pd.Series, pd.Index] = (2, 1, -1, -2), return_similarity: bool = False) -> Union[None, pd.DataFrame]:
"""
Reorder the datasets such that similar datasets will be harmonized first. This method can also be used to calculate CellHint-defined inter-dataset similarities.
Parameters
----------
weights
Weights assigned to one-to-one, one/many-to-many/one, novel, and remaining cell type matches, respectively. Default to 2, 1, -1, -2.
Inter-cell-type similarities will be weighted by these values to derive the weighted sum of similarity between each pair of datasets.
return_similarity
Whether to return the data frame of dataset-dataset similarities.
(Default: `False`)
Returns
----------
Reordered datasets as the attribute `.dataset_order` and if `return_similarity = True`, return a :class:`~pandas.DataFrame` of dataset-dataset similarities.
"""
if len(self.dataset_order) == 2:
logger.warn(f"⚠️ Warning: only two datasets exist, no need to reorder them")
return
weights = np.array(weights, dtype = 'float')
meta_distance = self.base_distance.to_meta(False)
meta_similarity = 1 - meta_distance
sim_df = pd.DataFrame([self.dataset_order[[i, j]] for i in range(0, len(self.dataset_order) - 1) for j in range(i + 1, len(self.dataset_order))], columns = ['D1', 'D2'])
scores = []
for _, s in sim_df.iterrows():
D1 = s.values[0]
D2 = s.values[1]
sub_meta_similarity = meta_similarity.loc[meta_similarity.index.str.startswith(D1 + ": "), meta_similarity.columns.str.startswith(D2 + ": ")]
sub_meta_similarity.index = sub_meta_similarity.index.str.replace(D1 + ": ", '', regex = False)
sub_meta_similarity.columns = sub_meta_similarity.columns.str.replace(D2 + ": ", '', regex = False)
relation = self.pairwise_align(D1, D2, False)
ss = np.full(relation.shape[0], -1, dtype = 'float')
ws = np.full(relation.shape[0], weights[0], dtype = 'float')
#REMAIN NOVEL
flag_REMAIN_1 = (relation[D1] == REMAIN).values
flag_REMAIN_2 = (relation[D2] == REMAIN).values
flag_NOVEL_1 = (relation[D1] == NOVEL).values
flag_NOVEL_2 = (relation[D2] == NOVEL).values
flag_NONE_1 = flag_REMAIN_1 | flag_NOVEL_1
if flag_NONE_1.sum() > 0:
ss[flag_NONE_1] = 1 - sub_meta_similarity[relation.loc[flag_NONE_1, D2].values].max(axis = 0).values
flag_NONE_2 = flag_REMAIN_2 | flag_NOVEL_2
if flag_NONE_2.sum() > 0:
ss[flag_NONE_2] = 1 - sub_meta_similarity.loc[relation.loc[flag_NONE_2, D1].values].max(axis = 1).values
ws[flag_REMAIN_1 | flag_REMAIN_2] = weights[3]
ws[flag_NOVEL_1 | flag_NOVEL_2] = weights[2]
#other ss
other_flag = ~(flag_NONE_1 | flag_NONE_2)
if other_flag.sum() > 0:
ss[other_flag] = np.diag(sub_meta_similarity.loc[relation.loc[other_flag, D1].values, relation.loc[other_flag, D2].values])
##remove in the future-->
assert np.all(ss != -1)
##<<-remove in the future
#ONE2MANY and MANY2ONE weights
flag_ONE2MANY = (relation.relation == ONE2MANY).values
if flag_ONE2MANY.sum() > 0:
DS = relation.loc[flag_ONE2MANY, D1].value_counts()
ws[flag_ONE2MANY] = (weights[1] / DS)[relation.loc[flag_ONE2MANY, D1].values].values
flag_MANY2ONE = (relation.relation == MANY2ONE).values
if flag_MANY2ONE.sum() > 0:
DS = relation.loc[flag_MANY2ONE, D2].value_counts()
ws[flag_MANY2ONE] = (weights[1] / DS)[relation.loc[flag_MANY2ONE, D2].values].values
#final scores
scores.append(np.sum(ss * ws) / sum(sub_meta_similarity.shape))
sim_df['similarity'] = scores
self.dataset_order = _reorder_dataset(sim_df)
if return_similarity:
return sim_df
[docs]
def pairwise_align(self, D1: str, D2: str, check: bool = True) -> pd.DataFrame:
"""
Pairwise alignment of cell types between two datasets.
Parameters
----------
D1
Name of the first dataset.
D2
Name of the second dataset.
check
Whether to check names of the two datasets are contained in the :attr:`~cellhint.align.DistanceAlignment.base_distance`.
(Default: `True`)
Returns
----------
:class:`~pandas.DataFrame`
A :class:`~pandas.DataFrame` with three columns:
1) **name of dataset 1**, cell types from dataset 1.
2) **relation**, being either '=', '∋' or '∈'.
3) **name of dataset 2**, cell types from dataset 2.
"""
D1byD2, D2byD1 = self.base_distance.to_pairwise_confusion(D1, D2, check)
relation = _pairwise_align(D1byD2, D2byD1, False, self.row_normalize, self.minimum_unique_percent, self.minimum_divide_percent, self.maximum_novel_percent)
return relation.rename(columns = {'D1': D1, 'D2': D2})
[docs]
def multi_align(self, relation: pd.DataFrame, D: str, check: bool = True) -> pd.DataFrame:
"""
Multiple alignment of cell types across datasets. Cell types from a new dataset will be integrated into the previous harmonization data frame.
Parameters
----------
relation
A :class:`~pandas.DataFrame` object representing the cell type harmonization result across multiple datasets.
D
Name of the new dataset to be aligned.
check
Whether to check names of the datasets are contained.
(Default: `True`)
Returns
----------
:class:`~pandas.DataFrame`
A :class:`~pandas.DataFrame` with multiple columns:
1) **name of dataset 1**, cell types from dataset 1.
2) **relation**, being either '=', '∋' or '∈'.
3) **name of dataset 2**, cell types from dataset 2.
4) ...
N) **name of the new dataset**, cell types from the new dataset.
"""
D1byD2, D2byD1 = self.base_distance.to_multi_confusion(relation, D, check)
new_relation = _pairwise_align(D1byD2, D2byD1, False, self.row_normalize, self.minimum_unique_percent, self.minimum_divide_percent, self.maximum_novel_percent)
relation.index = relation.apply(lambda row: ' '.join(row.values), axis = 1).values
relation.loc[NOVEL] = np.tile([NOVEL, ONE2ONE], int(relation.shape[1]/2)+1)[:-1]
relation.loc[REMAIN] = np.tile([REMAIN, ONE2ONE], int(relation.shape[1]/2)+1)[:-1]
relation = relation.loc[new_relation.D1.values]
relation.index = new_relation.index
relation['__relation'] = new_relation['relation'].values
relation[D] = new_relation['D2'].values
return relation.rename(columns = {'__relation': 'relation'})
[docs]
def align(self, datasets: Optional[Union[list, tuple, np.ndarray, pd.Series, pd.Index]] = None) -> None:
"""
Iterative alignment of cell types across datasets.
Parameters
----------
datasets
Datasets to be aligned. Default to using all datasets available.
Returns
----------
None
A :class:`~pandas.DataFrame` with multiple columns added as the attribute `.relation`:
1) **name of dataset 1**, cell types from dataset 1.
2) **relation**, being either '=', '∋' or '∈'.
3) **name of dataset 2**, cell types from dataset 2.
4) ...
N) **name of the last dataset**, cell types from the last dataset.
"""
if datasets is None:
datasets = self.dataset_order
else:
datasets = np.array(datasets, dtype = object)
if not set(datasets).issubset(self.dataset_order):
raise ValueError(
f"🛑 The following datasets are not found: {set(datasets).difference(self.dataset_order)}")
logger.info(f"🖇 Harmonizing cell types of {datasets[0]} and {datasets[1]}")
relation = self.pairwise_align(datasets[0], datasets[1], False)
if len(datasets) >= 3:
for i in range(2, len(datasets)):
logger.info(f"🖇 Harmonizing cell types of {datasets[i]}")
relation = self.multi_align(relation, datasets[i], False)
logger.info(f"✅ Harmonization done!")
self.relation = relation
@property
def aligned_datasets(self) -> np.ndarray:
"""Get the datasets which are already harmonized."""
return np.array(self.relation.columns[0::2])
@property
def groups(self) -> np.ndarray:
"""Get the cell type groups (high hierarchy) based on the relation table."""
return _identify_relation_groups(self.relation, order_row = False, order_column = False)[0]
def __repr__(self):
base = f"Cross-dataset cell type alignment for {len(self.dataset_order)} datasets"
base += f"\n base_distance: a cross-dataset distance object"
if hasattr(self, 'relation'):
base += f"\n aligned_datasets: {str(list(self.aligned_datasets))[1:-1]}"
base += f"\n relation: data frame of the harmonization table"
if hasattr(self, 'reannotation'):
base += f"\n reannotation: data frame of the reannotated cells ({str(list(self.reannotation.columns))[1:-1]})"
return base
[docs]
def update(self, datasets: Optional[Union[str, list, tuple, np.ndarray, pd.Series, pd.Index]] = None) -> None:
"""
Iteratively update the alignment of cell types across datasets.
Parameters
----------
datasets
Datasets to be aligned. Default to using all the remaining datasets.
Returns
----------
None
An updated :class:`~pandas.DataFrame` with multiple columns added as the attribute `.relation`:
1) **name of dataset 1**, cell types from dataset 1.
2) **relation**, being either '=', '∋' or '∈'.
3) **name of dataset 2**, cell types from dataset 2.
4) ...
N) **name of the last dataset**, cell types from the last dataset.
"""
if not hasattr(self, 'relation'):
raise AttributeError(
f"🛑 No harmonization result exists. Please run the `.align` method first")
if len(self.aligned_datasets) == len(self.dataset_order):
logger.warn(f"⚠️ All datasets have been harmonized. No update is needed")
return
remaining_datasets = self.dataset_order[~np.isin(self.dataset_order, self.aligned_datasets)]
if datasets is None:
datasets = remaining_datasets
else:
datasets = [datasets] if isinstance(datasets, str) else np.array(datasets, dtype = object)
if not set(datasets).issubset(remaining_datasets):
raise ValueError(
f"🛑 Please provide dataset names from the following list: {set(remaining_datasets)}")
for dataset in datasets:
logger.info(f"🖇 Harmonizing cell types of {dataset}")
self.relation = self.multi_align(self.relation, dataset, False)
logger.info(f"✅ Harmonization done!")
[docs]
def best_align(self, dataset_order: Optional[Union[list, tuple, np.ndarray, pd.Series, pd.Index]] = None, minimum_unique_percents: Union[list, tuple, np.ndarray, pd.Series, pd.Index, float] = (0.4, 0.5, 0.6, 0.7, 0.8), minimum_divide_percents: Union[list, tuple, np.ndarray, pd.Series, pd.Index, float] = (0.1, 0.15, 0.2)):
"""
Iterative alignment of cell types across datasets by finding the best parameter combo in each iteration.
Parameters
----------
dataset_order
Order of datasets to be aligned. This can also be a subset of datasets.
Default to the dataset order in the `DistanceAlignment` object.
minimum_unique_percents
The minimum cell assignment fraction(s) to claim a cell type as uniquely matched to a cell type from the other dataset.
By default, five values will be tried (0.4, 0.5, 0.6, 0.7, 0.8) to find the one that produces least alignments in each harmonization iteration.
minimum_divide_percents
The minimum cell assignment fraction(s) to claim a cell type as divisible into two or more cell types from the other dataset.
By default, three values will be tried (0.1, 0.15, 0.2) to find the one that produces least alignments in each harmonization iteration.
Returns
----------
None
A :class:`~pandas.DataFrame` with multiple columns added as the attribute `.relation`:
1) **name of dataset 1**, cell types from dataset 1.
2) **relation**, being either '=', '∋' or '∈'.
3) **name of dataset 2**, cell types from dataset 2.
4) ...
N) **name of the last dataset**, cell types from the last dataset.
"""
if dataset_order is not None:
dataset_order = np.array(dataset_order, dtype = object)
if not set(dataset_order).issubset(self.dataset_order):
raise ValueError(
f"🛑 The following dataset(s) are not found: {set(dataset_order).difference(self.dataset_order)}")
else:
dataset_order = self.dataset_order
original_mup = self.minimum_unique_percent
original_mdp = self.minimum_divide_percent
minimum_unique_percents = np.array([minimum_unique_percents]) if isinstance(minimum_unique_percents, float) else np.array(minimum_unique_percents)
minimum_divide_percents = np.array([minimum_divide_percents]) if isinstance(minimum_divide_percents, float) else np.array(minimum_divide_percents)
mups = np.full(len(dataset_order) - 1, -1, dtype = 'float')
mdps = mups.copy()
logger.info(f"🖇 Harmonizing cell types of {dataset_order[0]} and {dataset_order[1]}")
n_rows = np.inf
for minimum_unique_percent in minimum_unique_percents:
for minimum_divide_percent in minimum_divide_percents:
self.minimum_unique_percent = minimum_unique_percent
self.minimum_divide_percent = minimum_divide_percent
per_relation = self.pairwise_align(dataset_order[0], dataset_order[1], check = False)
if per_relation.shape[0] <= n_rows:
relation = per_relation
n_rows = per_relation.shape[0]
mups[0] = minimum_unique_percent
mdps[0] = minimum_divide_percent
if len(dataset_order) >= 3:
for i in range(2, len(dataset_order)):
logger.info(f"🖇 Harmonizing cell types of {dataset_order[i]}")
n_rows = np.inf
for minimum_unique_percent in minimum_unique_percents:
for minimum_divide_percent in minimum_divide_percents:
self.minimum_unique_percent = minimum_unique_percent
self.minimum_divide_percent = minimum_divide_percent
per_relation = self.multi_align(relation.copy(), dataset_order[i], check = False)
if per_relation.shape[0] <= n_rows:
expand_relation = per_relation
n_rows = per_relation.shape[0]
mups[i - 1] = minimum_unique_percent
mdps[i - 1] = minimum_divide_percent
relation = expand_relation
self.minimum_unique_percents = mups
self.minimum_divide_percents = mdps
self.minimum_unique_percent = original_mup
self.minimum_divide_percent = original_mdp
self.relation = relation
#def reannotate(self, show_iteration: bool = False, add_group: bool = True, prefix: str = '') -> None:
# """
# Reannotate each cell into the harmonized cell type.
# Parameters
# ----------
# show_iteration
# Whether to store the cell type reannotation result for each harmonization iteration.
# (Default: `False`)
# add_group
# Whether to annotate out cell type group information.
# (Default: `True`)
# prefix
# Prefix of the harmonization columns for all iterations. Default to no prefix.
# Returns
# ----------
# None
# A :class:`~pandas.DataFrame` with multiple columns added as the attribute `.reannotation`:
# 1) **dataset**, datasets where the cells are from.
# 2) **cell_type**, cell types annotated by the original studies/datasets.
# 3) **roundN** or **reannotation**, prefixed with `prefix`; cell types reannotated by the harmonization process.
# 4) **group**, prefixed with `prefix`; annotated cell type groups.
# """
# if not hasattr(self, 'relation'):
# raise AttributeError(
# f"🛑 No harmonization result (`.relation`) exists")
# reannotation = self.base_distance.cell.set_index('ID', inplace = False, drop = True)
# lend = len(self.aligned_datasets)
# reannotation[[f"{prefix}round{x}" for x in range(1, lend)]] = UNASSIGN
# assignment = self.base_distance.assignment[self.aligned_datasets]
# for _, s in self.relation.iterrows():
# celltypes = s.values[0::2]
# flags = (assignment == celltypes) | np.isin(celltypes, [NOVEL, REMAIN])
# non_existing_datasets = self.aligned_datasets[np.isin(celltypes, [NOVEL, REMAIN])]
# flags.loc[reannotation.dataset.isin(non_existing_datasets).values, :] = False
# for N in range(1, lend):
# if (not show_iteration) and (N != lend - 1):
# continue
# sub_s = s.values[:2*N+1]
# sub_celltypes = sub_s[0::2]
# if np.all(sub_celltypes == NOVEL) or np.all(sub_celltypes == REMAIN):
# continue
# reannotation.loc[np.all(flags[self.aligned_datasets[:N+1]], axis = 1).values, f"{prefix}round{N}"] = ' '.join(sub_s)
# if add_group:
# groups, new_relation = _identify_relation_groups(self.relation, order_row = False, order_column = False)
# group_mapping = dict()
# for i in range(len(groups)):
# group_mapping.update({j: groups[i] for j in new_relation.iloc[i].values})
# reannotation[f"{prefix}group"] = (reannotation.dataset + SEP1 + reannotation.cell_type).replace(group_mapping)
# reannotation.loc[reannotation[f"{prefix}group"].str.contains(SEP1), f"{prefix}group"] = UNASSIGN
# if not show_iteration:
# reannotation.rename(columns = {f"{prefix}round{lend-1}": f"{prefix}reannotation"}, inplace = True)
# if lend >= 3:
# reannotation.drop(columns = [f"{prefix}round{x}" for x in range(1, lend-1)], inplace = True)
# self.reannotation = reannotation
[docs]
def reannotate(self, prefix: str = '') -> None:
"""
Reannotate each cell into the harmonized cell type.
Parameters
----------
prefix
Prefix of the harmonization columns. Default to no prefix.
Returns
----------
None
A :class:`~pandas.DataFrame` with multiple columns added as the attribute `.reannotation`:
1) **dataset**, datasets where the cells are from.
2) **cell_type**, cell types annotated by the original studies/datasets.
3) **reannotation**, prefixed with `prefix`; cell types reannotated by the harmonization process.
4) **group**, prefixed with `prefix`; annotated cell type groups.
"""
if not hasattr(self, 'relation'):
raise AttributeError(
f"🛑 No harmonization result (`.relation`) exists")
reannotation = self.base_distance.cell.set_index('ID', inplace = False, drop = True)
reannotation[f"{prefix}reannotation"] = UNASSIGN
assignment = self.base_distance.assignment[self.aligned_datasets]
for dataset in self.aligned_datasets:
for celltype in np.setdiff1d(np.unique(self.relation[dataset]), [NOVEL, REMAIN]):
flag = ((reannotation.dataset == dataset) & (reannotation.cell_type == celltype)).values
sub_relation = self.relation[self.relation[dataset] == celltype]
if sub_relation.shape[0] == 1:
reannotation.loc[flag, f"{prefix}reannotation"] = ' '.join(sub_relation.iloc[0].values)
else:
sub_assignment = assignment[flag]
sums = pd.DataFrame()
for _, s in sub_relation.iterrows():
celltypes = s.values[0::2]
per_s_flag = (sub_assignment == celltypes) | np.isin(celltypes, [NOVEL, REMAIN])
sums[' '.join(s.values)] = per_s_flag.sum(axis = 1).values
reannotation.loc[flag, f"{prefix}reannotation"] = sums.idxmax(axis = 1).values
groups, new_relation = _identify_relation_groups(self.relation, order_row = False, order_column = False)
group_mapping = dict()
for i in range(len(groups)):
group_mapping.update({j: groups[i] for j in new_relation.iloc[i].values})
reannotation[f"{prefix}group"] = (reannotation.dataset + SEP1 + reannotation.cell_type).replace(group_mapping)
reannotation.loc[reannotation[f"{prefix}group"].str.contains(SEP1), f"{prefix}group"] = UNASSIGN
self.reannotation = reannotation
[docs]
@staticmethod
def load(alignment_file: str):
"""Load the DistanceAlignment file."""
with open(alignment_file, "rb") as fh:
return pickle.load(fh)
[docs]
def write(self, file: str) -> None:
"""Write out the DistanceAlignment."""
file = os.path.splitext(file)[0] + '.pkl'
with open(file, 'wb') as output:
pickle.dump(self, output)