Spaces:
Paused
Paused
| # Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license | |
| # Copyright (C) 2003-2017 Nominum, Inc. | |
| # | |
| # Permission to use, copy, modify, and distribute this software and its | |
| # documentation for any purpose with or without fee is hereby granted, | |
| # provided that the above copyright notice and this permission notice | |
| # appear in all copies. | |
| # | |
| # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES | |
| # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF | |
| # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR | |
| # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES | |
| # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN | |
| # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT | |
| # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | |
| """DNS stub resolver.""" | |
| import contextlib | |
| import random | |
| import socket | |
| import sys | |
| import threading | |
| import time | |
| import warnings | |
| from typing import Any, Dict, Iterator, List, Optional, Sequence, Tuple, Union | |
| from urllib.parse import urlparse | |
| import dns._ddr | |
| import dns.edns | |
| import dns.exception | |
| import dns.flags | |
| import dns.inet | |
| import dns.ipv4 | |
| import dns.ipv6 | |
| import dns.message | |
| import dns.name | |
| import dns.nameserver | |
| import dns.query | |
| import dns.rcode | |
| import dns.rdataclass | |
| import dns.rdatatype | |
| import dns.rdtypes.svcbbase | |
| import dns.reversename | |
| import dns.tsig | |
| if sys.platform == "win32": | |
| import dns.win32util | |
class NXDOMAIN(dns.exception.DNSException):
    """The DNS query name does not exist."""

    # NOTE(review): the class docstring above is left verbatim on purpose; the
    # dns.exception base class presumably falls back to it as the exception
    # message -- confirm before rewording it.

    supp_kwargs = {"qnames", "responses"}
    fmt = None  # we have our own __str__ implementation

    # pylint: disable=arguments-differ

    # We do this as otherwise mypy complains about unexpected keyword argument
    # idna_exception
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _check_kwargs(self, qnames, responses=None):
        """Validate and normalize the keyword arguments.

        *qnames* must be a non-empty list, tuple or set.  *responses*, if
        given, must be a dict keyed by query name; ``None`` becomes ``{}``.

        Returns the normalized ``dict(qnames=..., responses=...)``.

        Raises ``AttributeError`` if either argument has the wrong shape.
        """
        if not isinstance(qnames, (list, tuple, set)):
            raise AttributeError("qnames must be a list, tuple or set")
        if len(qnames) == 0:
            raise AttributeError("qnames must contain at least one element")
        if responses is None:
            responses = {}
        elif not isinstance(responses, dict):
            raise AttributeError("responses must be a dict(qname=response)")
        kwargs = dict(qnames=qnames, responses=responses)
        return kwargs

    def __str__(self) -> str:
        if "qnames" not in self.kwargs:
            return super().__str__()
        qnames = self.kwargs["qnames"]
        if len(qnames) > 1:
            msg = "None of DNS query names exist"
        else:
            msg = "The DNS query name does not exist"
        qnames = ", ".join(map(str, qnames))
        return "{}: {}".format(msg, qnames)

    def canonical_name(self):
        """Return the unresolved canonical name.

        The first stored response whose canonical name differs from its
        query name wins; otherwise the first query name is returned.

        Raises ``TypeError`` if the exception was created without *qnames*.
        """
        if "qnames" not in self.kwargs:
            raise TypeError("parametrized exception required")
        qnames = self.kwargs["qnames"]
        responses = self.kwargs.get("responses", {})
        for qname in qnames:
            # Not every name necessarily has a response recorded (responses
            # defaults to an empty dict), so skip the ones that don't rather
            # than failing with KeyError.
            response = responses.get(qname)
            if response is None:
                continue
            try:
                cname = response.canonical_name()
                if cname != qname:
                    return cname
            except Exception:
                # We can just eat this exception as it means there was
                # something wrong with the response.
                pass
        # qnames may be a set, which cannot be indexed.
        return next(iter(qnames))

    def __add__(self, e_nx):
        """Augment by results from another NXDOMAIN exception.

        Returns a new ``NXDOMAIN`` holding this exception's names followed by
        any new names from *e_nx*; responses from *e_nx* for names it tried
        override the ones stored here.
        """
        qnames0 = list(self.kwargs.get("qnames", []))
        responses0 = dict(self.kwargs.get("responses", {}))
        responses1 = e_nx.kwargs.get("responses", {})
        for qname1 in e_nx.kwargs.get("qnames", []):
            if qname1 not in qnames0:
                qnames0.append(qname1)
            if qname1 in responses1:
                responses0[qname1] = responses1[qname1]
        return NXDOMAIN(qnames=qnames0, responses=responses0)

    def qnames(self):
        """All of the names that were tried.

        Returns a list of ``dns.name.Name``.
        """
        return self.kwargs["qnames"]

    def responses(self):
        """A map from queried names to their NXDOMAIN responses.

        Returns a dict mapping a ``dns.name.Name`` to a
        ``dns.message.Message``.
        """
        return self.kwargs["responses"]

    def response(self, qname):
        """The response for query *qname*.

        Returns a ``dns.message.Message``.
        """
        return self.kwargs["responses"][qname]
# Raised by _Resolution.query_result() when a response carries rcode YXDOMAIN.
# NOTE(review): the docstring presumably doubles as the exception message in
# the dns.exception base class -- confirm before rewording it.
class YXDOMAIN(dns.exception.DNSException):
    """The DNS query name is too long after DNAME substitution."""
# One entry of a resolution error trace.  _Resolution.query_result() appends
# tuples of:
#   (nameserver as text, whether the attempt used TCP, answer port,
#    the exception or rcode text describing the failure,
#    the response message, or None if no response was received)
ErrorTuple = Tuple[
    Optional[str],
    bool,
    int,
    Union[Exception, str],
    Optional[dns.message.Message],
]
def _errors_to_text(errors: List[ErrorTuple]) -> List[str]:
    """Render a resolution error trace as one line of text per entry.

    Only the server (element 0) and the failure (element 3) of each entry
    are used.
    """
    return ["Server {} answered {}".format(entry[0], entry[3]) for entry in errors]
class LifetimeTimeout(dns.exception.Timeout):
    """The resolution lifetime expired."""

    msg = "The resolution lifetime expired."
    # msg without its trailing period, followed by the parametrized details.
    fmt = "%s after {timeout:.3f} seconds: {errors}" % msg[:-1]
    supp_kwargs = {"timeout", "errors"}

    # We do this as otherwise mypy complains about unexpected keyword argument
    # idna_exception
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _fmt_kwargs(self, **kwargs):
        """Build the values substituted into ``fmt``.

        The *errors* trace is flattened into a single "; "-separated string.
        """
        joined_errors = "; ".join(_errors_to_text(kwargs["errors"]))
        return super()._fmt_kwargs(timeout=kwargs["timeout"], errors=joined_errors)
# We added more detail to resolution timeouts, but they are still
# subclasses of dns.exception.Timeout for backwards compatibility.  We also
# keep dns.resolver.Timeout defined (as an alias of LifetimeTimeout) for
# backwards compatibility.
Timeout = LifetimeTimeout
class NoAnswer(dns.exception.DNSException):
    """The DNS response does not contain an answer to the question."""

    fmt = "The DNS response does not contain an answer to the question: {query}"
    supp_kwargs = {"response"}

    # We do this as otherwise mypy complains about unexpected keyword argument
    # idna_exception
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _fmt_kwargs(self, **kwargs):
        """Build the values substituted into ``fmt``.

        The ``{query}`` field is filled with the response's question section.
        """
        question = kwargs["response"].question
        return super()._fmt_kwargs(query=question)

    def response(self):
        """Return the response this exception was created with."""
        stored = self.kwargs
        return stored["response"]
class NoNameservers(dns.exception.DNSException):
    """All nameservers failed to answer the query.

    Keyword arguments:

    *request*: the query message that could not be answered; its question
    section appears in the message.

    *errors*: the error trace, a list of ``ErrorTuple``; the server and the
    failure of every entry are appended to the message.
    """

    msg = "All nameservers failed to answer the query."
    # msg without its trailing period, followed by the parametrized details.
    fmt = "%s {query}: {errors}" % msg[:-1]
    supp_kwargs = {"request", "errors"}

    # We do this as otherwise mypy complains about unexpected keyword argument
    # idna_exception
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _fmt_kwargs(self, **kwargs):
        """Build the values substituted into ``fmt``."""
        joined_errors = "; ".join(_errors_to_text(kwargs["errors"]))
        question = kwargs["request"].question
        return super()._fmt_kwargs(query=question, errors=joined_errors)
# NOTE(review): the docstring presumably doubles as the exception message in
# the dns.exception base class -- confirm before rewording it.
class NotAbsolute(dns.exception.DNSException):
    """An absolute domain name is required but a relative name was provided."""
# NOTE(review): the docstring presumably doubles as the exception message in
# the dns.exception base class -- confirm before rewording it.
class NoRootSOA(dns.exception.DNSException):
    """There is no SOA RR at the DNS root name. This should never happen!"""
# Raised by _Resolution.__init__() when the requested rdtype is a metatype or
# the requested rdclass is a metaclass.
# NOTE(review): the docstring presumably doubles as the exception message in
# the dns.exception base class -- confirm before rewording it.
class NoMetaqueries(dns.exception.DNSException):
    """DNS metaqueries are not allowed."""
# Raised by BaseResolver.read_resolv_conf() when the file cannot be opened or
# lists no nameservers.
# NOTE(review): the docstring presumably doubles as the default exception
# message in the dns.exception base class -- confirm before rewording it.
class NoResolverConfiguration(dns.exception.DNSException):
    """Resolver configuration could not be read or specified no nameservers."""
