Spaces:
Sleeping
Sleeping
| # pipeline taken from https://huggingface.co/spaces/ml-jku/mhnfs/blob/main/src/data_preprocessing/create_descriptors.py | |
| """ | |
| This file contains the data processing for Tox21. | |
| As an input it takes a list of SMILES and it outputs a nested dictionary with | |
| SMILES and target names as keys. | |
| """ | |
| import json | |
| import numpy as np | |
| import pandas as pd | |
| from datasets import load_dataset | |
| from sklearn.feature_selection import VarianceThreshold | |
| from statsmodels.distributions.empirical_distribution import ECDF | |
| from rdkit import Chem, DataStructs | |
| from rdkit.Chem import Descriptors, rdFingerprintGenerator, MACCSkeys | |
| from rdkit.Chem.rdchem import Mol | |
| from .utils import ( | |
| USED_200_DESCR, | |
| TOX_SMARTS_PATH, | |
| Standardizer, | |
| ) | |
def create_cleaned_mol_objects(smiles: list[str]) -> tuple[list[Mol], np.ndarray]:
    """Create cleaned RDKit mol objects from a list of SMILES.

    Each SMILES is parsed, passed through ``Standardizer.standardize_mol`` and
    finally rebuilt from the SMILES string of the standardized molecule.

    Args:
        smiles (list[str]): list of SMILES

    Returns:
        list[Mol]: list of cleaned molecules, in input order
        np.ndarray[bool]: mask with one entry per input SMILES that contains
            False at index `i` if the molecule in `smiles` at index `i` could
            not be parsed or cleaned and was therefore left out of the list.
    """
    sm = Standardizer(canon_taut=True)

    clean_mol_mask = []
    mols = []
    for smile in smiles:
        mol = Chem.MolFromSmiles(smile)

        # MolFromSmiles yields None for input it cannot parse; treat that as
        # "not cleanable" instead of handing None to the standardizer.
        standardized_mol = None
        if mol is not None:
            standardized_mol, _ = sm.standardize_mol(mol)

        is_cleaned = standardized_mol is not None
        clean_mol_mask.append(is_cleaned)
        if not is_cleaned:
            continue

        # Rebuild the molecule from the SMILES of the standardized molecule.
        can_mol = Chem.MolFromSmiles(Chem.MolToSmiles(standardized_mol))
        mols.append(can_mol)

    # An explicit dtype keeps the mask boolean (and invertible with `~`) even
    # when `smiles` is empty.
    return mols, np.array(clean_mol_mask, dtype=bool)
def create_ecfp_fps(mols: list[Mol], radius=3, fpsize=2048, **kwargs) -> np.ndarray:
    """Create ECFP (Morgan) count fingerprints for a list of molecules.

    Args:
        mols (list[Mol]): list of molecules
        radius (int): radius handed to the Morgan fingerprint generator
        fpsize (int): fingerprint size handed to the Morgan fingerprint generator
        **kwargs: accepted and ignored, so that callers can pass one shared
            keyword dictionary to several functions

    Returns:
        np.ndarray: ECFP fingerprints of molecules, one row per molecule. For an
            empty input an int8 array of shape ``(0, fpsize)`` is returned.
    """
    mols = list(mols)
    if not mols:
        # Keep the result two-dimensional so `.shape[1]` still works downstream.
        return np.zeros((0, fpsize), dtype=np.int8)

    # The generator does not depend on the molecule, so it is built only once.
    gen = rdFingerprintGenerator.GetMorganGenerator(
        countSimulation=True, fpSize=fpsize, radius=radius
    )

    ecfps = []
    for mol in mols:
        fp_sparse_vec = gen.GetCountFingerprint(mol)
        # `fp` starts empty; ConvertToNumpyArray is expected to resize it to the
        # fingerprint length while filling it.
        fp = np.zeros((0,), np.int8)
        DataStructs.ConvertToNumpyArray(fp_sparse_vec, fp)
        ecfps.append(fp)

    return np.array(ecfps)
