# pipeline taken from https://huggingface.co/spaces/ml-jku/mhnfs/blob/main/src/data_preprocessing/create_descriptors.py
"""
This files includes a the data processing for Tox21.
As an input it takes a list of SMILES and it outputs a nested dictionary with
SMILES and target names as keys.
"""
import json
import numpy as np
import pandas as pd
from datasets import load_dataset
from sklearn.feature_selection import VarianceThreshold
from statsmodels.distributions.empirical_distribution import ECDF
from rdkit import Chem, DataStructs
from rdkit.Chem import Descriptors, rdFingerprintGenerator, MACCSkeys
from rdkit.Chem.rdchem import Mol
from .utils import (
USED_200_DESCR,
TOX_SMARTS_PATH,
Standardizer,
)
def create_cleaned_mol_objects(smiles: list[str]) -> tuple[list[Mol], np.ndarray]:
"""This function creates cleaned RDKit mol objects from a list of SMILES.
Args:
smiles (list[str]): list of SMILES
Returns:
list[Mol]: list of cleaned molecules
np.ndarray[bool]: mask that contains False at index `i`, if molecule in `smiles` at
index `i` could not be cleaned and was removed.
"""
sm = Standardizer(canon_taut=True)
clean_mol_mask = list()
mols = list()
    for smile in smiles:
        mol = Chem.MolFromSmiles(smile)
        # Guard against unparseable SMILES: only standardize molecules that
        # RDKit could parse at all.
        standardized_mol = None
        if mol is not None:
            standardized_mol, _ = sm.standardize_mol(mol)
        is_cleaned = standardized_mol is not None
        clean_mol_mask.append(is_cleaned)
        if not is_cleaned:
            continue
        # Round-trip through canonical SMILES to obtain a canonical mol object
        can_mol = Chem.MolFromSmiles(Chem.MolToSmiles(standardized_mol))
        mols.append(can_mol)
return mols, np.array(clean_mol_mask)
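
# Usage sketch (illustrative): the mask aligns features with the original
# SMILES list, e.g. for inputs where the last entry cannot be parsed:
#
#     mols, mask = create_cleaned_mol_objects(["CCO", "c1ccccc1", "not_a_smiles"])
#     # mask == [True, True, False]; len(mols) == mask.sum(). Downstream
#     # feature matrices are re-expanded to len(mask) rows via `fill`.
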
def create_ecfp_fps(mols: list[Mol], radius=3, fpsize=2048, **kwargs) -> np.ndarray:
    """This function creates ECFP fingerprints for a list of molecules.
    Args:
        mols (list[Mol]): list of molecules
        radius (int): Morgan fingerprint radius
        fpsize (int): fingerprint length
    Returns:
        np.ndarray: ECFP fingerprints of molecules
    """
    # Build the generator once; it is stateless and can be reused for all molecules.
    gen = rdFingerprintGenerator.GetMorganGenerator(
        countSimulation=True, fpSize=fpsize, radius=radius
    )
    ecfps = list()
    for mol in mols:
        fp_sparse_vec = gen.GetCountFingerprint(mol)
        fp = np.zeros((0,), np.int8)
        DataStructs.ConvertToNumpyArray(fp_sparse_vec, fp)
        ecfps.append(fp)
    return np.array(ecfps)
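
# Example (illustrative): with the defaults above, each molecule yields a
# 2048-dimensional count-simulated Morgan fingerprint of radius 3:
#
#     fps = create_ecfp_fps([Chem.MolFromSmiles("CCO")])
#     assert fps.shape == (1, 2048)
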
def create_maccs_keys(mols: list[Mol]) -> np.ndarray:
    """This function creates MACCS keys for a list of molecules.
    Args:
        mols (list[Mol]): list of molecules
    Returns:
        np.ndarray: MACCS keys of molecules
    """
    maccs = [MACCSkeys.GenMACCSKeys(x) for x in mols]
    return np.array(maccs)
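
# Note (illustrative): GenMACCSKeys returns a 167-bit ExplicitBitVect (bit 0
# is an unused placeholder), so the stacked array has shape (n_mols, 167):
#
#     keys = create_maccs_keys([Chem.MolFromSmiles("CCO")])
#     assert keys.shape == (1, 167)
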
def get_tox_patterns(filepath: str) -> list:
    """This function parses the tox SMARTS patterns defined in tox_smarts.json.
    Args:
        filepath (str): path to the tox_smarts.json file
    Returns:
        list: one (patterns, negations, merge_any) triple per SMARTS entry
    """
    # load patterns
    with open(filepath) as f:
        smarts_list = [s[1] for s in json.load(f)]
    # The code below does not handle entries that mix AND and OR
    assert len([s for s in smarts_list if ("AND" in s) and ("OR" in s)]) == 0
    # Chem.MolFromSmarts takes a long time, so it pays off to parse all the SMARTS
    # first and then reuse them for all molecules. This gives a huge speedup over
    # existing code.
    # Each entry holds a list of patterns, a flag per pattern saying whether to
    # negate its match result, and a flag for how to join them into one boolean.
all_patterns = []
for smarts in smarts_list:
        patterns = []  # list of SMARTS patterns
        # One flag per pattern above; each True negates that pattern's match later.
        negations = []
        if " AND " in smarts:
            smarts = smarts.split(" AND ")
            merge_any = False  # If an ' AND ' is found, all 'subsmarts' have to match
        else:
            # If an ' OR ' is present, it's enough if any of the 'subsmarts' match.
            # This branch also collects smarts where neither ' OR ' nor ' AND ' occurs.
            smarts = smarts.split(" OR ")
            merge_any = True
        # for all subsmarts, check whether they are preceded by 'NOT '
for s in smarts:
neg = s.startswith("NOT ")
if neg:
s = s[4:]
patterns.append(Chem.MolFromSmarts(s))
negations.append(neg)
all_patterns.append((patterns, negations, merge_any))
return all_patterns
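
# Example (illustrative, hypothetical entry): a SMARTS string such as
# "NOT [#7] AND [OX2H]" is parsed into
#     ([MolFromSmarts("[#7]"), MolFromSmarts("[OX2H]")], [True, False], False)
# i.e. the molecule must NOT contain a nitrogen and must contain a hydroxyl,
# with merge_any=False because the parts are joined by ' AND '.
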
def create_tox_features(mols: list[Mol], patterns: list) -> np.ndarray:
    """Matches the tox patterns against each molecule. Returns a boolean array."""
tox_data = []
for mol in mols:
mol_features = []
for patts, negations, merge_any in patterns:
matches = [mol.HasSubstructMatch(p) for p in patts]
matches = [m != n for m, n in zip(matches, negations)]
if merge_any:
pres = any(matches)
else:
pres = all(matches)
mol_features.append(pres)
tox_data.append(np.array(mol_features))
return np.array(tox_data)
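
# Usage sketch (illustrative): patterns are parsed once and matched against
# every molecule, yielding one boolean column per SMARTS entry:
#
#     patterns = get_tox_patterns(TOX_SMARTS_PATH)
#     tox = create_tox_features(mols, patterns)  # shape: (len(mols), len(patterns))
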
def create_rdkit_descriptors(mols: list[Mol]) -> np.ndarray:
"""This function creates RDKit descriptors for a list of molecules.
Args:
mols (list[Mol]): list of molecules
Returns:
np.ndarray: RDKit descriptors of molecules
"""
rdkit_descriptors = list()
for mol in mols:
descrs = []
        # Compute every RDKit descriptor, then keep only the 200 used by the pipeline
        for _, descr_calc_fn in Descriptors._descList:
            descrs.append(descr_calc_fn(mol))
        descrs = np.array(descrs)
        descrs = descrs[USED_200_DESCR]
rdkit_descriptors.append(descrs)
return np.array(rdkit_descriptors)
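
# Note (illustrative): Descriptors._descList enumerates all descriptor
# calculators of the installed RDKit version; USED_200_DESCR is assumed to
# index the 200 descriptors used by the original MHNfs pipeline, so:
#
#     descrs = create_rdkit_descriptors([Chem.MolFromSmiles("CCO")])
#     assert descrs.shape == (1, len(USED_200_DESCR))
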
def create_quantiles(raw_features: np.ndarray, ecdfs: list) -> np.ndarray:
    """Create quantile values for the given features using per-column ECDFs.
    Args:
        raw_features (np.ndarray): values to map to quantiles
        ecdfs (list): one fitted ECDF per feature column
    Returns:
        np.ndarray: computed quantiles
    """
quantiles = np.zeros_like(raw_features)
for column in range(raw_features.shape[1]):
raw_values = raw_features[:, column].reshape(-1)
ecdf = ecdfs[column]
q = ecdf(raw_values)
quantiles[:, column] = q
return quantiles
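
# Example (illustrative): an ECDF maps raw values to empirical quantiles in
# (0, 1], which puts the very differently scaled RDKit descriptors on a
# common scale:
#
#     ecdf = ECDF(np.array([1.0, 2.0, 3.0, 4.0]))
#     create_quantiles(np.array([[2.0], [4.0]]), [ecdf])  # -> [[0.5], [1.0]]
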
def fill(features, mask, value=np.nan):
    """Expands `features` back to the full molecule list: rows where `mask` is
    True (molecules that failed cleaning) are filled with `value`; the remaining
    rows receive the feature rows in order.
    """
    n_mols = len(mask)
    n_features = features.shape[1]
    data = np.zeros(shape=(n_mols, n_features))
    data.fill(value)
    data[~mask] = features
    return data
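
# Example (illustrative): re-expand 2 feature rows to 3 molecules, where the
# middle molecule failed cleaning (mask is True for *invalid* rows):
#
#     fill(np.ones((2, 4)), np.array([False, True, False]))
#     # -> rows 0 and 2 hold the features, row 1 is all NaN
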
def create_descriptors(
    smiles,
    ecdfs=None,
    feature_selection=None,
    return_ecdfs=False,
    return_feature_selection=False,
    **kwargs,
):
    """Creates all feature groups (ECFP fingerprints, tox features, MACCS keys,
    and RDKit descriptor quantiles) for a list of SMILES. Pass `ecdfs` and
    `feature_selection` fitted on training data to featurize new data
    consistently; if left as None, they are fitted on `smiles` itself.
    """
    # Create cleaned rdkit mol objects
    mols, clean_mol_mask = create_cleaned_mol_objects(smiles)
    print("Cleaned molecules")
tox_patterns = get_tox_patterns(TOX_SMARTS_PATH)
# Create fingerprints and descriptors
ecfps = create_ecfp_fps(mols, **kwargs)
# expand using mol_mask
ecfps = fill(ecfps, ~clean_mol_mask)
print("Created ECFP fingerprints")
tox = create_tox_features(mols, tox_patterns)
tox = fill(tox, ~clean_mol_mask)
print("Created Tox features")
# Create and save feature selection for ecfps and tox
    if feature_selection is None:
        print("Create Feature selection")
        # Fit the selection on cleaned molecules only; the filled rows contain
        # NaN and would break VarianceThreshold and np.corrcoef.
        ecfps_selec = get_feature_selection(ecfps[clean_mol_mask], **kwargs)
        tox_selec = get_feature_selection(tox[clean_mol_mask], **kwargs)
        feature_selection = {"ecfps_selec": ecfps_selec, "tox_selec": tox_selec}
else:
ecfps_selec = feature_selection["ecfps_selec"]
tox_selec = feature_selection["tox_selec"]
ecfps = ecfps[:, ecfps_selec]
tox = tox[:, tox_selec]
maccs = create_maccs_keys(mols)
maccs = fill(maccs, ~clean_mol_mask)
print("Created MACCS keys")
rdkit_descrs = create_rdkit_descriptors(mols)
print("Created RDKit descriptors")
# Create and save ecdfs
if ecdfs is None:
print("Create ECDFs")
ecdfs = []
for column in range(rdkit_descrs.shape[1]):
raw_values = rdkit_descrs[:, column].reshape(-1)
ecdfs.append(ECDF(raw_values))
# Create quantiles
rdkit_descr_quantiles = create_quantiles(rdkit_descrs, ecdfs)
# expand using mol_mask
rdkit_descr_quantiles = fill(rdkit_descr_quantiles, ~clean_mol_mask)
print("Created quantiles of RDKit descriptors")
# concatenate features
features = {
"ecfps": ecfps,
"tox": tox,
"maccs": maccs,
"rdkit_descr_quantiles": rdkit_descr_quantiles,
}
return_dict = {"features": features}
if return_ecdfs:
return_dict["ecdfs"] = ecdfs
if return_feature_selection:
return_dict["feature_selection"] = feature_selection
return return_dict
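
# Usage sketch (illustrative, with hypothetical `train_smiles`/`val_smiles`
# lists): fit the ECDFs and the feature selection on the training SMILES, then
# reuse both so that validation features live in the same space:
#
#     train_out = create_descriptors(
#         train_smiles, return_ecdfs=True, return_feature_selection=True
#     )
#     val_out = create_descriptors(
#         val_smiles,
#         ecdfs=train_out["ecdfs"],
#         feature_selection=train_out["feature_selection"],
#     )
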
def get_feature_selection(
    raw_features: np.ndarray, min_var=0.01, max_corr=0.95, **kwargs
) -> np.ndarray:
    """Selects feature columns with a variance of at least `min_var`, then
    greedily drops columns whose correlation with another kept column exceeds
    `max_corr`. Returns the selected column indices.
    """
    # select features with at least min_var variance
    var_thresh = VarianceThreshold(threshold=min_var)
    feature_selection = var_thresh.fit(raw_features).get_support(indices=True)
    n_features_preselected = len(feature_selection)
    # Remove highly correlated features
corr_matrix = np.corrcoef(raw_features[:, feature_selection], rowvar=False)
upper_tri = np.triu(corr_matrix, k=1)
to_keep = np.ones((n_features_preselected,), dtype=bool)
for i in range(upper_tri.shape[0]):
for j in range(upper_tri.shape[1]):
if upper_tri[i, j] > max_corr:
to_keep[j] = False
feature_selection = feature_selection[to_keep]
return feature_selection
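
# Example (illustrative): if columns 0 and 1 are perfectly correlated and all
# columns pass the variance threshold, the greedy pass drops the later column:
#
#     x = np.array([[0.0, 0.0, 1.0], [1.0, 1.0, 0.0], [2.0, 2.0, 1.0]])
#     get_feature_selection(x)  # -> array([0, 2])
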
def get_tox21_split(token, cvfold=None):
    """Loads the Tox21 dataset and returns train/validation DataFrames. If
    `cvfold` is given, the default splits are combined and re-split along the
    `CVfold` column, and training molecules that also occur in the validation
    split are removed.
    """
    ds = load_dataset("tschouis/tox21", token=token)
    train_df = ds["train"].to_pandas()
    val_df = ds["validation"].to_pandas()
    if cvfold is None:
        return {"train": train_df, "validation": val_df}
    combined_df = pd.concat([train_df, val_df], ignore_index=True)
    # create new splits
    cvfold = float(cvfold)
    train_df = combined_df[combined_df.CVfold != cvfold]
    val_df = combined_df[combined_df.CVfold == cvfold]
# exclude train mols that occur in the validation split
val_inchikeys = set(val_df["inchikey"])
train_df = train_df[~train_df["inchikey"].isin(val_inchikeys)]
return {
"train": train_df.reset_index(drop=True),
"validation": val_df.reset_index(drop=True),
}
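
# Usage sketch (illustrative; assumes access to the gated dataset and that it
# exposes a SMILES column, here called "smiles"):
#
#     splits = get_tox21_split(token="hf_...", cvfold=0)
#     out = create_descriptors(splits["train"]["smiles"].tolist())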