class Answer:
    """The result of a successful DNS stub resolution.

    An answer keeps the query parameters, the response message, and the
    outcome of following any CNAME/DNAME chain in that response.

    Much of the sequence protocol is forwarded to the ``rrset`` attribute:
    ``for a in answer`` iterates ``answer.rrset``, ``answer[i]`` is
    ``answer.rrset[i]`` and ``answer[i:j]`` is ``answer.rrset[i:j]``.

    Because of CNAMEs or DNAMEs in the response, the name of the answer
    RRset might not be the query name.
    """

    def __init__(
        self,
        qname: dns.name.Name,
        rdtype: dns.rdatatype.RdataType,
        rdclass: dns.rdataclass.RdataClass,
        response: dns.message.QueryMessage,
        nameserver: Optional[str] = None,
        port: Optional[int] = None,
    ) -> None:
        self.qname = qname
        self.rdtype = rdtype
        self.rdclass = rdclass
        self.response = response
        self.nameserver = nameserver
        self.port = port
        chaining = response.resolve_chaining()
        self.chaining_result = chaining
        # Copy some attributes out of the chaining result for backwards
        # compatibility and convenience.
        self.canonical_name = chaining.canonical_name
        self.rrset = chaining.answer
        # Absolute time after which the caches treat this answer as expired.
        self.expiration = time.time() + chaining.minimum_ttl

    def __getattr__(self, attr):  # pragma: no cover
        # Only reached for attributes not found normally; a handful of names
        # are forwarded to the rrset.
        if attr in ("name", "ttl", "covers", "rdclass", "rdtype"):
            return getattr(self.rrset, attr)
        raise AttributeError(attr)

    def __len__(self) -> int:
        if self.rrset:
            return len(self.rrset)
        return 0

    def __iter__(self):
        if self.rrset:
            return iter(self.rrset)
        return iter(tuple())

    def __getitem__(self, i):
        rrset = self.rrset
        if rrset is None:
            raise IndexError
        return rrset[i]

    def __delitem__(self, i):
        rrset = self.rrset
        if rrset is None:
            raise IndexError
        del rrset[i]
# Plain dict subclass with no behavior of its own; HostAnswers builds on it.
class Answers(dict):
    """A dict of DNS stub resolver answers, indexed by type."""
class HostAnswers(Answers):
    """A dict of DNS stub resolver answers to a host name lookup, indexed by
    type.
    """

    @classmethod
    def make(
        cls,
        v6: Optional[Answer] = None,
        v4: Optional[Answer] = None,
        add_empty: bool = True,
    ) -> "HostAnswers":
        """Build a ``HostAnswers`` from optional AAAA and A answers.

        *v6*, the AAAA answer, stored under ``dns.rdatatype.AAAA``.

        *v4*, the A answer, stored under ``dns.rdatatype.A``.

        *add_empty*, a ``bool``.  If false, an answer whose rrset is missing
        or empty is left out of the result.

        This is an alternate constructor, so it must be a classmethod to be
        callable as ``HostAnswers.make(...)``.
        """
        answers = HostAnswers()
        if v6 is not None and (add_empty or v6.rrset):
            answers[dns.rdatatype.AAAA] = v6
        if v4 is not None and (add_empty or v4.rrset):
            answers[dns.rdatatype.A] = v4
        return answers

    def addresses_and_families(
        self, family: int = socket.AF_UNSPEC
    ) -> Iterator[Tuple[str, int]]:
        """Yield (address, family) pairs from this result, potentially
        filtering by address family.

        With ``socket.AF_UNSPEC`` all IPv6 addresses are yielded first,
        followed by all IPv4 addresses.

        Raises ``NotImplementedError`` (on iteration) for any other family
        than ``AF_UNSPEC``, ``AF_INET6`` or ``AF_INET``.
        """
        if family == socket.AF_UNSPEC:
            yield from self.addresses_and_families(socket.AF_INET6)
            yield from self.addresses_and_families(socket.AF_INET)
            return
        elif family == socket.AF_INET6:
            answer = self.get(dns.rdatatype.AAAA)
        elif family == socket.AF_INET:
            answer = self.get(dns.rdatatype.A)
        else:
            raise NotImplementedError(f"unknown address family {family}")
        if answer:
            for rdata in answer:
                yield (rdata.address, family)

    def addresses(self, family: int = socket.AF_UNSPEC) -> Iterator[str]:
        """Yield the addresses from this result, potentially filtering by
        address family.
        """
        return (pair[0] for pair in self.addresses_and_families(family))

    def canonical_name(self) -> dns.name.Name:
        """Return the canonical name from this result.

        The AAAA answer is preferred, falling back to the A answer.
        """
        answer = self.get(dns.rdatatype.AAAA, self.get(dns.rdatatype.A))
        return answer.canonical_name
class CacheStatistics:
    """Hit and miss counters for a cache."""

    def __init__(self, hits: int = 0, misses: int = 0) -> None:
        self.hits, self.misses = hits, misses

    def reset(self) -> None:
        """Set both counters back to zero."""
        self.hits = self.misses = 0

    def clone(self) -> "CacheStatistics":
        """Return an independent copy holding the current counter values."""
        return CacheStatistics(hits=self.hits, misses=self.misses)
class CacheBase:
    """State shared by the caches: a lock and the hit/miss statistics.

    Every accessor below takes ``self.lock`` while it touches the
    statistics.
    """

    def __init__(self) -> None:
        self.lock = threading.Lock()
        self.statistics = CacheStatistics()

    def reset_statistics(self) -> None:
        """Reset all statistics to zero."""
        with self.lock:
            self.statistics.reset()

    def hits(self) -> int:
        """How many hits has the cache had?"""
        with self.lock:
            count = self.statistics.hits
        return count

    def misses(self) -> int:
        """How many misses has the cache had?"""
        with self.lock:
            count = self.statistics.misses
        return count

    def get_statistics_snapshot(self) -> CacheStatistics:
        """Return a consistent snapshot of all the statistics.

        If running with multiple threads, it's better to take a
        snapshot than to call statistics methods such as hits() and
        misses() individually.
        """
        with self.lock:
            snapshot = self.statistics.clone()
        return snapshot
# Key used by the caches below: (query name, rdtype, rdclass).
CacheKey = Tuple[dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass]
class Cache(CacheBase):
    """Simple thread-safe DNS answer cache."""

    def __init__(self, cleaning_interval: float = 300.0) -> None:
        """*cleaning_interval*, a ``float`` is the number of seconds between
        periodic cleanings.
        """
        super().__init__()
        self.data: Dict[CacheKey, Answer] = {}
        self.cleaning_interval = cleaning_interval
        self.next_cleaning: float = time.time() + self.cleaning_interval

    def _maybe_clean(self) -> None:
        """Drop expired entries if the next scheduled cleaning is due.

        Callers in this class hold ``self.lock`` when they call this.
        """
        now = time.time()
        if self.next_cleaning <= now:
            # Collect first, delete afterwards: the dict must not change size
            # while it is being iterated.
            expired = [
                key for key, answer in self.data.items() if answer.expiration <= now
            ]
            for key in expired:
                del self.data[key]
            # Schedule relative to the time the cleaning finished.
            self.next_cleaning = time.time() + self.cleaning_interval

    def get(self, key: CacheKey) -> Optional[Answer]:
        """Look up the answer cached for *key*.

        *key*, a ``(dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass)``
        tuple whose values are the query name, rdtype, and rdclass respectively.

        Returns a ``dns.resolver.Answer``, or ``None`` if nothing is cached
        for the key or the cached answer has expired.  The hit or miss is
        counted in the statistics.
        """
        with self.lock:
            self._maybe_clean()
            cached = self.data.get(key)
            unusable = cached is None or cached.expiration <= time.time()
            if unusable:
                self.statistics.misses += 1
                return None
            self.statistics.hits += 1
            return cached

    def put(self, key: CacheKey, value: Answer) -> None:
        """Store *value* in the cache under *key*.

        *key*, a ``(dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass)``
        tuple whose values are the query name, rdtype, and rdclass respectively.

        *value*, a ``dns.resolver.Answer``, the answer.
        """
        with self.lock:
            self._maybe_clean()
            self.data[key] = value

    def flush(self, key: Optional[CacheKey] = None) -> None:
        """Flush one entry or the whole cache.

        If *key* is not ``None``, only that item is flushed.  Otherwise the
        entire cache is flushed and the next cleaning is rescheduled.

        *key*, a ``(dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass)``
        tuple whose values are the query name, rdtype, and rdclass respectively.
        """
        with self.lock:
            if key is None:
                self.data = {}
                self.next_cleaning = time.time() + self.cleaning_interval
            else:
                # Flushing a key that is not cached is a no-op.
                self.data.pop(key, None)
class LRUCacheNode:
    """A node of the circular doubly-linked list used by LRUCache.

    A new node is linked to itself in both directions.
    """

    def __init__(self, key, value):
        self.key = key
        self.value = value
        self.hits = 0
        self.prev = self.next = self

    def link_after(self, node: "LRUCacheNode") -> None:
        """Insert this node into a list immediately after *node*."""
        successor = node.next
        self.prev = node
        self.next = successor
        successor.prev = self
        node.next = self

    def unlink(self) -> None:
        """Remove this node from its list by joining its two neighbours.

        The node's own ``prev``/``next`` are left untouched.
        """
        before, after = self.prev, self.next
        after.prev = before
        before.next = after