def create_maccs_keys(mols: list[Mol]) -> np.ndarray:
    """Compute the MACCS keys for a list of molecules.

    Args:
        mols (list[Mol]): list of molecules

    Returns:
        np.ndarray: array built from the MACCS key fingerprint of every molecule
    """
    keys_per_mol = []
    for mol in mols:
        keys_per_mol.append(MACCSkeys.GenMACCSKeys(mol))
    return np.array(keys_per_mol)
def get_tox_patterns(filepath: str):
    """Load the tox SMARTS definitions and pre-parse them into RDKit patterns.

    The JSON file has to contain a list of entries whose second element is a
    SMARTS expression. An expression is either a single SMARTS string or several
    SMARTS strings joined by ' AND ' or by ' OR '. Every part may be prefixed
    with 'NOT ' to negate its match result.

    Args:
        filepath (str): path to the JSON file with the SMARTS definitions

    Returns:
        list: one ``(patterns, negations, merge_any)`` tuple per expression.
            ``patterns`` is the list of parsed SMARTS parts, ``negations`` holds
            one boolean per part telling whether its match result is negated,
            and ``merge_any`` is True if one matching part suffices (OR / single
            SMARTS) and False if all parts have to match (AND).

    Raises:
        AssertionError: if an expression contains both "AND" and "OR".
    """
    # load patterns
    with open(filepath, encoding="utf-8") as f:
        smarts_list = [s[1] for s in json.load(f)]

    # Code does not work for this case. The error is raised explicitly (rather
    # than via `assert`) so the check is not stripped when running with -O.
    mixed = [s for s in smarts_list if ("AND" in s) and ("OR" in s)]
    if mixed:
        raise AssertionError(
            f"SMARTS expressions mixing AND and OR are not supported: {mixed}"
        )

    # Chem.MolFromSmarts takes a long time so it pays off to parse all the smarts
    # first and then use them for all molecules.
    all_patterns = []
    for smarts in smarts_list:
        if " AND " in smarts:
            # If an ' AND ' is found all 'subsmarts' have to match
            sub_smarts = smarts.split(" AND ")
            merge_any = False
        else:
            # If there is an ' OR ' present it's enough if any of the 'subsmarts'
            # match. This also covers smarts with neither ' OR ' nor ' AND '.
            sub_smarts = smarts.split(" OR ")
            merge_any = True

        patterns = []  # parsed smarts patterns
        negations = []  # one flag per pattern: negate its match result later?
        for s in sub_smarts:
            # check whether the subsmarts is preceded by 'NOT '
            neg = s.startswith("NOT ")
            if neg:
                s = s[4:]
            patterns.append(Chem.MolFromSmarts(s))
            negations.append(neg)

        all_patterns.append((patterns, negations, merge_any))

    return all_patterns
def create_tox_features(mols: list[Mol], patterns: list) -> np.ndarray:
    """Evaluate every tox pattern group on every molecule.

    Args:
        mols (list[Mol]): list of molecules
        patterns (list): ``(patterns, negations, merge_any)`` tuples; the
            sub-patterns of a group are combined with ``any`` if ``merge_any``
            is truthy and with ``all`` otherwise

    Returns:
        np.ndarray: one row per molecule with one entry per pattern group
    """
    rows = []
    for mol in mols:
        row = []
        for sub_patterns, negate_flags, merge_any in patterns:
            raw_hits = [mol.HasSubstructMatch(sub) for sub in sub_patterns]
            # `!=` works as XOR: a negated sub-pattern counts when it does not match
            hits = [hit != negate for hit, negate in zip(raw_hits, negate_flags)]
            combine = any if merge_any else all
            row.append(combine(hits))
        rows.append(np.array(row))
    return np.array(rows)
def create_rdkit_descriptors(mols: list[Mol]) -> np.ndarray:
    """Compute the selected RDKit descriptors for every molecule.

    For each molecule all calculators listed in ``Descriptors._descList`` are
    evaluated; the resulting vector is then indexed with ``USED_200_DESCR``.

    Args:
        mols (list[Mol]): list of molecules

    Returns:
        np.ndarray: RDKit descriptors of molecules, one row per molecule
    """
    per_mol = [
        np.array([calc(mol) for _name, calc in Descriptors._descList])[USED_200_DESCR]
        for mol in mols
    ]
    return np.array(per_mol)
