Spaces:
Paused
Paused
| # Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license | |
| # Copyright (C) 2003-2017 Nominum, Inc. | |
| # | |
| # Permission to use, copy, modify, and distribute this software and its | |
| # documentation for any purpose with or without fee is hereby granted, | |
| # provided that the above copyright notice and this permission notice | |
| # appear in all copies. | |
| # | |
| # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES | |
| # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF | |
| # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR | |
| # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES | |
| # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN | |
| # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT | |
| # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | |
| """Asynchronous DNS stub resolver.""" | |
| import socket | |
| import time | |
| from typing import Any, Dict, List, Optional, Union | |
| import dns._ddr | |
| import dns.asyncbackend | |
| import dns.asyncquery | |
| import dns.exception | |
| import dns.name | |
| import dns.query | |
| import dns.rdataclass | |
| import dns.rdatatype | |
| import dns.resolver # lgtm[py/import-and-import-from] | |
| # import some resolver symbols for brevity | |
| from dns.resolver import NXDOMAIN, NoAnswer, NoRootSOA, NotAbsolute | |
| # for indentation purposes below | |
| _udp = dns.asyncquery.udp | |
| _tcp = dns.asyncquery.tcp | |
| class Resolver(dns.resolver.BaseResolver): | |
| """Asynchronous DNS stub resolver.""" | |
| async def resolve( | |
| self, | |
| qname: Union[dns.name.Name, str], | |
| rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A, | |
| rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN, | |
| tcp: bool = False, | |
| source: Optional[str] = None, | |
| raise_on_no_answer: bool = True, | |
| source_port: int = 0, | |
| lifetime: Optional[float] = None, | |
| search: Optional[bool] = None, | |
| backend: Optional[dns.asyncbackend.Backend] = None, | |
| ) -> dns.resolver.Answer: | |
| """Query nameservers asynchronously to find the answer to the question. | |
| *backend*, a ``dns.asyncbackend.Backend``, or ``None``. If ``None``, | |
| the default, then dnspython will use the default backend. | |
| See :py:func:`dns.resolver.Resolver.resolve()` for the | |
| documentation of the other parameters, exceptions, and return | |
| type of this method. | |
| """ | |
| resolution = dns.resolver._Resolution( | |
| self, qname, rdtype, rdclass, tcp, raise_on_no_answer, search | |
| ) | |
| if not backend: | |
| backend = dns.asyncbackend.get_default_backend() | |
| start = time.time() | |
| while True: | |
| (request, answer) = resolution.next_request() | |
| # Note we need to say "if answer is not None" and not just | |
| # "if answer" because answer implements __len__, and python | |
| # will call that. We want to return if we have an answer | |
| # object, including in cases where its length is 0. | |
| if answer is not None: | |
| # cache hit! | |
| return answer | |
| assert request is not None # needed for type checking | |
| done = False | |
| while not done: | |
| (nameserver, tcp, backoff) = resolution.next_nameserver() | |
| if backoff: | |
| await backend.sleep(backoff) | |
| timeout = self._compute_timeout(start, lifetime, resolution.errors) | |
| try: | |
| response = await nameserver.async_query( | |
| request, | |
| timeout=timeout, | |
| source=source, | |
| source_port=source_port, | |
| max_size=tcp, | |
| backend=backend, | |
| ) | |
| except Exception as ex: | |
| (_, done) = resolution.query_result(None, ex) | |
| continue | |
| (answer, done) = resolution.query_result(response, None) | |
| # Note we need to say "if answer is not None" and not just | |
| # "if answer" because answer implements __len__, and python | |
| # will call that. We want to return if we have an answer | |
| # object, including in cases where its length is 0. | |
| if answer is not None: | |
| return answer | |
| async def resolve_address( | |
| self, ipaddr: str, *args: Any, **kwargs: Any | |
| ) -> dns.resolver.Answer: | |
| """Use an asynchronous resolver to run a reverse query for PTR | |
| records. | |
| This utilizes the resolve() method to perform a PTR lookup on the | |
| specified IP address. | |
| *ipaddr*, a ``str``, the IPv4 or IPv6 address you want to get | |
| the PTR record for. | |
| All other arguments that can be passed to the resolve() function | |
| except for rdtype and rdclass are also supported by this | |
| function. | |
| """ | |
| # We make a modified kwargs for type checking happiness, as otherwise | |
| # we get a legit warning about possibly having rdtype and rdclass | |
| # in the kwargs more than once. | |
| modified_kwargs: Dict[str, Any] = {} | |
| modified_kwargs.update(kwargs) | |
| modified_kwargs["rdtype"] = dns.rdatatype.PTR | |
| modified_kwargs["rdclass"] = dns.rdataclass.IN | |
| return await self.resolve( | |
| dns.reversename.from_address(ipaddr), *args, **modified_kwargs | |
| ) | |
| async def resolve_name( | |
| self, | |
| name: Union[dns.name.Name, str], | |
| family: int = socket.AF_UNSPEC, | |
| **kwargs: Any, | |
| ) -> dns.resolver.HostAnswers: | |
| """Use an asynchronous resolver to query for address records. | |
| This utilizes the resolve() method to perform A and/or AAAA lookups on | |
| the specified name. | |
| *qname*, a ``dns.name.Name`` or ``str``, the name to resolve. | |
| *family*, an ``int``, the address family. If socket.AF_UNSPEC | |
| (the default), both A and AAAA records will be retrieved. | |
| All other arguments that can be passed to the resolve() function | |
| except for rdtype and rdclass are also supported by this | |
| function. | |
| """ | |
| # We make a modified kwargs for type checking happiness, as otherwise | |
| # we get a legit warning about possibly having rdtype and rdclass | |
| # in the kwargs more than once. | |
| modified_kwargs: Dict[str, Any] = {} | |
| modified_kwargs.update(kwargs) | |
| modified_kwargs.pop("rdtype", None) | |
| modified_kwargs["rdclass"] = dns.rdataclass.IN | |
| if family == socket.AF_INET: | |
| v4 = await self.resolve(name, dns.rdatatype.A, **modified_kwargs) | |
| return dns.resolver.HostAnswers.make(v4=v4) | |
| elif family == socket.AF_INET6: | |
| v6 = await self.resolve(name, dns.rdatatype.AAAA, **modified_kwargs) | |
| return dns.resolver.HostAnswers.make(v6=v6) | |
| elif family != socket.AF_UNSPEC: | |
| raise NotImplementedError(f"unknown address family {family}") | |
| raise_on_no_answer = modified_kwargs.pop("raise_on_no_answer", True) | |
| lifetime = modified_kwargs.pop("lifetime", None) | |
| start = time.time() | |
| v6 = await self.resolve( | |
| name, | |
| dns.rdatatype.AAAA, | |
| raise_on_no_answer=False, | |
| lifetime=self._compute_timeout(start, lifetime), | |
| **modified_kwargs, | |
| ) | |
| # Note that setting name ensures we query the same name | |
| # for A as we did for AAAA. (This is just in case search lists | |
| # are active by default in the resolver configuration and | |
| # we might be talking to a server that says NXDOMAIN when it | |
| # wants to say NOERROR no data. | |
| name = v6.qname | |
| v4 = await self.resolve( | |
| name, | |
| dns.rdatatype.A, | |
| raise_on_no_answer=False, | |
| lifetime=self._compute_timeout(start, lifetime), | |
| **modified_kwargs, | |
| ) | |
| answers = dns.resolver.HostAnswers.make( | |
| v6=v6, v4=v4, add_empty=not raise_on_no_answer | |
| ) | |
| if not answers: | |
| raise NoAnswer(response=v6.response) | |
| return answers | |
| # pylint: disable=redefined-outer-name | |
| async def canonical_name(self, name: Union[dns.name.Name, str]) -> dns.name.Name: | |
| """Determine the canonical name of *name*. | |
| The canonical name is the name the resolver uses for queries | |
| after all CNAME and DNAME renamings have been applied. | |
| *name*, a ``dns.name.Name`` or ``str``, the query name. | |
| This method can raise any exception that ``resolve()`` can | |
| raise, other than ``dns.resolver.NoAnswer`` and | |
| ``dns.resolver.NXDOMAIN``. | |
| Returns a ``dns.name.Name``. | |
| """ | |
| try: | |
| answer = await self.resolve(name, raise_on_no_answer=False) | |
| canonical_name = answer.canonical_name | |
| except dns.resolver.NXDOMAIN as e: | |
| canonical_name = e.canonical_name | |
| return canonical_name | |
| async def try_ddr(self, lifetime: float = 5.0) -> None: | |
| """Try to update the resolver's nameservers using Discovery of Designated | |
| Resolvers (DDR). If successful, the resolver will subsequently use | |
| DNS-over-HTTPS or DNS-over-TLS for future queries. | |
| *lifetime*, a float, is the maximum time to spend attempting DDR. The default | |
| is 5 seconds. | |
| If the SVCB query is successful and results in a non-empty list of nameservers, | |
| then the resolver's nameservers are set to the returned servers in priority | |
| order. | |
| The current implementation does not use any address hints from the SVCB record, | |
| nor does it resolve addresses for the SCVB target name, rather it assumes that | |
| the bootstrap nameserver will always be one of the addresses and uses it. | |
| A future revision to the code may offer fuller support. The code verifies that | |
| the bootstrap nameserver is in the Subject Alternative Name field of the | |
| TLS certficate. | |
| """ | |
| try: | |
| expiration = time.time() + lifetime | |
| answer = await self.resolve( | |
| dns._ddr._local_resolver_name, "svcb", lifetime=lifetime | |
| ) | |
| timeout = dns.query._remaining(expiration) | |
| nameservers = await dns._ddr._get_nameservers_async(answer, timeout) | |
| if len(nameservers) > 0: | |
| self.nameservers = nameservers | |
| except Exception: | |
| pass | |
| default_resolver = None | |
| def get_default_resolver() -> Resolver: | |
| """Get the default asynchronous resolver, initializing it if necessary.""" | |
| if default_resolver is None: | |
| reset_default_resolver() | |
| assert default_resolver is not None | |
| return default_resolver | |
| def reset_default_resolver() -> None: | |
| """Re-initialize default asynchronous resolver. | |
| Note that the resolver configuration (i.e. /etc/resolv.conf on UNIX | |
| systems) will be re-read immediately. | |
| """ | |
| global default_resolver | |
| default_resolver = Resolver() | |
| async def resolve( | |
| qname: Union[dns.name.Name, str], | |
| rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A, | |
| rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN, | |
| tcp: bool = False, | |
| source: Optional[str] = None, | |
| raise_on_no_answer: bool = True, | |
| source_port: int = 0, | |
| lifetime: Optional[float] = None, | |
| search: Optional[bool] = None, | |
| backend: Optional[dns.asyncbackend.Backend] = None, | |
| ) -> dns.resolver.Answer: | |
| """Query nameservers asynchronously to find the answer to the question. | |
| This is a convenience function that uses the default resolver | |
| object to make the query. | |
| See :py:func:`dns.asyncresolver.Resolver.resolve` for more | |
| information on the parameters. | |
| """ | |
| return await get_default_resolver().resolve( | |
| qname, | |
| rdtype, | |
| rdclass, | |
| tcp, | |
| source, | |
| raise_on_no_answer, | |
| source_port, | |
| lifetime, | |
| search, | |
| backend, | |
| ) | |
| async def resolve_address( | |
| ipaddr: str, *args: Any, **kwargs: Any | |
| ) -> dns.resolver.Answer: | |
| """Use a resolver to run a reverse query for PTR records. | |
| See :py:func:`dns.asyncresolver.Resolver.resolve_address` for more | |
| information on the parameters. | |
| """ | |
| return await get_default_resolver().resolve_address(ipaddr, *args, **kwargs) | |
| async def resolve_name( | |
| name: Union[dns.name.Name, str], family: int = socket.AF_UNSPEC, **kwargs: Any | |
| ) -> dns.resolver.HostAnswers: | |
| """Use a resolver to asynchronously query for address records. | |
| See :py:func:`dns.asyncresolver.Resolver.resolve_name` for more | |
| information on the parameters. | |
| """ | |
| return await get_default_resolver().resolve_name(name, family, **kwargs) | |
| async def canonical_name(name: Union[dns.name.Name, str]) -> dns.name.Name: | |
| """Determine the canonical name of *name*. | |
| See :py:func:`dns.resolver.Resolver.canonical_name` for more | |
| information on the parameters and possible exceptions. | |
| """ | |
| return await get_default_resolver().canonical_name(name) | |
| async def try_ddr(timeout: float = 5.0) -> None: | |
| """Try to update the default resolver's nameservers using Discovery of Designated | |
| Resolvers (DDR). If successful, the resolver will subsequently use | |
| DNS-over-HTTPS or DNS-over-TLS for future queries. | |
| See :py:func:`dns.resolver.Resolver.try_ddr` for more information. | |
| """ | |
| return await get_default_resolver().try_ddr(timeout) | |
| async def zone_for_name( | |
| name: Union[dns.name.Name, str], | |
| rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN, | |
| tcp: bool = False, | |
| resolver: Optional[Resolver] = None, | |
| backend: Optional[dns.asyncbackend.Backend] = None, | |
| ) -> dns.name.Name: | |
| """Find the name of the zone which contains the specified name. | |
| See :py:func:`dns.resolver.Resolver.zone_for_name` for more | |
| information on the parameters and possible exceptions. | |
| """ | |
| if isinstance(name, str): | |
| name = dns.name.from_text(name, dns.name.root) | |
| if resolver is None: | |
| resolver = get_default_resolver() | |
| if not name.is_absolute(): | |
| raise NotAbsolute(name) | |
| while True: | |
| try: | |
| answer = await resolver.resolve( | |
| name, dns.rdatatype.SOA, rdclass, tcp, backend=backend | |
| ) | |
| assert answer.rrset is not None | |
| if answer.rrset.name == name: | |
| return name | |
| # otherwise we were CNAMEd or DNAMEd and need to look higher | |
| except (NXDOMAIN, NoAnswer): | |
| pass | |
| try: | |
| name = name.parent() | |
| except dns.name.NoParent: # pragma: no cover | |
| raise NoRootSOA | |
| async def make_resolver_at( | |
| where: Union[dns.name.Name, str], | |
| port: int = 53, | |
| family: int = socket.AF_UNSPEC, | |
| resolver: Optional[Resolver] = None, | |
| ) -> Resolver: | |
| """Make a stub resolver using the specified destination as the full resolver. | |
| *where*, a ``dns.name.Name`` or ``str`` the domain name or IP address of the | |
| full resolver. | |
| *port*, an ``int``, the port to use. If not specified, the default is 53. | |
| *family*, an ``int``, the address family to use. This parameter is used if | |
| *where* is not an address. The default is ``socket.AF_UNSPEC`` in which case | |
| the first address returned by ``resolve_name()`` will be used, otherwise the | |
| first address of the specified family will be used. | |
| *resolver*, a ``dns.asyncresolver.Resolver`` or ``None``, the resolver to use for | |
| resolution of hostnames. If not specified, the default resolver will be used. | |
| Returns a ``dns.resolver.Resolver`` or raises an exception. | |
| """ | |
| if resolver is None: | |
| resolver = get_default_resolver() | |
| nameservers: List[Union[str, dns.nameserver.Nameserver]] = [] | |
| if isinstance(where, str) and dns.inet.is_address(where): | |
| nameservers.append(dns.nameserver.Do53Nameserver(where, port)) | |
| else: | |
| answers = await resolver.resolve_name(where, family) | |
| for address in answers.addresses(): | |
| nameservers.append(dns.nameserver.Do53Nameserver(address, port)) | |
| res = dns.asyncresolver.Resolver(configure=False) | |
| res.nameservers = nameservers | |
| return res | |
| async def resolve_at( | |
| where: Union[dns.name.Name, str], | |
| qname: Union[dns.name.Name, str], | |
| rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A, | |
| rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN, | |
| tcp: bool = False, | |
| source: Optional[str] = None, | |
| raise_on_no_answer: bool = True, | |
| source_port: int = 0, | |
| lifetime: Optional[float] = None, | |
| search: Optional[bool] = None, | |
| backend: Optional[dns.asyncbackend.Backend] = None, | |
| port: int = 53, | |
| family: int = socket.AF_UNSPEC, | |
| resolver: Optional[Resolver] = None, | |
| ) -> dns.resolver.Answer: | |
| """Query nameservers to find the answer to the question. | |
| This is a convenience function that calls ``dns.asyncresolver.make_resolver_at()`` | |
| to make a resolver, and then uses it to resolve the query. | |
| See ``dns.asyncresolver.Resolver.resolve`` for more information on the resolution | |
| parameters, and ``dns.asyncresolver.make_resolver_at`` for information about the | |
| resolver parameters *where*, *port*, *family*, and *resolver*. | |
| If making more than one query, it is more efficient to call | |
| ``dns.asyncresolver.make_resolver_at()`` and then use that resolver for the queries | |
| instead of calling ``resolve_at()`` multiple times. | |
| """ | |
| res = await make_resolver_at(where, port, family, resolver) | |
| return await res.resolve( | |
| qname, | |
| rdtype, | |
| rdclass, | |
| tcp, | |
| source, | |
| raise_on_no_answer, | |
| source_port, | |
| lifetime, | |
| search, | |
| backend, | |
| ) | |