class LRUCache(CacheBase):
    """Thread-safe, bounded, least-recently-used DNS answer cache.

    This cache is better than the simple cache (above) if you're
    running a web crawler or other process that does a lot of
    resolutions.  The LRUCache has a maximum number of nodes, and when
    it is full, the least-recently used node is removed to make space
    for a new one.

    Nodes live on a circular list around ``self.sentinel``:
    ``sentinel.next`` is the most recently used node and ``sentinel.prev``
    the least recently used one.
    """

    def __init__(self, max_size: int = 100000) -> None:
        """*max_size*, an ``int``, is the maximum number of nodes to cache;
        it must be greater than 0.
        """
        super().__init__()
        self.data: Dict[CacheKey, LRUCacheNode] = {}
        self.set_max_size(max_size)
        # A freshly created node already points at itself in both directions,
        # which is exactly the empty-list state.
        self.sentinel: LRUCacheNode = LRUCacheNode(None, None)

    def set_max_size(self, max_size: int) -> None:
        """Set the maximum number of nodes; values below 1 are raised to 1."""
        self.max_size = 1 if max_size < 1 else max_size

    def get(self, key: CacheKey) -> Optional[Answer]:
        """Look up the answer cached for *key*.

        *key*, a ``(dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass)``
        tuple whose values are the query name, rdtype, and rdclass respectively.

        Returns a ``dns.resolver.Answer``, or ``None`` if nothing is cached
        for the key or the cached answer has expired (in which case it is
        removed).  A hit moves the node to the front of the LRU list.
        """
        with self.lock:
            entry = self.data.get(key)
            if entry is None:
                self.statistics.misses += 1
                return None
            # Unlink because we're either going to move the node to the front
            # of the LRU list or we're going to free it.
            entry.unlink()
            if entry.value.expiration <= time.time():
                del self.data[entry.key]
                self.statistics.misses += 1
                return None
            entry.link_after(self.sentinel)
            self.statistics.hits += 1
            entry.hits += 1
            return entry.value

    def get_hits_for_key(self, key: CacheKey) -> int:
        """Return the number of cache hits associated with the specified key.

        Unknown or expired keys report 0.
        """
        with self.lock:
            entry = self.data.get(key)
            if entry is None or entry.value.expiration <= time.time():
                return 0
            return entry.hits

    def put(self, key: CacheKey, value: Answer) -> None:
        """Store *value* in the cache under *key*.

        *key*, a ``(dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass)``
        tuple whose values are the query name, rdtype, and rdclass respectively.

        *value*, a ``dns.resolver.Answer``, the answer.
        """
        with self.lock:
            # Replace any existing node for this key (its hit count starts
            # over with the new node).
            stale = self.data.pop(key, None)
            if stale is not None:
                stale.unlink()
            # Evict from the least-recently-used end until there is room.
            while len(self.data) >= self.max_size:
                victim = self.sentinel.prev
                victim.unlink()
                del self.data[victim.key]
            fresh = LRUCacheNode(key, value)
            fresh.link_after(self.sentinel)
            self.data[key] = fresh

    def flush(self, key: Optional[CacheKey] = None) -> None:
        """Flush one entry or the whole cache.

        If *key* is not ``None``, only that item is flushed.  Otherwise the
        entire cache is flushed.

        *key*, a ``(dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass)``
        tuple whose values are the query name, rdtype, and rdclass respectively.
        """
        with self.lock:
            if key is None:
                # Walk the list and unlink every node so only the sentinel
                # remains, then drop the index.
                cursor = self.sentinel.next
                while cursor != self.sentinel:
                    following = cursor.next
                    cursor.unlink()
                    cursor = following
                self.data = {}
                return
            entry = self.data.get(key)
            if entry is not None:
                entry.unlink()
                del self.data[entry.key]
class _Resolution:
    """Helper class for dns.resolver.Resolver.resolve().

    All of the "business logic" of resolution is encapsulated in this
    class, allowing us to have multiple resolve() implementations
    using different I/O schemes without copying all of the
    complicated logic.

    This class is a "friend" to dns.resolver.Resolver and manipulates
    resolver data structures directly.

    The driving loop is expected to call next_request() for each candidate
    name, next_nameserver() for each attempt, and query_result() with the
    outcome of each attempt.
    """

    def __init__(
        self,
        resolver: "BaseResolver",
        qname: Union[dns.name.Name, str],
        rdtype: Union[dns.rdatatype.RdataType, str],
        rdclass: Union[dns.rdataclass.RdataClass, str],
        tcp: bool,
        raise_on_no_answer: bool,
        search: Optional[bool],
    ) -> None:
        """Normalize the query parameters and set up the per-resolution state.

        Raises ``NoMetaqueries`` if *rdtype* is a metatype or *rdclass* is a
        metaclass.
        """
        if isinstance(qname, str):
            qname = dns.name.from_text(qname, None)
        rdtype = dns.rdatatype.RdataType.make(rdtype)
        if dns.rdatatype.is_metatype(rdtype):
            raise NoMetaqueries
        rdclass = dns.rdataclass.RdataClass.make(rdclass)
        if dns.rdataclass.is_metaclass(rdclass):
            raise NoMetaqueries

        self.resolver = resolver
        # qnames_to_try is kept intact for error reporting; qnames is the
        # working copy that next_request() consumes.
        self.qnames_to_try = resolver._get_qnames_to_try(qname, search)
        self.qnames = self.qnames_to_try[:]
        self.rdtype = rdtype
        self.rdclass = rdclass
        self.tcp = tcp
        self.raise_on_no_answer = raise_on_no_answer
        self.nxdomain_responses: Dict[dns.name.Name, dns.message.QueryMessage] = {}

        # Initialize other things to help analysis tools
        self.qname = dns.name.empty
        self.nameservers: List[dns.nameserver.Nameserver] = []
        self.current_nameservers: List[dns.nameserver.Nameserver] = []
        self.errors: List[ErrorTuple] = []
        self.nameserver: Optional[dns.nameserver.Nameserver] = None
        self.tcp_attempt = False
        self.retry_with_tcp = False
        self.request: Optional[dns.message.QueryMessage] = None
        self.backoff = 0.0

    def next_request(
        self,
    ) -> Tuple[Optional[dns.message.QueryMessage], Optional[Answer]]:
        """Get the next request to send, and check the cache.

        Returns a (request, answer) tuple.  At most one of request or
        answer will not be None.

        Raises ``NoAnswer`` for a cached empty answer when
        *raise_on_no_answer* is set, and ``NXDOMAIN`` once every candidate
        name has been used up.
        """
        # We return a tuple instead of Union[Message,Answer] as it lets
        # the caller avoid isinstance().
        resolver = self.resolver
        while self.qnames:
            self.qname = self.qnames.pop(0)

            # Do we know the answer?
            cache = resolver.cache
            if cache:
                cached = cache.get((self.qname, self.rdtype, self.rdclass))
                if cached is not None:
                    if cached.rrset is None and self.raise_on_no_answer:
                        raise NoAnswer(response=cached.response)
                    return (None, cached)
                # NXDOMAIN results are cached under rdtype ANY.
                cached = cache.get((self.qname, dns.rdatatype.ANY, self.rdclass))
                if (
                    cached is not None
                    and cached.response.rcode() == dns.rcode.NXDOMAIN
                ):
                    # cached NXDOMAIN; record it and continue to next
                    # name.
                    self.nxdomain_responses[self.qname] = cached.response
                    continue

            # Build the request
            request = dns.message.make_query(self.qname, self.rdtype, self.rdclass)
            if resolver.keyname is not None:
                request.use_tsig(
                    resolver.keyring,
                    resolver.keyname,
                    algorithm=resolver.keyalgorithm,
                )
            request.use_edns(
                resolver.edns,
                resolver.ednsflags,
                resolver.payload,
                options=resolver.ednsoptions,
            )
            if resolver.flags is not None:
                request.flags = resolver.flags

            # Start over with the full nameserver list for this name.
            self.nameservers = resolver._enrich_nameservers(
                resolver._nameservers,
                resolver.nameserver_ports,
                resolver.port,
            )
            if resolver.rotate:
                random.shuffle(self.nameservers)
            self.current_nameservers = self.nameservers[:]
            self.errors = []
            self.nameserver = None
            self.tcp_attempt = False
            self.retry_with_tcp = False
            self.request = request
            self.backoff = 0.10
            return (request, None)

        #
        # We've tried everything and only gotten NXDOMAINs.  (We know
        # it's only NXDOMAINs as anything else would have returned
        # before now.)
        #
        raise NXDOMAIN(qnames=self.qnames_to_try, responses=self.nxdomain_responses)

    def next_nameserver(self) -> Tuple[dns.nameserver.Nameserver, bool, float]:
        """Pick the nameserver for the next attempt.

        Returns a (nameserver, use_tcp, backoff) tuple, where backoff is the
        delay value handed to the caller (non-zero only when a new pass over
        the nameserver list begins).

        Raises ``NoNameservers`` when no usable nameserver is left.
        """
        if self.retry_with_tcp:
            # The previous UDP attempt was truncated: retry the same server
            # over TCP right away.
            assert self.nameserver is not None
            assert not self.nameserver.is_always_max_size()
            self.tcp_attempt = True
            self.retry_with_tcp = False
            return (self.nameserver, True, 0)

        delay = 0.0
        if not self.current_nameservers:
            if not self.nameservers:
                # Out of things to try!
                raise NoNameservers(request=self.request, errors=self.errors)
            # Start another pass; the backoff doubles each pass, capped at 2.
            self.current_nameservers = self.nameservers[:]
            delay = self.backoff
            self.backoff = min(self.backoff * 2, 2)
        self.nameserver = self.current_nameservers.pop(0)
        self.tcp_attempt = self.tcp or self.nameserver.is_always_max_size()
        return (self.nameserver, self.tcp_attempt, delay)

    def _record_error(self, problem, response) -> None:
        """Append an entry about the current nameserver to the error trace."""
        self.errors.append(
            (
                str(self.nameserver),
                self.tcp_attempt,
                self.nameserver.answer_port(),
                problem,
                response,
            )
        )

    def query_result(
        self, response: Optional[dns.message.Message], ex: Optional[Exception]
    ) -> Tuple[Optional[Answer], bool]:
        """Process the outcome of one attempt against the current nameserver.

        Exactly one of *response* and *ex* is expected to be set.

        Returns an (answer, end_loop) tuple; end_loop tells the caller to
        stop trying nameservers for the current request.

        Raises ``NoAnswer`` for an empty answer when *raise_on_no_answer* is
        set, and ``YXDOMAIN`` for a YXDOMAIN response.
        """
        assert self.nameserver is not None
        if ex:
            # Exception during I/O or from_wire()
            assert response is None
            self._record_error(ex, response)
            fatal_for_server = (
                dns.exception.FormError,
                EOFError,
                OSError,
                NotImplementedError,
            )
            if isinstance(ex, fatal_for_server):
                # This nameserver is no good, take it out of the mix.
                self.nameservers.remove(self.nameserver)
            elif isinstance(ex, dns.message.Truncated):
                if self.tcp_attempt:
                    # Truncation with TCP is no good!
                    self.nameservers.remove(self.nameserver)
                else:
                    self.retry_with_tcp = True
            return (None, False)

        # We got an answer!
        assert response is not None
        assert isinstance(response, dns.message.QueryMessage)
        rcode = response.rcode()

        if rcode == dns.rcode.NOERROR:
            try:
                answer = Answer(
                    self.qname,
                    self.rdtype,
                    self.rdclass,
                    response,
                    self.nameserver.answer_nameserver(),
                    self.nameserver.answer_port(),
                )
            except Exception as e:
                self._record_error(e, response)
                # The nameserver is no good, take it out of the mix.
                self.nameservers.remove(self.nameserver)
                return (None, False)
            if self.resolver.cache:
                self.resolver.cache.put((self.qname, self.rdtype, self.rdclass), answer)
            if answer.rrset is None and self.raise_on_no_answer:
                raise NoAnswer(response=answer.response)
            return (answer, True)

        if rcode == dns.rcode.NXDOMAIN:
            # Further validate the response by making an Answer, even
            # if we aren't going to cache it.
            try:
                answer = Answer(
                    self.qname, dns.rdatatype.ANY, dns.rdataclass.IN, response
                )
            except Exception as e:
                self._record_error(e, response)
                # The nameserver is no good, take it out of the mix.
                self.nameservers.remove(self.nameserver)
                return (None, False)
            self.nxdomain_responses[self.qname] = response
            if self.resolver.cache:
                self.resolver.cache.put(
                    (self.qname, dns.rdatatype.ANY, self.rdclass), answer
                )
            # No answer, but end_loop is True so the caller breaks its
            # inner loop and calls next_request().
            return (None, True)

        if rcode == dns.rcode.YXDOMAIN:
            yex = YXDOMAIN()
            self._record_error(yex, response)
            raise yex

        #
        # We got a response, but we're not happy with the
        # rcode in it.
        #
        if rcode != dns.rcode.SERVFAIL or not self.resolver.retry_servfail:
            self.nameservers.remove(self.nameserver)
        self._record_error(dns.rcode.to_text(rcode), response)
        return (None, False)
