# pipeline taken from https://huggingface.co/spaces/ml-jku/mhnfs/blob/main/src/data_preprocessing/create_descriptors.py

"""
This file contains the data processing for Tox21.
As an input it takes a list of SMILES and it outputs a dictionary of feature
matrices (ECFP counts, tox pattern matches, MACCS keys and quantiles of RDKit
descriptors), optionally together with the fitted ECDFs and feature selection.
"""

import json

import numpy as np
import pandas as pd

from datasets import load_dataset
from sklearn.feature_selection import VarianceThreshold
from statsmodels.distributions.empirical_distribution import ECDF

from rdkit import Chem, DataStructs
from rdkit.Chem import Descriptors, rdFingerprintGenerator, MACCSkeys
from rdkit.Chem.rdchem import Mol

from .utils import (
    USED_200_DESCR,
    TOX_SMARTS_PATH,
    Standardizer,
)


def create_cleaned_mol_objects(smiles: list[str]) -> tuple[list[Mol], np.ndarray]:
    """This function creates cleaned RDKit mol objects from a list of SMILES.

    Args:
        smiles (list[str]): list of SMILES

    Returns:
        list[Mol]: list of cleaned molecules
        np.ndarray[bool]: mask that contains False at index `i`, if molecule in `smiles` at
            index `i` could not be cleaned and was removed.
    """
    sm = Standardizer(canon_taut=True)

    clean_mol_mask = []
    mols = []
    for smile in smiles:
        mol = Chem.MolFromSmiles(smile)

        # MolFromSmiles yields None for SMILES it cannot parse; treat those as
        # "could not be cleaned" instead of handing None to the standardizer.
        standardized_mol = None
        if mol is not None:
            standardized_mol, _ = sm.standardize_mol(mol)

        is_cleaned = standardized_mol is not None
        clean_mol_mask.append(is_cleaned)
        if not is_cleaned:
            continue

        # Round-trip through SMILES to obtain a freshly parsed molecule.
        can_mol = Chem.MolFromSmiles(Chem.MolToSmiles(standardized_mol))
        mols.append(can_mol)

    # Explicit bool dtype: an empty list would otherwise become a float array,
    # on which the `~mask` used by callers is not defined.
    return mols, np.array(clean_mol_mask, dtype=bool)


def create_ecfp_fps(mols: list[Mol], radius=3, fpsize=2048, **kwargs) -> np.ndarray:
    """This function creates ECFP count fingerprints for a list of molecules.

    Args:
        mols (list[Mol]): list of molecules
        radius (int): radius passed to the Morgan fingerprint generator
        fpsize (int): fingerprint size passed to the Morgan fingerprint generator
        **kwargs: ignored; accepted so that callers can forward a shared
            keyword dictionary

    Returns:
        np.ndarray: ECFP fingerprints of molecules
    """
    # The generator does not depend on the molecule, so build it once.
    gen = rdFingerprintGenerator.GetMorganGenerator(
        countSimulation=True, fpSize=fpsize, radius=radius
    )

    ecfps = []
    for mol in mols:
        fp_sparse_vec = gen.GetCountFingerprint(mol)

        # Target buffer that ConvertToNumpyArray writes the fingerprint into.
        fp = np.zeros((0,), np.int8)
        DataStructs.ConvertToNumpyArray(fp_sparse_vec, fp)

        ecfps.append(fp)

    return np.array(ecfps)


def create_maccs_keys(mols: list[Mol]) -> np.ndarray:
    """This function creates MACCS keys for a list of molecules.

    Args:
        mols (list[Mol]): list of molecules

    Returns:
        np.ndarray: MACCS keys of molecules
    """
    maccs = [MACCSkeys.GenMACCSKeys(x) for x in mols]
    return np.array(maccs)


def get_tox_patterns(filepath: str):
    """This loads and parses the tox patterns defined in a SMARTS json file.

    Each entry of the json file is expected to hold the pattern string at
    index 1. A pattern string is either a single SMARTS, or several SMARTS
    joined by ' AND ' or by ' OR '; each sub-SMARTS may be prefixed by 'NOT '.

    Args:
        filepath (str): path to the json file with the tox SMARTS

    Returns:
        list: one tuple `(patterns, negations, merge_any)` per entry, where
            `patterns` are the parsed sub-SMARTS, `negations` flags which of
            them are negated and `merge_any` is True if any (instead of all)
            sub-pattern has to match.
    """
    # load patterns
    with open(filepath, encoding="utf-8") as f:
        smarts_list = [s[1] for s in json.load(f)]

    # Code does not work for this case
    mixed = [s for s in smarts_list if ("AND" in s) and ("OR" in s)]
    assert len(mixed) == 0, "patterns mixing AND and OR are not supported"

    # Chem.MolFromSmarts takes a long time so it pays off to parse all the smarts first
    # and then use them for all molecules. This gives a huge speedup over existing code.
    # a list of patterns, whether to negate the match result and how to join them
    # to obtain one boolean value
    all_patterns = []
    for smarts in smarts_list:
        patterns = []  # list of smarts-patterns
        # value for each of the patterns above. Negates the values of the above later.
        negations = []

        if " AND " in smarts:
            smarts = smarts.split(" AND ")
            merge_any = False  # If an ' AND ' is found all 'subsmarts' have to match
        else:
            # If there is an ' OR ' present it's enough if any of the 'subsmarts' match.
            # This also accumulates smarts where neither ' OR ' nor ' AND ' occur
            smarts = smarts.split(" OR ")
            merge_any = True

        # for all subsmarts check if they are preceded by 'NOT '
        for s in smarts:
            neg = s.startswith("NOT ")
            if neg:
                s = s[4:]
            patterns.append(Chem.MolFromSmarts(s))
            negations.append(neg)

        all_patterns.append((patterns, negations, merge_any))
    return all_patterns


