# -*- coding: utf-8 """Python wrapper for CDK descriptors and fingerprints""" from __future__ import annotations import io import multiprocessing import os import subprocess import warnings from copy import deepcopy from enum import Enum, auto from subprocess import Popen, PIPE from typing import Iterable, List, Optional import more_itertools import numpy as np import pandas as pd from bounded_pool_executor import BoundedProcessPoolExecutor from rdkit import Chem from rdkit.rdBase import BlockLogs from .utils import install_java, mktempfile, needsHs class FPType(Enum): FP = auto() ExtFP = auto() EStateFP = auto() GraphFP = auto() MACCSFP = auto() PubchemFP = auto() SubFP = auto() KRFP = auto() AP2DFP = auto() HybridFP = auto() LingoFP = auto() SPFP = auto() SigFP = auto() CircFP = auto() class CDK: """Wrapper to obtain molecular descriptor from CDK.""" lock = multiprocessing.RLock() # Ensure installation of JRE is thread safe # Path to the JAR file _jarfile = os.path.abspath(os.path.join(__file__, os.pardir, 'CDKdesc.jar')) def __init__(self, ignore_3D: bool = True, fingerprint: FPType = None, nbits: int = 1024, depth: int = 6, backend_smiles: bool = False): """Instantiate a wrapper to calculate CDK molecular descriptors or a fingerprint. :param ignore_3D: whether to include 3D molecular descriptors :param fingerprint: a fingerprint type to be calculated (default: None, calculates descriptors) :param nbits: number of bits (default: 1024 unless the fingerprint has a fixed size) :param depth: depth of the fingerprint (default: 6 unless the fingerprint does not depend on depth) :param backend_smiles: use SMILES as the interchange format to discuss with the CDKdesc backend; the default (i.e. backend_smiles=False) makes use of the V2000 SD format; is ignored if ignore_3D=True. """ # Ensure the jar file exists if not os.path.isfile(self._jarfile): raise IOError('The required CDKdesc JAR file is not present. Reinstall CDK-pywrapper.') if fingerprint is not None: if not isinstance(fingerprint, FPType): raise TypeError(f'Fingerprint type not supported: {fingerprint}') self.include_3D = not ignore_3D self.fingerprint = None if fingerprint is None else fingerprint.name self.nbits = nbits self.depth = depth self.backend_smiles = backend_smiles and ignore_3D # if include_3D, then always False def calculate(self, mols: List[Chem.Mol], show_banner: bool = True, cdk_smiles: bool = False, njobs: int = 1, chunksize: Optional[int] = 1000) -> pd.DataFrame: """Calculate molecular fingerprints. :param mols: RDKit molecules for which descriptors/fingerprints should be calculated (must have 3D conformers if calculating descriptors) :param show_banner: If True, show notice on this package usage :param cdk_smiles: If True, generate the canonical SMILES (generated by CDK) of molecules parsed by CDK :param njobs: number of concurrent processes :param chunksize: number of molecules to be processed by a process; ignored if njobs is 1 :return: a pandas DataFrame containing all CDK descriptor/fingerprint values """ if show_banner: self._show_banner() if njobs < 0: njobs = os.cpu_count() - njobs + 1 # Parallelize should need be if njobs > 1: with BoundedProcessPoolExecutor(max_workers=njobs) as worker: futures = [worker.submit(self._multiproc_calculate, list(chunk), cdk_smiles) for chunk in more_itertools.batched(mols, chunksize) ] return (pd.concat([future.result() for future in futures]). reset_index(drop=True) ) # Single process return self._calculate(list(mols), cdk_smiles) def _show_banner(self): """Print info message for citing.""" print("""The Chemistry Development Kit (CDK) is a collection of modular Java libraries for processing chemical information (Cheminformatics). It can compute 14 different fingerprint types and 287 molecular descriptors (it requires 3D molecular structures for the latter). ################################### Should you publish results based on the PaDEL descriptors, please cite: Willighagen et al., (2017) J. Cheminf. 9(3), doi:10.1186/s13321-017-0220-4, May and Steinbeck., (2014) J. Cheminf., doi:10.1186/1758-2946-6-3, Steinbeck et al., (2006) Curr. Pharm. Des. 12(17):2111-2120, doi:10.2174/138161206777585274, Steinbeck et al., (2003) J. Chem. Inf. Comput. Sci. 43(2):493-500, doi:10.1021/ci025584y. ################################### """) def _prepare_command(self, mols: List[Chem.Mol], cdk_smiles: bool = False) -> str: """Create the CDK command to be run to obtain molecular descriptors. :param mols: molecules to obtained molecular descriptors of :param cdk_smiles: If True, generate the canonical SMILES (generated by CDK) of molecules parsed by CDK :return: The command to run. """ # 1) Ensure JRE is accessible with self.lock: self._java_path = install_java(19) # 2) Create temp SD v2k or SMILES file self._tmp_input = mktempfile('molecules_.smi') if self.backend_smiles else mktempfile('molecules_v2k.sd') self._n_mols = 0 self._skipped = [] self.n = 0 try: block = BlockLogs() if self.backend_smiles: writer = Chem.SmilesWriter(self._tmp_input, includeHeader=False, isomericSmiles=True, kekuleSmiles=True) else: writer = Chem.SDWriter(self._tmp_input) # Ensure V2000 as CDK cannot properly process v3000 writer.SetForceV3000(False) for i, mol in enumerate(mols): if mol is not None and isinstance(mol, Chem.Mol): if not self.backend_smiles and mol.GetNumAtoms() > 999: raise ValueError('Cannot calculate descriptors for molecules with more than 999 atoms.') # Does molecule lack hydrogen atoms? if needsHs(mol): warnings.warn('Molecule lacks hydrogen atoms: this might affect the value of calculated descriptors') # Are molecules 3D if self.include_3D: confs = list(mol.GetConformers()) if self.fingerprint is None and not (len(confs) > 0 and confs[-1].Is3D()): raise ValueError('Cannot calculate the 3D descriptors of a conformer-less molecule') writer.write(mol) self._n_mols += 1 else: self._skipped.append(i) self.n += 1 writer.close() del block except ValueError as e: # Free resources and raise error writer.close() del block os.remove(self._tmp_input) raise e from None # 3) Create command java_path = install_java(19) command_parameters = (f"-f {self.fingerprint} -nBits {self.nbits} " f"-depth {self.depth}") if self.fingerprint is not None else "" command_file = f"-{'s' if self.backend_smiles else 'i'} {self._tmp_input}" command_out_smiles = "-S" if cdk_smiles else "" command = f"{java_path} -jar {self._jarfile} {command_file} {command_parameters} {command_out_smiles}" return command def _cleanup(self) -> None: """Cleanup resources used for calculation.""" # Remove temporary files os.remove(self._tmp_input) def _run_command(self, command: str) -> pd.DataFrame: """Run the CDK command. :param command: The command to be run. """ with Popen(command.split(), stdout=PIPE, stderr=subprocess.DEVNULL) as process: values = process.stdout.read().decode() # CDK barf preventing correct parsing if 'not found' in values: # Omit error values = '\n'.join(line for line in values.split('\n') if 'not found' not in line) # Empty result file if len(values) == 0: details = self.get_details() values = pd.DataFrame(np.full((self._n_mols, details.shape[0]), np.nan), columns=details.Name) elif '{' not in values: if self.fingerprint is None: values = values.split('\n') # Ensure all columns are present in the header values[0] = (f'{"SMILES " if values[0].startswith("SMILES") else ""}' 'Fsp3 nSmallRings nAromRings nRingBlocks nAromBlocks nRings3 nRings4 nRings5 nRings6 ' 'nRings7 nRings8 nRings9 tpsaEfficiency Zagreb XLogP WPATH WPOL Wlambda1.unity Wlambda2.unity ' 'Wlambda3.unity Wnu1.unity Wnu2.unity Wgamma1.unity Wgamma2.unity Wgamma3.unity Weta1.unity ' 'Weta2.unity Weta3.unity WT.unity WA.unity WV.unity WK.unity WG.unity WD.unity WTPT-1 WTPT-2 ' 'WTPT-3 WTPT-4 WTPT-5 MW VAdjMat VABC TopoPSA LipinskiFailures nRotB topoShape geomShape ' 'PetitjeanNumber MOMI-X MOMI-Y MOMI-Z MOMI-XY MOMI-XZ MOMI-YZ MOMI-R MDEC-11 MDEC-12 MDEC-13 ' 'MDEC-14 MDEC-22 MDEC-23 MDEC-24 MDEC-33 MDEC-34 MDEC-44 MDEO-11 MDEO-12 MDEO-22 MDEN-11 ' 'MDEN-12 MDEN-13 MDEN-22 MDEN-23 MDEN-33 MLogP nAtomLAC LOBMAX LOBMIN nAtomP nAtomLC khs.sLi ' 'khs.ssBe khs.ssssBe khs.ssBH khs.sssB khs.ssssB khs.sCH3 khs.dCH2 khs.ssCH2 khs.tCH khs.dsCH ' 'khs.aaCH khs.sssCH khs.ddC khs.tsC khs.dssC khs.aasC khs.aaaC khs.ssssC khs.sNH3 khs.sNH2 ' 'khs.ssNH2 khs.dNH khs.ssNH khs.aaNH khs.tN khs.sssNH khs.dsN khs.aaN khs.sssN khs.ddsN ' 'khs.aasN khs.ssssN khs.sOH khs.dO khs.ssO khs.aaO khs.sF khs.sSiH3 khs.ssSiH2 khs.sssSiH ' 'khs.ssssSi khs.sPH2 khs.ssPH khs.sssP khs.dsssP khs.sssssP khs.sSH khs.dS khs.ssS khs.aaS ' 'khs.dssS khs.ddssS khs.sCl khs.sGeH3 khs.ssGeH2 khs.sssGeH khs.ssssGe khs.sAsH2 khs.ssAsH ' 'khs.sssAs khs.sssdAs khs.sssssAs khs.sSeH khs.dSe khs.ssSe khs.aaSe khs.dssSe khs.ddssSe ' 'khs.sBr khs.sSnH3 khs.ssSnH2 khs.sssSnH khs.ssssSn khs.sI khs.sPbH3 khs.ssPbH2 khs.sssPbH ' 'khs.ssssPb Kier1 Kier2 Kier3 HybRatio nHBDon nHBAcc GRAV-1 GRAV-2 GRAV-3 GRAVH-1 GRAVH-2 ' 'GRAVH-3 GRAV-4 GRAV-5 GRAV-6 fragC FMF ECCEN PPSA-1 PPSA-2 PPSA-3 PNSA-1 PNSA-2 PNSA-3 ' 'DPSA-1 DPSA-2 DPSA-3 FPSA-1 FPSA-2 FPSA-3 FNSA-1 FNSA-2 FNSA-3 WPSA-1 WPSA-2 WPSA-3 WNSA-1 ' 'WNSA-2 WNSA-3 RPCG RNCG RPCS RNCS THSA TPSA RHSA RPSA SP-0 SP-1 SP-2 SP-3 SP-4 SP-5 SP-6 ' 'SP-7 VP-0 VP-1 VP-2 VP-3 VP-4 VP-5 VP-6 VP-7 SPC-4 SPC-5 SPC-6 VPC-4 VPC-5 VPC-6 SC-3 SC-4 ' 'SC-5 SC-6 VC-3 VC-4 VC-5 VC-6 SCH-3 SCH-4 SCH-5 SCH-6 SCH-7 VCH-3 VCH-4 VCH-5 VCH-6 VCH-7 ' 'C1SP1 C2SP1 C1SP2 C2SP2 C3SP2 C1SP3 C2SP3 C3SP3 C4SP3 bpol nB BCUTw-1l BCUTw-1h BCUTc-1l ' 'BCUTc-1h BCUTp-1l BCUTp-1h nBase ATSp1 ATSp2 ATSp3 ATSp4 ATSp5 ATSm1 ATSm2 ATSm3 ATSm4 ' 'ATSm5 ATSc1 ATSc2 ATSc3 ATSc4 ATSc5 nAtom nAromBond naAromAtom apol ALogP ALogp2 AMR nAcid ' 'JPLogP') # CDK uses uppercase exponents but pandas needs a lowercase values = '\n'.join([values[0]] + [line.replace('E', 'e') for line in values[1:]]) # Parse with pandas values = pd.read_csv(io.StringIO(values), sep=' ') else: try: values = pd.DataFrame.from_dict(eval('{%s}' % values), orient='index').fillna(0) # Separate SMILES when calculated if isinstance(values.index[0], str) and values.index.str.contains('|').any(): smiles = pd.Series(values.index.str.split('|').str[1], name='SMILES').reset_index(drop=True) values = (pd.concat([smiles, values.reset_index(drop=True)], axis=1) .reset_index(drop=True)) except pd.errors.EmptyDataError: raise RuntimeError('CDK could not obtain molecular descriptors, maybe due to a faulty molecule') # If only 2D, remove 3D descriptors if not self.include_3D and self.fingerprint is None: # Get 3D descriptor names to remove descs_3D = self.get_details() descs_3D = descs_3D[descs_3D.Dimensions == '3D'] values = values[[col for col in values.columns if col not in descs_3D.Name.tolist()]] return values def _calculate(self, mols: List[Chem.Mol], cdk_smiles: bool = False) -> pd.DataFrame: """Calculate CDK molecular descriptors on one process. :param mols: RDKit molecules for which CDK descriptors and fingerprints should be calculated. :param cdk_smiles: If True, generate the canonical SMILES (generated by CDK) of molecules parsed by CDK :return: a pandas DataFrame containing CDK descriptor values """ # Prepare inputs command = self._prepare_command(mols, cdk_smiles) # Run command and obtain results results = self._run_command(command) # Cleanup self._cleanup() # Insert lines of skipped molecules if len(self._skipped): results = pd.DataFrame(np.insert(results.values, self._skipped, values=[np.NaN] * len(results.columns), axis=0), columns=results.columns) # Omit SMILES column from casting if in values if cdk_smiles: smiles_col = results['SMILES'] results = results.drop(columns=['SMILES']) results = (results.apply(pd.to_numeric, errors='coerce', axis=1) ) # Insert SMILES column back if cdk_smiles: results = pd.concat([smiles_col, results], axis=1) return results def _multiproc_calculate(self, mols: List[Chem.Mol], cdk_smiles: bool = False) -> pd.DataFrame: """Calculate CDK descriptors and fingerprints in thread-safe manner. :param mols: RDKit molecules for which CDK descriptors and fingerprints should be calculated :param cdk_smiles: If True, generate the canonical SMILES (generated by CDK) of molecules parsed by CDK :return: a pandas DataFrame containing all CDK descriptor values """ # Copy self instance to make thread safe cdk = deepcopy(self) # Run copy result = cdk.calculate(mols, show_banner=False, njobs=1, cdk_smiles=cdk_smiles) return result @staticmethod def get_details(desc_name: Optional[str] = None): """Obtain details about either one or all descriptors. :param desc_name: the name of the descriptor to obtain details about (default: None). If None, returns details about all descriptors. """ details = pd.read_json(os.path.abspath(os.path.join(__file__, os.pardir, 'descs.json')), orient='index') if desc_name is not None: if desc_name not in details.Name.tolist(): raise ValueError(f'descriptor name {desc_name} is not available') details = details[details.Name == desc_name] return details