class BaseResolver:
    """DNS stub resolver."""

    # We initialize in reset()
    #
    # pylint: disable=attribute-defined-outside-init

    # Local domain; reset() derives it from the host name, and a "domain"
    # line in resolv.conf or the Windows registry may override it.
    domain: dns.name.Name
    # Passed to _enrich_nameservers() together with the default port.
    # NOTE(review): presumably maps a nameserver address to its port -- confirm.
    nameserver_ports: Dict[str, int]
    # Default nameserver port (53 after reset()).
    port: int
    # Search list, filled from the "search" line of resolv.conf or from the
    # Windows registry.
    search: List[dns.name.Name]
    use_search_by_default: bool
    # Upper bound for the value returned by _compute_timeout().
    timeout: float
    # Default total lifetime used by _compute_timeout().
    lifetime: float
    # TSIG settings; a request is signed when keyname is not None.
    keyring: Optional[Any]
    keyname: Optional[Union[dns.name.Name, str]]
    keyalgorithm: Union[dns.name.Name, str]
    # EDNS settings, passed to use_edns() on every request.
    edns: int
    ednsflags: int
    ednsoptions: Optional[List[dns.edns.Option]]
    payload: int
    # Optional answer cache consulted and filled by _Resolution.
    cache: Any
    # If not None, replaces the flags of every request.
    flags: Optional[int]
    # If true, a nameserver answering SERVFAIL is kept in the rotation.
    retry_servfail: bool
    # If true, the nameserver list is shuffled for every request.
    rotate: bool
    # Set from the "ndots:n" resolv.conf option; None after reset().
    ndots: Optional[int]
    _nameservers: Sequence[Union[str, dns.nameserver.Nameserver]]
def __init__(
    self, filename: str = "/etc/resolv.conf", configure: bool = True
) -> None:
    """Create a resolver, optionally configured from the operating system.

    *filename*, a ``str`` or file object, specifying a file
    in standard /etc/resolv.conf format.  This parameter is meaningful
    only when *configure* is true and the platform is POSIX.

    *configure*, a ``bool``.  If True (the default), the resolver
    instance is configured in the normal fashion for the operating
    system the resolver is running on.  (I.e. by reading a
    /etc/resolv.conf file on POSIX systems and from the registry
    on Windows systems.)
    """
    self.reset()
    if not configure:
        return
    if sys.platform == "win32":
        self.read_registry()
    elif filename:
        self.read_resolv_conf(filename)
def reset(self) -> None:
    """Reset all resolver configuration to the defaults."""
    # The default domain is the host name minus its first label, or the
    # root name if nothing is left.
    hostname = dns.name.from_text(socket.gethostname())
    self.domain = dns.name.Name(hostname[1:])
    if len(self.domain) == 0:
        self.domain = dns.name.root

    # Nameservers and search behavior.
    self._nameservers = []
    self.nameserver_ports = {}
    self.port = 53
    self.search = []
    self.use_search_by_default = False
    self.ndots = None
    self.rotate = False
    self.retry_servfail = False

    # Timing.
    self.timeout = 2.0
    self.lifetime = 5.0

    # TSIG.
    self.keyring = None
    self.keyname = None
    self.keyalgorithm = dns.tsig.default_algorithm

    # EDNS and message flags.
    self.edns = -1
    self.ednsflags = 0
    self.ednsoptions = None
    self.payload = 0
    self.flags = None

    self.cache = None
def read_resolv_conf(self, f: Any) -> None:
    """Process *f* as a file in the /etc/resolv.conf format.  If f is
    a ``str``, it is used as the name of the file to open; otherwise it
    is treated as the file itself.

    Interprets the following items:

    - nameserver - name server IP address

    - domain - local domain name

    - search - search list for host-name lookup

    - options - supported options are rotate, timeout, edns0, and ndots

    Raises ``NoResolverConfiguration`` if the named file cannot be opened
    or if no nameserver line was found.
    """

    def int_after_colon(option):
        # Value of an "name:n" option, or None if it is missing or not an
        # integer.
        try:
            return int(option.split(":")[1])
        except (ValueError, IndexError):
            return None

    found_nameservers = []
    if isinstance(f, str):
        try:
            cm: contextlib.AbstractContextManager = open(f)
        except OSError:
            # /etc/resolv.conf doesn't exist, can't be read, etc.
            raise NoResolverConfiguration(f"cannot open {f}")
    else:
        # Caller-supplied file object: use it as is and leave it open.
        cm = contextlib.nullcontext(f)
    with cm as stream:
        for line in stream:
            if len(line) == 0 or line[0] in ("#", ";"):
                continue
            tokens = line.split()
            # Any line containing less than 2 tokens is malformed
            if len(tokens) < 2:
                continue
            keyword = tokens[0]
            arguments = tokens[1:]
            if keyword == "nameserver":
                found_nameservers.append(arguments[0])
            elif keyword == "domain":
                self.domain = dns.name.from_text(arguments[0])
                # domain and search are exclusive
                self.search = []
            elif keyword == "search":
                # the last search wins
                self.search = []
                for suffix in arguments:
                    self.search.append(dns.name.from_text(suffix))
                # We don't set domain as it is not used if
                # len(self.search) > 0
            elif keyword == "options":
                for opt in arguments:
                    if opt == "rotate":
                        self.rotate = True
                    elif opt == "edns0":
                        self.use_edns()
                    elif "timeout" in opt:
                        value = int_after_colon(opt)
                        if value is not None:
                            self.timeout = value
                    elif "ndots" in opt:
                        value = int_after_colon(opt)
                        if value is not None:
                            self.ndots = value
    if not found_nameservers:
        raise NoResolverConfiguration("no nameservers")
    # Assigning directly instead of appending means we invoke the
    # setter logic, with additional checking and enrichment.
    self.nameservers = found_nameservers
def read_registry(self) -> None:
    """Load resolver settings from the Windows registry.

    Copies the domain (only when one is reported), the nameservers, and
    the search list returned by ``dns.win32util.get_dns_info()`` onto this
    resolver.

    Raises ``NotImplementedError`` if an ``AttributeError`` occurs while
    doing so, e.g. because ``dns.win32util`` is only imported when
    ``sys.platform`` is ``"win32"``.
    """
    try:
        registry_info = dns.win32util.get_dns_info()  # type: ignore
        registry_domain = registry_info.domain
        if registry_domain is not None:
            self.domain = registry_domain
        self.nameservers = registry_info.nameservers
        self.search = registry_info.search
    except AttributeError:
        raise NotImplementedError