def create_tox_features(mols: list[Mol], patterns: list) -> np.ndarray:
    """Matches the tox patterns against the molecules.

    Args:
        mols (list[Mol]): list of molecules
        patterns (list): tuples `(patterns, negations, merge_any)` as returned
            by `get_tox_patterns`

    Returns:
        np.ndarray: boolean array with one row per molecule and one column per
            pattern entry
    """
    tox_data = []
    for mol in mols:
        mol_features = []
        for patts, negations, merge_any in patterns:
            # `m != n` flips the match result of negated sub-patterns.
            matches = [
                mol.HasSubstructMatch(p) != n for p, n in zip(patts, negations)
            ]
            pres = any(matches) if merge_any else all(matches)
            mol_features.append(pres)

        tox_data.append(np.array(mol_features))

    return np.array(tox_data)


def create_rdkit_descriptors(mols: list[Mol]) -> np.ndarray:
    """This function creates RDKit descriptors for a list of molecules.

    Args:
        mols (list[Mol]): list of molecules

    Returns:
        np.ndarray: RDKit descriptors of molecules
    """
    rdkit_descriptors = []

    for mol in mols:
        # Evaluate every descriptor, then keep the subset indexed by USED_200_DESCR.
        descrs = np.array(
            [descr_calc_fn(mol) for _, descr_calc_fn in Descriptors._descList]
        )
        rdkit_descriptors.append(descrs[USED_200_DESCR])

    return np.array(rdkit_descriptors)


def create_quantiles(raw_features: np.ndarray, ecdfs: list) -> np.ndarray:
    """Create quantile values for given features using per-column ECDFs.

    Args:
        raw_features (np.ndarray): values to put into quantiles
        ecdfs (list): ECDFs to use, one per column of `raw_features`

    Returns:
        np.ndarray: computed quantiles (float array of the same shape)
    """
    # Always use a float output: inheriting an integer dtype from the input
    # would truncate the quantiles on assignment.
    quantiles = np.zeros(raw_features.shape, dtype=float)

    for column in range(raw_features.shape[1]):
        raw_values = raw_features[:, column].reshape(-1)
        quantiles[:, column] = ecdfs[column](raw_values)

    return quantiles


def fill(features: np.ndarray, mask: np.ndarray, value=np.nan) -> np.ndarray:
    """Expand a feature matrix to one row per input molecule.

    Args:
        features (np.ndarray): feature rows of the molecules that are present
        mask (np.ndarray): boolean mask over all molecules that is True where
            a molecule is MISSING; rows where the mask is False receive the
            rows of `features` (callers pass `~clean_mol_mask`)
        value: fill value for the rows of missing molecules

    Returns:
        np.ndarray: float array with `len(mask)` rows
    """
    n_mols = len(mask)
    n_features = features.shape[1]

    data = np.full((n_mols, n_features), value, dtype=float)
    data[~mask] = features
    return data


