Spaces:
Paused
Paused
| from typing import Optional, Union | |
| from urllib.parse import urlparse | |
| import dns.asyncbackend | |
| import dns.asyncquery | |
| import dns.inet | |
| import dns.message | |
| import dns.query | |
| class Nameserver: | |
| def __init__(self): | |
| pass | |
| def __str__(self): | |
| raise NotImplementedError | |
| def kind(self) -> str: | |
| raise NotImplementedError | |
| def is_always_max_size(self) -> bool: | |
| raise NotImplementedError | |
| def answer_nameserver(self) -> str: | |
| raise NotImplementedError | |
| def answer_port(self) -> int: | |
| raise NotImplementedError | |
| def query( | |
| self, | |
| request: dns.message.QueryMessage, | |
| timeout: float, | |
| source: Optional[str], | |
| source_port: int, | |
| max_size: bool, | |
| one_rr_per_rrset: bool = False, | |
| ignore_trailing: bool = False, | |
| ) -> dns.message.Message: | |
| raise NotImplementedError | |
| async def async_query( | |
| self, | |
| request: dns.message.QueryMessage, | |
| timeout: float, | |
| source: Optional[str], | |
| source_port: int, | |
| max_size: bool, | |
| backend: dns.asyncbackend.Backend, | |
| one_rr_per_rrset: bool = False, | |
| ignore_trailing: bool = False, | |
| ) -> dns.message.Message: | |
| raise NotImplementedError | |
| class AddressAndPortNameserver(Nameserver): | |
| def __init__(self, address: str, port: int): | |
| super().__init__() | |
| self.address = address | |
| self.port = port | |
| def kind(self) -> str: | |
| raise NotImplementedError | |
| def is_always_max_size(self) -> bool: | |
| return False | |
| def __str__(self): | |
| ns_kind = self.kind() | |
| return f"{ns_kind}:{self.address}@{self.port}" | |
| def answer_nameserver(self) -> str: | |
| return self.address | |
| def answer_port(self) -> int: | |
| return self.port | |
| class Do53Nameserver(AddressAndPortNameserver): | |
| def __init__(self, address: str, port: int = 53): | |
| super().__init__(address, port) | |
| def kind(self): | |
| return "Do53" | |
| def query( | |
| self, | |
| request: dns.message.QueryMessage, | |
| timeout: float, | |
| source: Optional[str], | |
| source_port: int, | |
| max_size: bool, | |
| one_rr_per_rrset: bool = False, | |
| ignore_trailing: bool = False, | |
| ) -> dns.message.Message: | |
| if max_size: | |
| response = dns.query.tcp( | |
| request, | |
| self.address, | |
| timeout=timeout, | |
| port=self.port, | |
| source=source, | |
| source_port=source_port, | |
| one_rr_per_rrset=one_rr_per_rrset, | |
| ignore_trailing=ignore_trailing, | |
| ) | |
| else: | |
| response = dns.query.udp( | |
| request, | |
| self.address, | |
| timeout=timeout, | |
| port=self.port, | |
| source=source, | |
| source_port=source_port, | |
| raise_on_truncation=True, | |
| one_rr_per_rrset=one_rr_per_rrset, | |
| ignore_trailing=ignore_trailing, | |
| ignore_errors=True, | |
| ignore_unexpected=True, | |
| ) | |
| return response | |
| async def async_query( | |
| self, | |
| request: dns.message.QueryMessage, | |
| timeout: float, | |
| source: Optional[str], | |
| source_port: int, | |
| max_size: bool, | |
| backend: dns.asyncbackend.Backend, | |
| one_rr_per_rrset: bool = False, | |
| ignore_trailing: bool = False, | |
| ) -> dns.message.Message: | |
| if max_size: | |
| response = await dns.asyncquery.tcp( | |
| request, | |
| self.address, | |
| timeout=timeout, | |
| port=self.port, | |
| source=source, | |
| source_port=source_port, | |
| backend=backend, | |
| one_rr_per_rrset=one_rr_per_rrset, | |
| ignore_trailing=ignore_trailing, | |
| ) | |
| else: | |
| response = await dns.asyncquery.udp( | |
| request, | |
| self.address, | |
| timeout=timeout, | |
| port=self.port, | |
| source=source, | |
| source_port=source_port, | |
| raise_on_truncation=True, | |
| backend=backend, | |
| one_rr_per_rrset=one_rr_per_rrset, | |
| ignore_trailing=ignore_trailing, | |
| ignore_errors=True, | |
| ignore_unexpected=True, | |
| ) | |
| return response | |
| class DoHNameserver(Nameserver): | |
| def __init__( | |
| self, | |
| url: str, | |
| bootstrap_address: Optional[str] = None, | |
| verify: Union[bool, str] = True, | |
| want_get: bool = False, | |
| ): | |
| super().__init__() | |
| self.url = url | |
| self.bootstrap_address = bootstrap_address | |
| self.verify = verify | |
| self.want_get = want_get | |
| def kind(self): | |
| return "DoH" | |
| def is_always_max_size(self) -> bool: | |
| return True | |
| def __str__(self): | |
| return self.url | |
| def answer_nameserver(self) -> str: | |
| return self.url | |
| def answer_port(self) -> int: | |
| port = urlparse(self.url).port | |
| if port is None: | |
| port = 443 | |
| return port | |
| def query( | |
| self, | |
| request: dns.message.QueryMessage, | |
| timeout: float, | |
| source: Optional[str], | |
| source_port: int, | |
| max_size: bool = False, | |
| one_rr_per_rrset: bool = False, | |
| ignore_trailing: bool = False, | |
| ) -> dns.message.Message: | |
| return dns.query.https( | |
| request, | |
| self.url, | |
| timeout=timeout, | |
| source=source, | |
| source_port=source_port, | |
| bootstrap_address=self.bootstrap_address, | |
| one_rr_per_rrset=one_rr_per_rrset, | |
| ignore_trailing=ignore_trailing, | |
| verify=self.verify, | |
| post=(not self.want_get), | |
| ) | |
| async def async_query( | |
| self, | |
| request: dns.message.QueryMessage, | |
| timeout: float, | |
| source: Optional[str], | |
| source_port: int, | |
| max_size: bool, | |
| backend: dns.asyncbackend.Backend, | |
| one_rr_per_rrset: bool = False, | |
| ignore_trailing: bool = False, | |
| ) -> dns.message.Message: | |
| return await dns.asyncquery.https( | |
| request, | |
| self.url, | |
| timeout=timeout, | |
| source=source, | |
| source_port=source_port, | |
| bootstrap_address=self.bootstrap_address, | |
| one_rr_per_rrset=one_rr_per_rrset, | |
| ignore_trailing=ignore_trailing, | |
| verify=self.verify, | |
| post=(not self.want_get), | |
| ) | |
| class DoTNameserver(AddressAndPortNameserver): | |
| def __init__( | |
| self, | |
| address: str, | |
| port: int = 853, | |
| hostname: Optional[str] = None, | |
| verify: Union[bool, str] = True, | |
| ): | |
| super().__init__(address, port) | |
| self.hostname = hostname | |
| self.verify = verify | |
| def kind(self): | |
| return "DoT" | |
| def query( | |
| self, | |
| request: dns.message.QueryMessage, | |
| timeout: float, | |
| source: Optional[str], | |
| source_port: int, | |
| max_size: bool = False, | |
| one_rr_per_rrset: bool = False, | |
| ignore_trailing: bool = False, | |
| ) -> dns.message.Message: | |
| return dns.query.tls( | |
| request, | |
| self.address, | |
| port=self.port, | |
| timeout=timeout, | |
| one_rr_per_rrset=one_rr_per_rrset, | |
| ignore_trailing=ignore_trailing, | |
| server_hostname=self.hostname, | |
| verify=self.verify, | |
| ) | |
| async def async_query( | |
| self, | |
| request: dns.message.QueryMessage, | |
| timeout: float, | |
| source: Optional[str], | |
| source_port: int, | |
| max_size: bool, | |
| backend: dns.asyncbackend.Backend, | |
| one_rr_per_rrset: bool = False, | |
| ignore_trailing: bool = False, | |
| ) -> dns.message.Message: | |
| return await dns.asyncquery.tls( | |
| request, | |
| self.address, | |
| port=self.port, | |
| timeout=timeout, | |
| one_rr_per_rrset=one_rr_per_rrset, | |
| ignore_trailing=ignore_trailing, | |
| server_hostname=self.hostname, | |
| verify=self.verify, | |
| ) | |
| class DoQNameserver(AddressAndPortNameserver): | |
| def __init__( | |
| self, | |
| address: str, | |
| port: int = 853, | |
| verify: Union[bool, str] = True, | |
| server_hostname: Optional[str] = None, | |
| ): | |
| super().__init__(address, port) | |
| self.verify = verify | |
| self.server_hostname = server_hostname | |
| def kind(self): | |
| return "DoQ" | |
| def query( | |
| self, | |
| request: dns.message.QueryMessage, | |
| timeout: float, | |
| source: Optional[str], | |
| source_port: int, | |
| max_size: bool = False, | |
| one_rr_per_rrset: bool = False, | |
| ignore_trailing: bool = False, | |
| ) -> dns.message.Message: | |
| return dns.query.quic( | |
| request, | |
| self.address, | |
| port=self.port, | |
| timeout=timeout, | |
| one_rr_per_rrset=one_rr_per_rrset, | |
| ignore_trailing=ignore_trailing, | |
| verify=self.verify, | |
| server_hostname=self.server_hostname, | |
| ) | |
| async def async_query( | |
| self, | |
| request: dns.message.QueryMessage, | |
| timeout: float, | |
| source: Optional[str], | |
| source_port: int, | |
| max_size: bool, | |
| backend: dns.asyncbackend.Backend, | |
| one_rr_per_rrset: bool = False, | |
| ignore_trailing: bool = False, | |
| ) -> dns.message.Message: | |
| return await dns.asyncquery.quic( | |
| request, | |
| self.address, | |
| port=self.port, | |
| timeout=timeout, | |
| one_rr_per_rrset=one_rr_per_rrset, | |
| ignore_trailing=ignore_trailing, | |
| verify=self.verify, | |
| server_hostname=self.server_hostname, | |
| ) | |