def create_quantiles(raw_features: np.ndarray, ecdfs: list) -> np.ndarray:
    """Map every feature column to quantile values using the given ECDFs.

    Args:
        raw_features (np.ndarray): 2D array of values to put into quantiles
        ecdfs (list): callables, one per column of `raw_features`; entry `i` is
            applied to column `i`

    Returns:
        np.ndarray: computed quantiles with the same shape as `raw_features`.
            Floating point inputs keep their dtype, every other dtype yields
            float64.
    """
    # The output must be floating point: writing the ECDF results into an
    # integer array would silently truncate them.
    if np.issubdtype(raw_features.dtype, np.floating):
        out_dtype = raw_features.dtype
    else:
        out_dtype = np.float64
    quantiles = np.zeros_like(raw_features, dtype=out_dtype)

    for column in range(raw_features.shape[1]):
        quantiles[:, column] = ecdfs[column](raw_features[:, column])

    return quantiles
def fill(features, mask, value=np.nan):
    """Expand `features` to one row per mask entry and pad the masked rows.

    Args:
        features: 2D array with one row for every False entry of `mask`
        mask: boolean array; True marks the rows that receive `value`
        value: placeholder written to the rows where `mask` is True

    Returns:
        np.ndarray: float array of shape ``(len(mask), features.shape[1])``
    """
    expanded = np.full((len(mask), features.shape[1]), value, dtype=np.float64)
    # rows that are not masked take the given features, in order
    expanded[~mask] = features
    return expanded
def create_descriptors(
    smiles,
    ecdfs=None,
    feature_selection=None,
    return_ecdfs=False,
    return_feature_selection=False,
    **kwargs,
):
    """Run the complete descriptor pipeline for a list of SMILES.

    Molecules are cleaned first. ECFP fingerprints, tox features, MACCS keys and
    quantile-transformed RDKit descriptors are then computed for the cleaned
    molecules and expanded with `fill`, so every feature array has one row per
    input SMILES.

    Args:
        smiles: list of SMILES
        ecdfs: ECDFs used for the RDKit descriptor quantiles, one per descriptor
            column. If None, they are created from the given molecules.
        feature_selection: dict with the keys "ecfps_selec" and "tox_selec"
            holding the column selections for ECFPs and tox features. If None,
            both are computed with `get_feature_selection`.
        return_ecdfs: if truthy, the ECDFs are added to the result as "ecdfs"
        return_feature_selection: if truthy, the feature selection is added to
            the result as "feature_selection"
        **kwargs: forwarded to `create_ecfp_fps` and `get_feature_selection`

    Returns:
        dict: "features" maps to a dict with the keys "ecfps", "tox", "maccs"
            and "rdkit_descr_quantiles"; "ecdfs" and "feature_selection" are
            present only when requested.
    """
    # Clean the molecules; the mask remembers which inputs survived
    mols, clean_mol_mask = create_cleaned_mol_objects(smiles)
    print("Cleaned molecules")

    tox_patterns = get_tox_patterns(TOX_SMARTS_PATH)

    # Fingerprints, expanded back to one row per input via the inverted mask
    ecfps = fill(create_ecfp_fps(mols, **kwargs), ~clean_mol_mask)
    print("Created ECFP fingerprints")

    tox = fill(create_tox_features(mols, tox_patterns), ~clean_mol_mask)
    print("Created Tox features")

    # Reuse a given feature selection, otherwise derive one from the data
    if feature_selection is not None:
        ecfps_selec = feature_selection["ecfps_selec"]
        tox_selec = feature_selection["tox_selec"]
    else:
        print("Create Feature selection")
        ecfps_selec = get_feature_selection(ecfps, **kwargs)
        tox_selec = get_feature_selection(tox, **kwargs)
        feature_selection = {"ecfps_selec": ecfps_selec, "tox_selec": tox_selec}

    ecfps = ecfps[:, ecfps_selec]
    tox = tox[:, tox_selec]

    maccs = fill(create_maccs_keys(mols), ~clean_mol_mask)
    print("Created MACCS keys")

    rdkit_descrs = create_rdkit_descriptors(mols)
    print("Created RDKit descriptors")

    # Build one ECDF per descriptor column unless ECDFs were handed in
    if ecdfs is None:
        print("Create ECDFs")
        ecdfs = [
            ECDF(rdkit_descrs[:, col].reshape(-1))
            for col in range(rdkit_descrs.shape[1])
        ]

    # Quantiles of the descriptors, again expanded to one row per input
    rdkit_descr_quantiles = fill(
        create_quantiles(rdkit_descrs, ecdfs), ~clean_mol_mask
    )
    print("Created quantiles of RDKit descriptors")

    result = {
        "features": {
            "ecfps": ecfps,
            "tox": tox,
            "maccs": maccs,
            "rdkit_descr_quantiles": rdkit_descr_quantiles,
        }
    }
    if return_ecdfs:
        result["ecdfs"] = ecdfs
    if return_feature_selection:
        result["feature_selection"] = feature_selection
    return result