def create_descriptors(
    smiles,
    ecdfs=None,
    feature_selection=None,
    return_ecdfs=False,
    return_feature_selection=False,
    **kwargs,
):
    """Create all feature matrices for a list of SMILES.

    Args:
        smiles: list of SMILES
        ecdfs: per-column ECDFs for the RDKit descriptors; fitted on the given
            molecules if None
        feature_selection: dict with the keys "ecfps_selec" and "tox_selec"
            holding column indices; computed on the given molecules if None
        return_ecdfs: whether to add the ECDFs to the returned dictionary
        return_feature_selection: whether to add the feature selection to the
            returned dictionary
        **kwargs: forwarded to `create_ecfp_fps` and `get_feature_selection`

    Returns:
        dict: always contains "features" (dict with the keys "ecfps", "tox",
            "maccs" and "rdkit_descr_quantiles"); optionally "ecdfs" and
            "feature_selection". Rows of molecules that could not be cleaned
            are filled with NaN.
    """
    # Create cleaned rdkit mol objects
    mols, clean_mol_mask = create_cleaned_mol_objects(smiles)
    print("Cleaned molecules")

    tox_patterns = get_tox_patterns(TOX_SMARTS_PATH)

    # Create fingerprints and descriptors
    ecfps = create_ecfp_fps(mols, **kwargs)
    # expand using mol_mask
    ecfps = fill(ecfps, ~clean_mol_mask)
    print("Created ECFP fingerprints")

    tox = create_tox_features(mols, tox_patterns)
    tox = fill(tox, ~clean_mol_mask)
    print("Created Tox features")

    # Create and save feature selection for ecfps and tox
    if feature_selection is None:
        print("Create Feature selection")
        ecfps_selec = get_feature_selection(ecfps, **kwargs)
        tox_selec = get_feature_selection(tox, **kwargs)
        feature_selection = {"ecfps_selec": ecfps_selec, "tox_selec": tox_selec}
    else:
        ecfps_selec = feature_selection["ecfps_selec"]
        tox_selec = feature_selection["tox_selec"]

    ecfps = ecfps[:, ecfps_selec]
    tox = tox[:, tox_selec]

    maccs = create_maccs_keys(mols)
    maccs = fill(maccs, ~clean_mol_mask)
    print("Created MACCS keys")

    rdkit_descrs = create_rdkit_descriptors(mols)
    print("Created RDKit descriptors")

    # Create and save ecdfs
    if ecdfs is None:
        print("Create ECDFs")
        ecdfs = []
        for column in range(rdkit_descrs.shape[1]):
            raw_values = rdkit_descrs[:, column].reshape(-1)
            ecdfs.append(ECDF(raw_values))

    # Create quantiles
    rdkit_descr_quantiles = create_quantiles(rdkit_descrs, ecdfs)
    # expand using mol_mask
    rdkit_descr_quantiles = fill(rdkit_descr_quantiles, ~clean_mol_mask)
    print("Created quantiles of RDKit descriptors")

    # concatenate features
    features = {
        "ecfps": ecfps,
        "tox": tox,
        "maccs": maccs,
        "rdkit_descr_quantiles": rdkit_descr_quantiles,
    }

    return_dict = {"features": features}
    if return_ecdfs:
        return_dict["ecdfs"] = ecdfs
    if return_feature_selection:
        return_dict["feature_selection"] = feature_selection
    return return_dict


def get_feature_selection(
    raw_features: np.ndarray, min_var=0.01, max_corr=0.95, **kwargs
) -> np.ndarray:
    """Select feature columns by variance and pairwise correlation.

    Args:
        raw_features (np.ndarray): feature matrix (rows are samples); rows may
            contain NaN, e.g. for molecules that could not be cleaned
        min_var (float): threshold handed to `VarianceThreshold`
        max_corr (float): a column is dropped if its correlation with any
            earlier preselected column exceeds this value
        **kwargs: ignored; accepted so that callers can forward a shared
            keyword dictionary

    Returns:
        np.ndarray: indices of the selected columns of `raw_features`
    """
    # select features with at least min_var variation
    var_thresh = VarianceThreshold(threshold=min_var)
    feature_selection = var_thresh.fit(raw_features).get_support(indices=True)

    # With fewer than two columns there is no pair to correlate
    # (np.corrcoef would not even return a matrix).
    if len(feature_selection) < 2:
        return feature_selection

    # Remove highly correlated features.
    # Rows containing NaN are left out: a single NaN row would turn every
    # correlation into NaN and thereby silently disable this filter.
    preselected = raw_features[:, feature_selection]
    complete_rows = ~np.isnan(preselected).any(axis=1)
    corr_matrix = np.corrcoef(preselected[complete_rows], rowvar=False)

    # Only the strict upper triangle: column j is compared with columns i < j.
    upper_tri = np.triu(corr_matrix, k=1)
    to_keep = ~(upper_tri > max_corr).any(axis=0)

    return feature_selection[to_keep]


def get_tox21_split(token, cvfold=None):
    """Load the Tox21 dataset and return a train/validation split.

    Args:
        token: access token passed to `load_dataset`
        cvfold: if None, the dataset's own train and validation splits are
            returned; otherwise the rows whose `CVfold` equals this value form
            the validation split and the remaining rows the train split

    Returns:
        dict: DataFrames under the keys "train" and "validation"
    """
    ds = load_dataset("tschouis/tox21", token=token)

    train_df = ds["train"].to_pandas()
    val_df = ds["validation"].to_pandas()

    if cvfold is None:
        return {"train": train_df, "validation": val_df}

    combined_df = pd.concat([train_df, val_df], ignore_index=True)
    cvfold = float(cvfold)

    # create new splits
    train_df = combined_df[combined_df.CVfold != cvfold]
    val_df = combined_df[combined_df.CVfold == cvfold]

    # exclude train mols that occur in the validation split
    val_inchikeys = set(val_df["inchikey"])
    train_df = train_df[~train_df["inchikey"].isin(val_inchikeys)]

    return {
        "train": train_df.reset_index(drop=True),
        "validation": val_df.reset_index(drop=True),
    }