def _compute_timeout(
    self,
    start: float,
    lifetime: Optional[float] = None,
    errors: Optional[List[ErrorTuple]] = None,
) -> float:
    """Return the timeout to use for the next query attempt.

    *start*, a ``float``, the ``time.time()`` value at which the overall
    resolution began.

    *lifetime*, a ``float`` or ``None``, the total time allowed; ``None``
    means use ``self.lifetime``.

    *errors*, a list or ``None``, attached to any ``LifetimeTimeout``
    raised here; ``None`` is treated as an empty list.

    Returns the smaller of the remaining lifetime and ``self.timeout``.

    Raises ``LifetimeTimeout`` if the lifetime is used up, or if the
    clock has gone backwards by more than one second.
    """
    if lifetime is None:
        lifetime = self.lifetime
    elapsed = time.time() - start
    if errors is None:
        errors = []
    if elapsed < -1:
        # Time going backwards is bad.  Just give up.
        raise LifetimeTimeout(timeout=elapsed, errors=errors)
    if elapsed < 0:
        # Time went backwards, but only a little.  This can
        # happen, e.g. under vmware with older linux kernels.
        # Pretend it didn't happen.
        elapsed = 0
    if elapsed >= lifetime:
        raise LifetimeTimeout(timeout=elapsed, errors=errors)
    remaining = lifetime - elapsed
    return min(remaining, self.timeout)
def _get_qnames_to_try(
    self, qname: dns.name.Name, search: Optional[bool]
) -> List[dns.name.Name]:
    """Return the ordered list of absolute names to query for *qname*.

    *qname*, a ``dns.name.Name``, the name as requested.

    *search*, a ``bool`` or ``None``, whether to apply the search rules to
    a relative name; ``None`` means use ``self.use_search_by_default``.

    An absolute *qname* is returned alone.  A relative *qname* is made
    absolute against the root and, when searching, also combined with each
    suffix of the search list (or with ``self.domain`` when there is no
    search list and the domain is not the root).
    """
    # This is a separate method so we can unit test the search
    # rules without requiring the Internet.
    if search is None:
        search = self.use_search_by_default

    if qname.is_absolute():
        return [qname]

    rooted = qname.concatenate(dns.name.root)
    if not search:
        return [rooted]

    if len(self.search) > 0:
        # There is a search list, so use it exclusively
        suffixes = self.search[:]
    elif self.domain != dns.name.root and self.domain is not None:
        # We have some notion of a domain that isn't the root, so
        # use it as the search list.
        suffixes = [self.domain]
    else:
        suffixes = []

    # Figure out the effective ndots (default is 1)
    ndots = 1 if self.ndots is None else self.ndots

    candidates = [qname + suffix for suffix in suffixes]
    if len(qname) > ndots:
        # The name has at least ndots dots, so we should try an
        # absolute query first.
        candidates.insert(0, rooted)
    else:
        # The name has less than ndots dots, so we should search
        # first, then try the absolute name.
        candidates.append(rooted)
    return candidates
def use_tsig(
    self,
    keyring: Any,
    keyname: Optional[Union[dns.name.Name, str]] = None,
    algorithm: Union[dns.name.Name, str] = dns.tsig.default_algorithm,
) -> None:
    """Record the TSIG settings to be applied to each query.

    The three values are stored on the resolver unchanged as ``keyring``,
    ``keyname`` and ``keyalgorithm``.  See the documentation of
    ``dns.message.Message.use_tsig()`` for their meaning.
    """
    self.keyring, self.keyname, self.keyalgorithm = keyring, keyname, algorithm
def use_edns(
    self,
    edns: Optional[Union[int, bool]] = 0,
    ednsflags: int = 0,
    payload: int = dns.message.DEFAULT_EDNS_PAYLOAD,
    options: Optional[List[dns.edns.Option]] = None,
) -> None:
    """Configure EDNS behavior.

    *edns*, an ``int``, is the EDNS level to use.  ``None`` and ``False``
    are stored as ``-1`` ("do not use EDNS"), and ``True`` is stored as
    ``0`` ("use EDNS0"); any other value is stored as given.

    *ednsflags*, an ``int``, the EDNS flag values.

    *payload*, an ``int``, is the EDNS sender's payload field, which is the
    maximum size of UDP datagram the sender can handle, i.e. how big a
    response to this message can be.

    *options*, a list of ``dns.edns.Option`` objects or ``None``, the EDNS
    options.
    """
    if edns is None or edns is False:
        level = -1
    elif edns is True:
        level = 0
    else:
        level = edns
    self.edns = level
    self.ednsflags = ednsflags
    self.payload = payload
    self.ednsoptions = options
def set_flags(self, flags: int) -> None:
    """Replace the default message flags with caller-supplied ones.

    *flags*, an ``int``, the message flags to use; stored as ``self.flags``.
    """
    self.flags = flags
def _enrich_nameservers(
    cls,
    nameservers: Sequence[Union[str, dns.nameserver.Nameserver]],
    nameserver_ports: Dict[str, int],
    default_port: int,
) -> List[dns.nameserver.Nameserver]:
    """Convert a sequence of nameserver specifications into nameserver objects.

    *nameservers*, a ``list`` or ``tuple`` whose items are each either a
    ``dns.nameserver.Nameserver`` (used as-is), the text form of an IP
    address (wrapped in a ``dns.nameserver.Do53Nameserver``), or an
    ``https`` URL (wrapped in a ``dns.nameserver.DoHNameserver``).

    *nameserver_ports*, a ``dict`` mapping an address string to the port to
    use for that address.

    *default_port*, an ``int``, the port used for addresses that have no
    entry in *nameserver_ports*.

    Returns a new ``list`` of ``dns.nameserver.Nameserver`` objects in the
    same order as *nameservers*.

    Raises ``ValueError`` if *nameservers* is not a list or tuple, or if an
    item is none of the accepted forms.
    """
    # NOTE(review): *cls* is unused; the name suggests this is meant to be a
    # classmethod, but no decorator is visible here.
    # Accept tuples as well as lists so that the behavior agrees with the
    # error message below.
    if not isinstance(nameservers, (list, tuple)):
        raise ValueError(
            "nameservers must be a list or tuple (not a {})".format(
                type(nameservers)
            )
        )
    enriched_nameservers: List[dns.nameserver.Nameserver] = []
    for nameserver in nameservers:
        enriched_nameserver: dns.nameserver.Nameserver
        if isinstance(nameserver, dns.nameserver.Nameserver):
            enriched_nameserver = nameserver
        elif dns.inet.is_address(nameserver):
            port = nameserver_ports.get(nameserver, default_port)
            enriched_nameserver = dns.nameserver.Do53Nameserver(nameserver, port)
        else:
            # Anything urlparse() cannot handle (e.g. a non-string) is
            # treated the same as a URL with the wrong scheme.
            try:
                is_https_url = urlparse(nameserver).scheme == "https"
            except Exception:
                is_https_url = False
            if not is_https_url:
                raise ValueError(
                    f"nameserver {nameserver} is not a "
                    "dns.nameserver.Nameserver instance or text form, "
                    "IP address, nor a valid https URL"
                )
            enriched_nameserver = dns.nameserver.DoHNameserver(nameserver)
        enriched_nameservers.append(enriched_nameserver)
    return enriched_nameservers
@property
def nameservers(
    self,
) -> Sequence[Union[str, dns.nameserver.Nameserver]]:
    """The nameservers this resolver uses, exactly as they were assigned."""
    return self._nameservers

@nameservers.setter
def nameservers(
    self, nameservers: Sequence[Union[str, dns.nameserver.Nameserver]]
) -> None:
    """
    *nameservers*, a ``list`` of nameservers, where a nameserver is either
    a string interpretable as a nameserver, or a ``dns.nameserver.Nameserver``
    instance.

    Raises ``ValueError`` if *nameservers* is not a list of nameservers.
    """
    # We just call _enrich_nameservers() for checking; the value stored is
    # the caller's original sequence, not the enriched list.
    self._enrich_nameservers(nameservers, self.nameserver_ports, self.port)
    self._nameservers = nameservers
| class Resolver(BaseResolver): | |
| """DNS stub resolver.""" | |
def resolve(
    self,
    qname: Union[dns.name.Name, str],
    rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A,
    rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
    tcp: bool = False,
    source: Optional[str] = None,
    raise_on_no_answer: bool = True,
    source_port: int = 0,
    lifetime: Optional[float] = None,
    search: Optional[bool] = None,
) -> Answer:  # pylint: disable=arguments-differ
    """Query nameservers to find the answer to the question.

    The *qname*, *rdtype*, and *rdclass* parameters may be objects
    of the appropriate type, or strings that can be converted into objects
    of the appropriate type.

    *qname*, a ``dns.name.Name`` or ``str``, the query name.

    *rdtype*, an ``int`` or ``str``, the query type.

    *rdclass*, an ``int`` or ``str``, the query class.

    *tcp*, a ``bool``.  If ``True``, use TCP to make the query.

    *source*, a ``str`` or ``None``.  If not ``None``, bind to this IP
    address when making queries.

    *raise_on_no_answer*, a ``bool``.  If ``True``, raise
    ``dns.resolver.NoAnswer`` if there's no answer to the question.

    *source_port*, an ``int``, the port from which to send the message.

    *lifetime*, a ``float``, how many seconds a query should run
    before timing out.

    *search*, a ``bool`` or ``None``, determines whether the
    search list configured in the system's resolver configuration
    is used for relative names, and whether the resolver's domain
    may be added to relative names.  The default is ``None``,
    which causes the value of the resolver's
    ``use_search_by_default`` attribute to be used.

    Raises ``dns.resolver.LifetimeTimeout`` if no answers could be found
    in the specified lifetime.

    Raises ``dns.resolver.NXDOMAIN`` if the query name does not exist.

    Raises ``dns.resolver.YXDOMAIN`` if the query name is too long after
    DNAME substitution.

    Raises ``dns.resolver.NoAnswer`` if *raise_on_no_answer* is
    ``True`` and the query name exists but has no RRset of the
    desired type and class.

    Raises ``dns.resolver.NoNameservers`` if no non-broken
    nameservers are available to answer the question.

    Returns a ``dns.resolver.Answer`` instance.
    """
    resolution = _Resolution(
        self, qname, rdtype, rdclass, tcp, raise_on_no_answer, search
    )
    started_at = time.time()
    while True:
        request, cached = resolution.next_request()
        # Compare against None rather than testing truthiness: the answer
        # object implements __len__, and an answer of length 0 must still
        # be returned.
        if cached is not None:
            # cache hit!
            return cached
        assert request is not None  # needed for type checking
        finished = False
        while not finished:
            nameserver, use_tcp, backoff = resolution.next_nameserver()
            if backoff:
                time.sleep(backoff)
            timeout = self._compute_timeout(started_at, lifetime, resolution.errors)
            try:
                response = nameserver.query(
                    request,
                    timeout=timeout,
                    source=source,
                    source_port=source_port,
                    max_size=use_tcp,
                )
            except Exception as ex:
                # Report the failure to the resolution state, which decides
                # whether this request is finished.
                _, finished = resolution.query_result(None, ex)
                continue
            answer, finished = resolution.query_result(response, None)
            # Same "is not None" consideration as above.
            if answer is not None:
                return answer
