Spaces:
Runtime error
Runtime error
| import logging | |
| import shutil | |
| import sys | |
| import textwrap | |
| import xmlrpc.client | |
| from collections import OrderedDict | |
| from optparse import Values | |
| from typing import TYPE_CHECKING, Dict, List, Optional | |
| from pip._vendor.packaging.version import parse as parse_version | |
| from pip._internal.cli.base_command import Command | |
| from pip._internal.cli.req_command import SessionCommandMixin | |
| from pip._internal.cli.status_codes import NO_MATCHES_FOUND, SUCCESS | |
| from pip._internal.exceptions import CommandError | |
| from pip._internal.metadata import get_default_environment | |
| from pip._internal.models.index import PyPI | |
| from pip._internal.network.xmlrpc import PipXmlrpcTransport | |
| from pip._internal.utils.logging import indent_log | |
| from pip._internal.utils.misc import write_output | |
| if TYPE_CHECKING: | |
| from typing import TypedDict | |
| class TransformedHit(TypedDict): | |
| name: str | |
| summary: str | |
| versions: List[str] | |
| logger = logging.getLogger(__name__) | |
| class SearchCommand(Command, SessionCommandMixin): | |
| """Search for PyPI packages whose name or summary contains <query>.""" | |
| usage = """ | |
| %prog [options] <query>""" | |
| ignore_require_venv = True | |
| def add_options(self) -> None: | |
| self.cmd_opts.add_option( | |
| '-i', '--index', | |
| dest='index', | |
| metavar='URL', | |
| default=PyPI.pypi_url, | |
| help='Base URL of Python Package Index (default %default)') | |
| self.parser.insert_option_group(0, self.cmd_opts) | |
| def run(self, options: Values, args: List[str]) -> int: | |
| if not args: | |
| raise CommandError('Missing required argument (search query).') | |
| query = args | |
| pypi_hits = self.search(query, options) | |
| hits = transform_hits(pypi_hits) | |
| terminal_width = None | |
| if sys.stdout.isatty(): | |
| terminal_width = shutil.get_terminal_size()[0] | |
| print_results(hits, terminal_width=terminal_width) | |
| if pypi_hits: | |
| return SUCCESS | |
| return NO_MATCHES_FOUND | |
| def search(self, query: List[str], options: Values) -> List[Dict[str, str]]: | |
| index_url = options.index | |
| session = self.get_default_session(options) | |
| transport = PipXmlrpcTransport(index_url, session) | |
| pypi = xmlrpc.client.ServerProxy(index_url, transport) | |
| try: | |
| hits = pypi.search({'name': query, 'summary': query}, 'or') | |
| except xmlrpc.client.Fault as fault: | |
| message = "XMLRPC request failed [code: {code}]\n{string}".format( | |
| code=fault.faultCode, | |
| string=fault.faultString, | |
| ) | |
| raise CommandError(message) | |
| assert isinstance(hits, list) | |
| return hits | |
| def transform_hits(hits: List[Dict[str, str]]) -> List["TransformedHit"]: | |
| """ | |
| The list from pypi is really a list of versions. We want a list of | |
| packages with the list of versions stored inline. This converts the | |
| list from pypi into one we can use. | |
| """ | |
| packages: Dict[str, "TransformedHit"] = OrderedDict() | |
| for hit in hits: | |
| name = hit['name'] | |
| summary = hit['summary'] | |
| version = hit['version'] | |
| if name not in packages.keys(): | |
| packages[name] = { | |
| 'name': name, | |
| 'summary': summary, | |
| 'versions': [version], | |
| } | |
| else: | |
| packages[name]['versions'].append(version) | |
| # if this is the highest version, replace summary and score | |
| if version == highest_version(packages[name]['versions']): | |
| packages[name]['summary'] = summary | |
| return list(packages.values()) | |
| def print_dist_installation_info(name: str, latest: str) -> None: | |
| env = get_default_environment() | |
| dist = env.get_distribution(name) | |
| if dist is not None: | |
| with indent_log(): | |
| if dist.version == latest: | |
| write_output('INSTALLED: %s (latest)', dist.version) | |
| else: | |
| write_output('INSTALLED: %s', dist.version) | |
| if parse_version(latest).pre: | |
| write_output('LATEST: %s (pre-release; install' | |
| ' with "pip install --pre")', latest) | |
| else: | |
| write_output('LATEST: %s', latest) | |
| def print_results( | |
| hits: List["TransformedHit"], | |
| name_column_width: Optional[int] = None, | |
| terminal_width: Optional[int] = None, | |
| ) -> None: | |
| if not hits: | |
| return | |
| if name_column_width is None: | |
| name_column_width = max([ | |
| len(hit['name']) + len(highest_version(hit.get('versions', ['-']))) | |
| for hit in hits | |
| ]) + 4 | |
| for hit in hits: | |
| name = hit['name'] | |
| summary = hit['summary'] or '' | |
| latest = highest_version(hit.get('versions', ['-'])) | |
| if terminal_width is not None: | |
| target_width = terminal_width - name_column_width - 5 | |
| if target_width > 10: | |
| # wrap and indent summary to fit terminal | |
| summary_lines = textwrap.wrap(summary, target_width) | |
| summary = ('\n' + ' ' * (name_column_width + 3)).join( | |
| summary_lines) | |
| name_latest = f'{name} ({latest})' | |
| line = f'{name_latest:{name_column_width}} - {summary}' | |
| try: | |
| write_output(line) | |
| print_dist_installation_info(name, latest) | |
| except UnicodeEncodeError: | |
| pass | |
| def highest_version(versions: List[str]) -> str: | |
| return max(versions, key=parse_version) | |