def get_feature_selection(
    raw_features: np.ndarray, min_var=0.01, max_corr=0.95, **kwargs
) -> np.ndarray:
    """Select feature columns by variance and by pairwise correlation.

    Columns are first filtered with a ``VarianceThreshold`` of `min_var`. Among
    the remaining columns, a column is dropped if its correlation with any
    earlier remaining column exceeds `max_corr`.

    Args:
        raw_features (np.ndarray): 2D feature array (rows are samples)
        min_var (float): threshold handed to ``VarianceThreshold``
        max_corr (float): correlation above which the later column of a pair
            is removed
        **kwargs: accepted and ignored, so that callers can pass one shared
            keyword dictionary to several functions

    Returns:
        np.ndarray: indices of the selected columns of `raw_features`
    """
    # select features with at least min_var variation
    var_thresh = VarianceThreshold(threshold=min_var)
    feature_selection = var_thresh.fit(raw_features).get_support(indices=True)

    # Remove highly correlated features
    preselected = raw_features[:, feature_selection]
    # Rows containing NaN (such as placeholder rows written by `fill`) would turn
    # every correlation into NaN and thereby silently switch the filter off, so
    # they are left out of the correlation estimate.
    complete_rows = ~np.isnan(preselected).any(axis=1)
    # With a single preselected column np.corrcoef returns a scalar; force 2D.
    corr_matrix = np.atleast_2d(
        np.corrcoef(preselected[complete_rows], rowvar=False)
    )

    # Only entries above the diagonal are inspected, so for every correlated
    # pair (i, j) with i < j it is column j that gets dropped.
    upper_tri = np.triu(corr_matrix, k=1)
    to_keep = ~(upper_tri > max_corr).any(axis=0)

    return feature_selection[to_keep]
def get_tox21_split(token, cvfold=None):
    """Load the Tox21 dataset and return train and validation dataframes.

    Args:
        token: access token forwarded to ``load_dataset``
        cvfold: if None, the train/validation split stored in the dataset is
            returned unchanged. Otherwise both splits are combined and rows
            whose ``CVfold`` column equals ``float(cvfold)`` become the
            validation split, all other rows the training split.

    Returns:
        dict: dataframes under the keys "train" and "validation"
    """
    ds = load_dataset("tschouis/tox21", token=token)

    train_df = ds["train"].to_pandas()
    val_df = ds["validation"].to_pandas()

    if cvfold is None:
        return {"train": train_df, "validation": val_df}

    combined_df = pd.concat([train_df, val_df], ignore_index=True)
    # The fold id is compared as a float against the CVfold column.
    cvfold = float(cvfold)

    # create new splits
    train_df = combined_df[combined_df.CVfold != cvfold]
    val_df = combined_df[combined_df.CVfold == cvfold]

    # exclude train mols that occur in the validation split
    val_inchikeys = set(val_df["inchikey"])
    train_df = train_df[~train_df["inchikey"].isin(val_inchikeys)]

    return {
        "train": train_df.reset_index(drop=True),
        "validation": val_df.reset_index(drop=True),
    }