def query(
    self,
    qname: Union[dns.name.Name, str],
    rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A,
    rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
    tcp: bool = False,
    source: Optional[str] = None,
    raise_on_no_answer: bool = True,
    source_port: int = 0,
    lifetime: Optional[float] = None,
) -> Answer:  # pragma: no cover
    """Query nameservers to find the answer to the question.

    Deprecated: emits a ``DeprecationWarning`` and then calls resolve()
    with ``search=True``.  It is kept for backwards compatibility with
    prior versions of dnspython.  See the documentation for the resolve()
    method for further details.
    """
    warnings.warn(
        "please use dns.resolver.Resolver.resolve() instead",
        DeprecationWarning,
        stacklevel=2,
    )
    # The final positional argument is *search*.
    search = True
    return self.resolve(
        qname,
        rdtype,
        rdclass,
        tcp,
        source,
        raise_on_no_answer,
        source_port,
        lifetime,
        search,
    )
def resolve_address(self, ipaddr: str, *args: Any, **kwargs: Any) -> Answer:
    """Use a resolver to run a reverse query for PTR records.

    This utilizes the resolve() method to perform a PTR lookup on the
    reverse-mapping name of the specified IP address.

    *ipaddr*, a ``str``, the IPv4 or IPv6 address you want to get
    the PTR record for.

    All other arguments that can be passed to the resolve() function
    except for rdtype and rdclass are also supported by this
    function.
    """
    # Work on a copy of kwargs so rdtype and rdclass are each supplied
    # exactly once (this also keeps the type checker happy); any values the
    # caller passed for them are overwritten.
    ptr_kwargs: Dict[str, Any] = dict(kwargs)
    ptr_kwargs["rdtype"] = dns.rdatatype.PTR
    ptr_kwargs["rdclass"] = dns.rdataclass.IN
    reverse_name = dns.reversename.from_address(ipaddr)
    return self.resolve(reverse_name, *args, **ptr_kwargs)
def resolve_name(
    self,
    name: Union[dns.name.Name, str],
    family: int = socket.AF_UNSPEC,
    **kwargs: Any,
) -> HostAnswers:
    """Use a resolver to query for address records.

    This utilizes the resolve() method to perform A and/or AAAA lookups on
    the specified name.

    *name*, a ``dns.name.Name`` or ``str``, the name to resolve.

    *family*, an ``int``, the address family.  If socket.AF_UNSPEC
    (the default), both A and AAAA records will be retrieved.

    All other arguments that can be passed to the resolve() function
    except for rdtype and rdclass are also supported by this
    function.

    Raises ``NotImplementedError`` if *family* is not ``AF_INET``,
    ``AF_INET6`` or ``AF_UNSPEC``.

    Raises ``NoAnswer`` in the ``AF_UNSPEC`` case if the combined result
    is empty.
    """
    # Work on a copy of kwargs so rdtype and rdclass are each supplied
    # exactly once (this also keeps the type checker happy).
    options: Dict[str, Any] = dict(kwargs)
    options.pop("rdtype", None)
    options["rdclass"] = dns.rdataclass.IN

    if family == socket.AF_INET:
        v4 = self.resolve(name, dns.rdatatype.A, **options)
        return HostAnswers.make(v4=v4)
    if family == socket.AF_INET6:
        v6 = self.resolve(name, dns.rdatatype.AAAA, **options)
        return HostAnswers.make(v6=v6)
    if family != socket.AF_UNSPEC:
        raise NotImplementedError(f"unknown address family {family}")

    # Both families: these two options are handled here rather than being
    # forwarded as the caller gave them.
    raise_on_no_answer = options.pop("raise_on_no_answer", True)
    lifetime = options.pop("lifetime", None)
    started_at = time.time()
    v6 = self.resolve(
        name,
        dns.rdatatype.AAAA,
        raise_on_no_answer=False,
        lifetime=self._compute_timeout(started_at, lifetime),
        **options,
    )
    # Note that setting name ensures we query the same name
    # for A as we did for AAAA.  (This is just in case search lists
    # are active by default in the resolver configuration and
    # we might be talking to a server that says NXDOMAIN when it
    # wants to say NOERROR no data.)
    name = v6.qname
    v4 = self.resolve(
        name,
        dns.rdatatype.A,
        raise_on_no_answer=False,
        lifetime=self._compute_timeout(started_at, lifetime),
        **options,
    )
    combined = HostAnswers.make(v6=v6, v4=v4, add_empty=not raise_on_no_answer)
    if not combined:
        raise NoAnswer(response=v6.response)
    return combined
# pylint: disable=redefined-outer-name
def canonical_name(self, name: Union[dns.name.Name, str]) -> dns.name.Name:
    """Determine the canonical name of *name*.

    The canonical name is the name the resolver uses for queries
    after all CNAME and DNAME renamings have been applied.

    *name*, a ``dns.name.Name`` or ``str``, the query name.

    This method can raise any exception that ``resolve()`` can
    raise, other than ``dns.resolver.NoAnswer`` and
    ``dns.resolver.NXDOMAIN``.

    Returns a ``dns.name.Name``.
    """
    try:
        # raise_on_no_answer=False: an empty answer still carries the
        # canonical name.
        return self.resolve(name, raise_on_no_answer=False).canonical_name
    except dns.resolver.NXDOMAIN as nxdomain:
        # The NXDOMAIN exception exposes the canonical name too.
        return nxdomain.canonical_name

# pylint: enable=redefined-outer-name
def try_ddr(self, lifetime: float = 5.0) -> None:
    """Try to update the resolver's nameservers using Discovery of Designated
    Resolvers (DDR).  If successful, the resolver will subsequently use
    DNS-over-HTTPS or DNS-over-TLS for future queries.

    *lifetime*, a float, is the maximum time to spend attempting DDR.  The default
    is 5 seconds.

    If the SVCB query is successful and results in a non-empty list of nameservers,
    then the resolver's nameservers are set to the returned servers in priority
    order.

    The current implementation does not use any address hints from the SVCB record,
    nor does it resolve addresses for the SVCB target name, rather it assumes that
    the bootstrap nameserver will always be one of the addresses and uses it.
    A future revision to the code may offer fuller support.  The code verifies that
    the bootstrap nameserver is in the Subject Alternative Name field of the
    TLS certificate.

    Any exception raised during the attempt is suppressed, leaving the
    resolver's nameservers unchanged.
    """
    try:
        deadline = time.time() + lifetime
        svcb_answer = self.resolve(
            dns._ddr._local_resolver_name, "SVCB", lifetime=lifetime
        )
        # Whatever time is left before the deadline bounds the second step.
        time_left = dns.query._remaining(deadline)
        discovered = dns._ddr._get_nameservers_sync(svcb_answer, time_left)
        if len(discovered) > 0:
            self.nameservers = discovered
    except Exception:
        # DDR is best-effort; failure keeps the current configuration.
        pass
#: The default resolver.  ``None`` until it is first created by
#: reset_default_resolver().
default_resolver: Optional[Resolver] = None


def get_default_resolver() -> Resolver:
    """Return the default resolver, creating it first if there is none yet."""
    if default_resolver is None:
        # Populates the module-level default_resolver.
        reset_default_resolver()
    assert default_resolver is not None
    return default_resolver
def reset_default_resolver() -> None:
    """Replace the default resolver with a freshly constructed ``Resolver``.

    Note that the resolver configuration (i.e. /etc/resolv.conf on UNIX
    systems) will be re-read immediately.
    """
    global default_resolver
    default_resolver = Resolver()
def resolve(
    qname: Union[dns.name.Name, str],
    rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A,
    rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
    tcp: bool = False,
    source: Optional[str] = None,
    raise_on_no_answer: bool = True,
    source_port: int = 0,
    lifetime: Optional[float] = None,
    search: Optional[bool] = None,
) -> Answer:  # pragma: no cover
    """Query nameservers to find the answer to the question.

    This is a convenience function that forwards all of its arguments to
    the ``resolve()`` method of the default resolver.

    See ``dns.resolver.Resolver.resolve`` for more information on the
    parameters.
    """
    resolver = get_default_resolver()
    return resolver.resolve(
        qname,
        rdtype,
        rdclass,
        tcp,
        source,
        raise_on_no_answer,
        source_port,
        lifetime,
        search,
    )
def query(
    qname: Union[dns.name.Name, str],
    rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A,
    rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
    tcp: bool = False,
    source: Optional[str] = None,
    raise_on_no_answer: bool = True,
    source_port: int = 0,
    lifetime: Optional[float] = None,
) -> Answer:  # pragma: no cover
    """Query nameservers to find the answer to the question.

    Deprecated: emits a ``DeprecationWarning`` and then calls resolve()
    with ``search=True``.  It is kept for backwards compatibility with
    prior versions of dnspython.  See the documentation for the resolve()
    method for further details.
    """
    warnings.warn(
        "please use dns.resolver.resolve() instead", DeprecationWarning, stacklevel=2
    )
    # The final positional argument is *search*.
    search = True
    return resolve(
        qname,
        rdtype,
        rdclass,
        tcp,
        source,
        raise_on_no_answer,
        source_port,
        lifetime,
        search,
    )
def resolve_address(ipaddr: str, *args: Any, **kwargs: Any) -> Answer:
    """Use the default resolver to run a reverse query for PTR records.

    See ``dns.resolver.Resolver.resolve_address`` for more information on the
    parameters.
    """
    resolver = get_default_resolver()
    return resolver.resolve_address(ipaddr, *args, **kwargs)
def resolve_name(
    name: Union[dns.name.Name, str], family: int = socket.AF_UNSPEC, **kwargs: Any
) -> HostAnswers:
    """Use the default resolver to query for address records.

    See ``dns.resolver.Resolver.resolve_name`` for more information on the
    parameters.
    """
    resolver = get_default_resolver()
    return resolver.resolve_name(name, family, **kwargs)
def canonical_name(name: Union[dns.name.Name, str]) -> dns.name.Name:
    """Determine the canonical name of *name* using the default resolver.

    See ``dns.resolver.Resolver.canonical_name`` for more information on the
    parameters and possible exceptions.
    """
    resolver = get_default_resolver()
    return resolver.canonical_name(name)
def try_ddr(lifetime: float = 5.0) -> None:
    """Try to update the default resolver's nameservers using Discovery of Designated
    Resolvers (DDR).  If successful, the resolver will subsequently use
    DNS-over-HTTPS or DNS-over-TLS for future queries.

    *lifetime*, a float, is passed through to the default resolver's
    ``try_ddr()`` method.

    See :py:func:`dns.resolver.Resolver.try_ddr` for more information.
    """
    resolver = get_default_resolver()
    return resolver.try_ddr(lifetime)
def zone_for_name(
    name: Union[dns.name.Name, str],
    rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
    tcp: bool = False,
    resolver: Optional[Resolver] = None,
    lifetime: Optional[float] = None,
) -> dns.name.Name:
    """Find the name of the zone which contains the specified name.

    *name*, an absolute ``dns.name.Name`` or ``str``, the query name.

    *rdclass*, an ``int``, the query class.

    *tcp*, a ``bool``.  If ``True``, use TCP to make the query.

    *resolver*, a ``dns.resolver.Resolver`` or ``None``, the resolver to use.
    If ``None``, the default, then the default resolver is used.

    *lifetime*, a ``float``, the total time to allow for the queries needed
    to determine the zone.  If ``None``, the default, then only the individual
    query limits of the resolver apply.

    Raises ``dns.resolver.NotAbsolute`` if *name* is not absolute.

    Raises ``dns.resolver.NoRootSOA`` if there is no SOA RR at the DNS
    root.  (This is only likely to happen if you're using non-default
    root servers in your network and they are misconfigured.)

    Raises ``dns.resolver.LifetimeTimeout`` if the answer could not be
    found in the allotted lifetime.

    Returns a ``dns.name.Name``.
    """
    if isinstance(name, str):
        name = dns.name.from_text(name, dns.name.root)
    if resolver is None:
        resolver = get_default_resolver()
    if not name.is_absolute():
        raise NotAbsolute(name)

    started_at = time.time()
    expiration: Optional[float] = (
        None if lifetime is None else started_at + lifetime
    )

    # Ask for the SOA at *name*, then at each successive parent, until a
    # zone is identified.
    while True:
        try:
            rlifetime: Optional[float]
            if expiration is None:
                rlifetime = None
            else:
                # Never pass a negative lifetime; clamp it to zero.
                rlifetime = expiration - time.time()
                if rlifetime <= 0:
                    rlifetime = 0
            answer = resolver.resolve(
                name, dns.rdatatype.SOA, rdclass, tcp, lifetime=rlifetime
            )
            assert answer.rrset is not None
            if answer.rrset.name == name:
                return name
            # otherwise we were CNAMEd or DNAMEd and need to look higher
        except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer) as e:
            if isinstance(e, dns.resolver.NXDOMAIN):
                response = e.responses().get(name)
            else:
                response = e.response()  # pylint: disable=no-value-for-parameter
            if response:
                for rrs in response.authority:
                    if rrs.rdtype != dns.rdatatype.SOA or rrs.rdclass != rdclass:
                        continue
                    relation = rrs.name.fullcompare(name)[0]
                    if relation == dns.name.NAMERELN_SUPERDOMAIN:
                        # We're doing a proper superdomain check as
                        # if the name were equal we ought to have gotten
                        # it in the answer section!  We are ignoring the
                        # possibility that the authority is insane and
                        # is including multiple SOA RRs for different
                        # authorities.
                        return rrs.name
        # we couldn't extract anything useful from the response (e.g. it's
        # a type 3 NXDOMAIN)
        try:
            name = name.parent()
        except dns.name.NoParent:
            raise NoRootSOA
def make_resolver_at(
    where: Union[dns.name.Name, str],
    port: int = 53,
    family: int = socket.AF_UNSPEC,
    resolver: Optional[Resolver] = None,
) -> Resolver:
    """Make a stub resolver using the specified destination as the full resolver.

    *where*, a ``dns.name.Name`` or ``str`` the domain name or IP address of the
    full resolver.

    *port*, an ``int``, the port to use.  If not specified, the default is 53.

    *family*, an ``int``, the address family to use.  This parameter is used if
    *where* is not an address; it is passed to ``resolve_name()``, and every
    address that returns becomes a nameserver of the new resolver.  The
    default is ``socket.AF_UNSPEC``.

    *resolver*, a ``dns.resolver.Resolver`` or ``None``, the resolver to use for
    resolution of hostnames.  If not specified, the default resolver will be used.

    Returns a ``dns.resolver.Resolver`` or raises an exception.
    """
    if resolver is None:
        resolver = get_default_resolver()

    nameservers: List[Union[str, dns.nameserver.Nameserver]]
    if isinstance(where, str) and dns.inet.is_address(where):
        # An address literal needs no lookup.
        nameservers = [dns.nameserver.Do53Nameserver(where, port)]
    else:
        nameservers = [
            dns.nameserver.Do53Nameserver(address, port)
            for address in resolver.resolve_name(where, family).addresses()
        ]

    # configure=False: the new resolver is given only the nameservers
    # assigned below.
    res = dns.resolver.Resolver(configure=False)
    res.nameservers = nameservers
    return res
def resolve_at(
    where: Union[dns.name.Name, str],
    qname: Union[dns.name.Name, str],
    rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A,
    rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
    tcp: bool = False,
    source: Optional[str] = None,
    raise_on_no_answer: bool = True,
    source_port: int = 0,
    lifetime: Optional[float] = None,
    search: Optional[bool] = None,
    port: int = 53,
    family: int = socket.AF_UNSPEC,
    resolver: Optional[Resolver] = None,
) -> Answer:
    """Query nameservers to find the answer to the question.

    This is a convenience function that calls ``dns.resolver.make_resolver_at()`` to
    make a resolver, and then uses it to resolve the query.

    See ``dns.resolver.Resolver.resolve`` for more information on the resolution
    parameters, and ``dns.resolver.make_resolver_at`` for information about the resolver
    parameters *where*, *port*, *family*, and *resolver*.

    If making more than one query, it is more efficient to call
    ``dns.resolver.make_resolver_at()`` and then use that resolver for the queries
    instead of calling ``resolve_at()`` multiple times.
    """
    target = make_resolver_at(where, port, family, resolver)
    return target.resolve(
        qname,
        rdtype,
        rdclass,
        tcp,
        source,
        raise_on_no_answer,
        source_port,
        lifetime,
        search,
    )
| # | |
| # Support for overriding the system resolver for all python code in the | |
| # running process. | |
| # | |
# Protocols reported for each socket type when _getaddrinfo() builds its
# result tuples.
_protocols_for_socktype = {
    socket.SOCK_DGRAM: [socket.SOL_UDP],
    socket.SOCK_STREAM: [socket.SOL_TCP],
}

# Resolver used by the replacement socket functions below; initially unset.
_resolver = None

# The socket module's own implementations, captured at import time.
_original_getaddrinfo = socket.getaddrinfo
_original_getnameinfo = socket.getnameinfo
_original_getfqdn = socket.getfqdn
_original_gethostbyname = socket.gethostbyname
_original_gethostbyname_ex = socket.gethostbyname_ex
_original_gethostbyaddr = socket.gethostbyaddr
def _getaddrinfo(
    host=None, service=None, family=socket.AF_UNSPEC, socktype=0, proto=0, flags=0
):
    """Replacement for ``socket.getaddrinfo()`` that resolves names via ``_resolver``.

    The parameters and the returned list of
    ``(family, socktype, proto, canonname, sockaddr)`` tuples follow
    ``socket.getaddrinfo()``.

    The call is handed to the system's ``getaddrinfo()`` unchanged when
    ``AI_NUMERICHOST`` is set, when *host* is ``None``, or when *host* is an
    IPv4 or IPv6 address literal.  Otherwise *host* is looked up with
    ``_resolver.resolve_name()``.  *proto* is only used when delegating;
    results built here take their protocol from ``_protocols_for_socktype``.

    Raises ``socket.gaierror`` with ``EAI_FAIL`` if ``AI_ADDRCONFIG`` or
    ``AI_V4MAPPED`` is requested, with ``EAI_NONAME`` if the name or service
    is unknown or nothing was found, and with ``EAI_AGAIN`` for any other
    resolution failure.
    """
    if flags & socket.AI_NUMERICHOST != 0:
        # Short circuit directly into the system's getaddrinfo().  We're
        # not adding any value in this case, and this avoids infinite loops
        # because dns.query.* needs to call getaddrinfo() for IPv6 scoping
        # reasons.  We will also do this short circuit below if we
        # discover that the host is an address literal.
        return _original_getaddrinfo(host, service, family, socktype, proto, flags)
    if flags & (socket.AI_ADDRCONFIG | socket.AI_V4MAPPED) != 0:
        # Not implemented.  We raise a gaierror as opposed to a
        # NotImplementedError as it helps callers handle errors more
        # appropriately.  [Issue #316]
        #
        # We raise EAI_FAIL as opposed to EAI_SYSTEM because there is
        # no EAI_SYSTEM on Windows [Issue #416].  We didn't go for
        # EAI_BADFLAGS as the flags aren't bad, we just don't
        # implement them.
        raise socket.gaierror(
            socket.EAI_FAIL, "Non-recoverable failure in name resolution"
        )
    if host is None and service is None:
        raise socket.gaierror(socket.EAI_NONAME, "Name or service not known")

    # Is host None or an address literal?  If so, use the system's
    # getaddrinfo().
    if host is None:
        return _original_getaddrinfo(host, service, family, socktype, proto, flags)
    try:
        # We don't care about the result of af_for_address(), we're just
        # calling it so it raises an exception if host is not an IPv4 or
        # IPv6 address.
        dns.inet.af_for_address(host)
        host_is_literal = True
    except Exception:
        host_is_literal = False
    if host_is_literal:
        # Called outside the try block above so that errors from the
        # system's getaddrinfo() reach the caller instead of being
        # swallowed and followed by a pointless DNS lookup of the literal.
        return _original_getaddrinfo(host, service, family, socktype, proto, flags)

    # Something needs resolution!
    try:
        answers = _resolver.resolve_name(host, family)
        addrs = answers.addresses_and_families()
        canonical_name = answers.canonical_name().to_text(True)
    except dns.resolver.NXDOMAIN:
        raise socket.gaierror(socket.EAI_NONAME, "Name or service not known")
    except Exception:
        # We raise EAI_AGAIN here as the failure may be temporary
        # (e.g. a timeout) and EAI_SYSTEM isn't defined on Windows.
        # [Issue #416]
        raise socket.gaierror(socket.EAI_AGAIN, "Temporary failure in name resolution")

    port = None
    try:
        # Is it a port literal?
        if service is None:
            port = 0
        else:
            port = int(service)
    except Exception:
        # Not numeric; try it as a service name unless the caller said
        # the service must be numeric.
        if flags & socket.AI_NUMERICSERV == 0:
            try:
                port = socket.getservbyname(service)
            except Exception:
                pass
    if port is None:
        raise socket.gaierror(socket.EAI_NONAME, "Name or service not known")

    if socktype == 0:
        socktypes = [socket.SOCK_DGRAM, socket.SOCK_STREAM]
    else:
        socktypes = [socktype]
    if flags & socket.AI_CANONNAME != 0:
        cname = canonical_name
    else:
        cname = ""

    # Distinct loop variable names keep the socktype and proto parameters
    # from being overwritten.
    tuples = []
    for addr, af in addrs:
        for stype in socktypes:
            for protocol in _protocols_for_socktype[stype]:
                addr_tuple = dns.inet.low_level_address_tuple((addr, port), af)
                tuples.append((af, stype, protocol, cname, addr_tuple))
    if len(tuples) == 0:
        raise socket.gaierror(socket.EAI_NONAME, "Name or service not known")
    return tuples
def _getnameinfo(sockaddr, flags=0):
    """Replacement for ``socket.getnameinfo()`` that does PTR lookups via ``_resolver``.

    *sockaddr*, a 2-tuple ``(host, port)``, treated as IPv4, or a 4-tuple
    whose last item is the scope, treated as IPv6.

    *flags*, an ``int``, a combination of the ``socket.NI_*`` flags.

    Returns a ``(hostname, service)`` tuple.

    Raises ``socket.error`` if *sockaddr* maps to more than one address, and
    ``socket.gaierror`` with ``EAI_NONAME`` if ``NI_NAMEREQD`` is set and
    the PTR lookup gives NXDOMAIN or no answer.
    """
    host, port = sockaddr[0], sockaddr[1]
    if len(sockaddr) == 4:
        scope = sockaddr[3]
        family = socket.AF_INET6
    else:
        scope = None
        family = socket.AF_INET
    tuples = _getaddrinfo(host, port, family, socket.SOCK_STREAM, socket.SOL_TCP, 0)
    if len(tuples) > 1:
        raise socket.error("sockaddr resolved to multiple addresses")
    addr = tuples[0][4][0]
    pname = "udp" if flags & socket.NI_DGRAM else "tcp"
    qname = dns.reversename.from_address(addr)

    hostname = None
    if flags & socket.NI_NUMERICHOST == 0:
        try:
            answer = _resolver.resolve(qname, "PTR")
            hostname = answer.rrset[0].target.to_text(True)
        except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
            if flags & socket.NI_NAMEREQD:
                raise socket.gaierror(socket.EAI_NONAME, "Name or service not known")
    if hostname is None:
        # Numeric form was requested, or there was no PTR data: use the
        # address itself, with the scope appended when there is one.
        hostname = addr
        if scope is not None:
            hostname += "%" + str(scope)

    if flags & socket.NI_NUMERICSERV:
        service = str(port)
    else:
        service = socket.getservbyport(port, pname)
    return (hostname, service)
def _getfqdn(name=None):
    """Replacement for ``socket.getfqdn()`` which uses dnspython's resolver.

    *name* is the name to look up; if ``None``, the result of
    ``socket.gethostname()`` is used.

    Returns the primary name reported by ``_gethostbyaddr()``.  The lookup
    is best effort: if it fails for any reason the input name is returned
    unchanged.
    """
    target = socket.gethostname() if name is None else name
    try:
        # Python's version checks aliases too, but our gethostbyname
        # ignores them, so we do so here as well.
        canonical, _aliases, _addresses = _gethostbyaddr(target)
    except Exception:
        return target
    return canonical
def _gethostbyname(name):
    """Replacement for ``socket.gethostbyname()``.

    Returns the first address in the list produced by
    ``_gethostbyname_ex()`` for *name*.
    """
    _canonical, _aliases, addresses = _gethostbyname_ex(name)
    return addresses[0]
def _gethostbyname_ex(name):
    """Replacement for ``socket.gethostbyname_ex()``.

    Looks up *name* with ``_getaddrinfo()`` restricted to ``AF_INET`` and
    TCP stream sockets, asking for the canonical name.

    Returns a ``(canonical, aliases, addresses)`` tuple.  The canonical name
    is taken from the first result, and the alias list is always empty.
    """
    infos = _getaddrinfo(
        name, 0, socket.AF_INET, socket.SOCK_STREAM, socket.SOL_TCP, socket.AI_CANONNAME
    )
    # XXX we just ignore aliases
    return (infos[0][3], [], [info[4][0] for info in infos])
def _gethostbyaddr(ip):
    """Replacement for ``socket.gethostbyaddr()``.

    *ip* must be the text form of an IPv6 or IPv4 address.

    Returns a ``(canonical, aliases, addresses)`` tuple.  The name is found
    by reverse lookup and then resolved forward again; only forward results
    equal to *ip* are kept in the address list.  The alias list is always
    empty.

    Raises ``socket.gaierror`` with ``EAI_NONAME`` if *ip* parses as neither
    an IPv6 nor an IPv4 address.
    """
    # Try IPv6 first, then fall back to IPv4.
    try:
        dns.ipv6.inet_aton(ip)
    except Exception:
        try:
            dns.ipv4.inet_aton(ip)
        except Exception:
            raise socket.gaierror(socket.EAI_NONAME, "Name or service not known")
        family = socket.AF_INET
        sockaddr = (ip, 80)
    else:
        family = socket.AF_INET6
        sockaddr = (ip, 80, 0, 0)
    hostname, _service = _getnameinfo(sockaddr, socket.NI_NAMEREQD)
    infos = _getaddrinfo(
        hostname, 0, family, socket.SOCK_STREAM, socket.SOL_TCP, socket.AI_CANONNAME
    )
    canonical = infos[0][3]
    # Keep only the addresses equal to the one we were asked about.  The
    # comparison is done on the binary form so that differing text
    # representations of the same address still match.
    wanted = dns.inet.inet_pton(family, ip)
    matching = [
        info[4][0]
        for info in infos
        if dns.inet.inet_pton(family, info[4][0]) == wanted
    ]
    # XXX we just ignore aliases
    return (canonical, [], matching)
def override_system_resolver(resolver: Optional[Resolver] = None) -> None:
    """Replace the name resolution functions in the ``socket`` module with
    versions which use dnspython's resolver.

    This is handy in tests, where you may want to control how Python code
    resolves names without touching the system's resolver configuration
    (e.g. /etc/resolv.conf).

    *resolver*, a ``dns.resolver.Resolver`` or ``None``, the resolver to use.
    If ``None``, the default resolver is used.
    """
    global _resolver
    if resolver is None:
        resolver = get_default_resolver()
    _resolver = resolver
    replacements = (
        ("getaddrinfo", _getaddrinfo),
        ("getnameinfo", _getnameinfo),
        ("getfqdn", _getfqdn),
        ("gethostbyname", _gethostbyname),
        ("gethostbyname_ex", _gethostbyname_ex),
        ("gethostbyaddr", _gethostbyaddr),
    )
    for attr, replacement in replacements:
        setattr(socket, attr, replacement)
def restore_system_resolver() -> None:
    """Undo the effects of prior override_system_resolver().

    Clears the override resolver and puts the saved original functions back
    into the ``socket`` module.
    """
    global _resolver
    _resolver = None
    originals = (
        ("getaddrinfo", _original_getaddrinfo),
        ("getnameinfo", _original_getnameinfo),
        ("getfqdn", _original_getfqdn),
        ("gethostbyname", _original_gethostbyname),
        ("gethostbyname_ex", _original_gethostbyname_ex),
        ("gethostbyaddr", _original_gethostbyaddr),
    )
    for attr, original in originals:
        setattr(socket, attr, original)