Spaces:
Paused
Paused
| # Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license | |
| # Copyright (C) 2001-2007, 2009-2011 Nominum, Inc. | |
| # | |
| # Permission to use, copy, modify, and distribute this software and its | |
| # documentation for any purpose with or without fee is hereby granted, | |
| # provided that the above copyright notice and this permission notice | |
| # appear in all copies. | |
| # | |
| # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES | |
| # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF | |
| # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR | |
| # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES | |
| # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN | |
| # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT | |
| # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | |
| """DNS TSIG support.""" | |
| import base64 | |
| import hashlib | |
| import hmac | |
| import struct | |
| import dns.exception | |
| import dns.name | |
| import dns.rcode | |
| import dns.rdataclass | |
| class BadTime(dns.exception.DNSException): | |
| """The current time is not within the TSIG's validity time.""" | |
| class BadSignature(dns.exception.DNSException): | |
| """The TSIG signature fails to verify.""" | |
| class BadKey(dns.exception.DNSException): | |
| """The TSIG record owner name does not match the key.""" | |
| class BadAlgorithm(dns.exception.DNSException): | |
| """The TSIG algorithm does not match the key.""" | |
| class PeerError(dns.exception.DNSException): | |
| """Base class for all TSIG errors generated by the remote peer""" | |
| class PeerBadKey(PeerError): | |
| """The peer didn't know the key we used""" | |
| class PeerBadSignature(PeerError): | |
| """The peer didn't like the signature we sent""" | |
| class PeerBadTime(PeerError): | |
| """The peer didn't like the time we sent""" | |
| class PeerBadTruncation(PeerError): | |
| """The peer didn't like amount of truncation in the TSIG we sent""" | |
| # TSIG Algorithms | |
| HMAC_MD5 = dns.name.from_text("HMAC-MD5.SIG-ALG.REG.INT") | |
| HMAC_SHA1 = dns.name.from_text("hmac-sha1") | |
| HMAC_SHA224 = dns.name.from_text("hmac-sha224") | |
| HMAC_SHA256 = dns.name.from_text("hmac-sha256") | |
| HMAC_SHA256_128 = dns.name.from_text("hmac-sha256-128") | |
| HMAC_SHA384 = dns.name.from_text("hmac-sha384") | |
| HMAC_SHA384_192 = dns.name.from_text("hmac-sha384-192") | |
| HMAC_SHA512 = dns.name.from_text("hmac-sha512") | |
| HMAC_SHA512_256 = dns.name.from_text("hmac-sha512-256") | |
| GSS_TSIG = dns.name.from_text("gss-tsig") | |
| default_algorithm = HMAC_SHA256 | |
| mac_sizes = { | |
| HMAC_SHA1: 20, | |
| HMAC_SHA224: 28, | |
| HMAC_SHA256: 32, | |
| HMAC_SHA256_128: 16, | |
| HMAC_SHA384: 48, | |
| HMAC_SHA384_192: 24, | |
| HMAC_SHA512: 64, | |
| HMAC_SHA512_256: 32, | |
| HMAC_MD5: 16, | |
| GSS_TSIG: 128, # This is what we assume to be the worst case! | |
| } | |
| class GSSTSig: | |
| """ | |
| GSS-TSIG TSIG implementation. This uses the GSS-API context established | |
| in the TKEY message handshake to sign messages using GSS-API message | |
| integrity codes, per the RFC. | |
| In order to avoid a direct GSSAPI dependency, the keyring holds a ref | |
| to the GSSAPI object required, rather than the key itself. | |
| """ | |
| def __init__(self, gssapi_context): | |
| self.gssapi_context = gssapi_context | |
| self.data = b"" | |
| self.name = "gss-tsig" | |
| def update(self, data): | |
| self.data += data | |
| def sign(self): | |
| # defer to the GSSAPI function to sign | |
| return self.gssapi_context.get_signature(self.data) | |
| def verify(self, expected): | |
| try: | |
| # defer to the GSSAPI function to verify | |
| return self.gssapi_context.verify_signature(self.data, expected) | |
| except Exception: | |
| # note the usage of a bare exception | |
| raise BadSignature | |
| class GSSTSigAdapter: | |
| def __init__(self, keyring): | |
| self.keyring = keyring | |
| def __call__(self, message, keyname): | |
| if keyname in self.keyring: | |
| key = self.keyring[keyname] | |
| if isinstance(key, Key) and key.algorithm == GSS_TSIG: | |
| if message: | |
| GSSTSigAdapter.parse_tkey_and_step(key, message, keyname) | |
| return key | |
| else: | |
| return None | |
| def parse_tkey_and_step(cls, key, message, keyname): | |
| # if the message is a TKEY type, absorb the key material | |
| # into the context using step(); this is used to allow the | |
| # client to complete the GSSAPI negotiation before attempting | |
| # to verify the signed response to a TKEY message exchange | |
| try: | |
| rrset = message.find_rrset( | |
| message.answer, keyname, dns.rdataclass.ANY, dns.rdatatype.TKEY | |
| ) | |
| if rrset: | |
| token = rrset[0].key | |
| gssapi_context = key.secret | |
| return gssapi_context.step(token) | |
| except KeyError: | |
| pass | |
| class HMACTSig: | |
| """ | |
| HMAC TSIG implementation. This uses the HMAC python module to handle the | |
| sign/verify operations. | |
| """ | |
| _hashes = { | |
| HMAC_SHA1: hashlib.sha1, | |
| HMAC_SHA224: hashlib.sha224, | |
| HMAC_SHA256: hashlib.sha256, | |
| HMAC_SHA256_128: (hashlib.sha256, 128), | |
| HMAC_SHA384: hashlib.sha384, | |
| HMAC_SHA384_192: (hashlib.sha384, 192), | |
| HMAC_SHA512: hashlib.sha512, | |
| HMAC_SHA512_256: (hashlib.sha512, 256), | |
| HMAC_MD5: hashlib.md5, | |
| } | |
| def __init__(self, key, algorithm): | |
| try: | |
| hashinfo = self._hashes[algorithm] | |
| except KeyError: | |
| raise NotImplementedError(f"TSIG algorithm {algorithm} is not supported") | |
| # create the HMAC context | |
| if isinstance(hashinfo, tuple): | |
| self.hmac_context = hmac.new(key, digestmod=hashinfo[0]) | |
| self.size = hashinfo[1] | |
| else: | |
| self.hmac_context = hmac.new(key, digestmod=hashinfo) | |
| self.size = None | |
| self.name = self.hmac_context.name | |
| if self.size: | |
| self.name += f"-{self.size}" | |
| def update(self, data): | |
| return self.hmac_context.update(data) | |
| def sign(self): | |
| # defer to the HMAC digest() function for that digestmod | |
| digest = self.hmac_context.digest() | |
| if self.size: | |
| digest = digest[: (self.size // 8)] | |
| return digest | |
| def verify(self, expected): | |
| # re-digest and compare the results | |
| mac = self.sign() | |
| if not hmac.compare_digest(mac, expected): | |
| raise BadSignature | |
| def _digest(wire, key, rdata, time=None, request_mac=None, ctx=None, multi=None): | |
| """Return a context containing the TSIG rdata for the input parameters | |
| @rtype: dns.tsig.HMACTSig or dns.tsig.GSSTSig object | |
| @raises ValueError: I{other_data} is too long | |
| @raises NotImplementedError: I{algorithm} is not supported | |
| """ | |
| first = not (ctx and multi) | |
| if first: | |
| ctx = get_context(key) | |
| if request_mac: | |
| ctx.update(struct.pack("!H", len(request_mac))) | |
| ctx.update(request_mac) | |
| ctx.update(struct.pack("!H", rdata.original_id)) | |
| ctx.update(wire[2:]) | |
| if first: | |
| ctx.update(key.name.to_digestable()) | |
| ctx.update(struct.pack("!H", dns.rdataclass.ANY)) | |
| ctx.update(struct.pack("!I", 0)) | |
| if time is None: | |
| time = rdata.time_signed | |
| upper_time = (time >> 32) & 0xFFFF | |
| lower_time = time & 0xFFFFFFFF | |
| time_encoded = struct.pack("!HIH", upper_time, lower_time, rdata.fudge) | |
| other_len = len(rdata.other) | |
| if other_len > 65535: | |
| raise ValueError("TSIG Other Data is > 65535 bytes") | |
| if first: | |
| ctx.update(key.algorithm.to_digestable() + time_encoded) | |
| ctx.update(struct.pack("!HH", rdata.error, other_len) + rdata.other) | |
| else: | |
| ctx.update(time_encoded) | |
| return ctx | |
| def _maybe_start_digest(key, mac, multi): | |
| """If this is the first message in a multi-message sequence, | |
| start a new context. | |
| @rtype: dns.tsig.HMACTSig or dns.tsig.GSSTSig object | |
| """ | |
| if multi: | |
| ctx = get_context(key) | |
| ctx.update(struct.pack("!H", len(mac))) | |
| ctx.update(mac) | |
| return ctx | |
| else: | |
| return None | |
| def sign(wire, key, rdata, time=None, request_mac=None, ctx=None, multi=False): | |
| """Return a (tsig_rdata, mac, ctx) tuple containing the HMAC TSIG rdata | |
| for the input parameters, the HMAC MAC calculated by applying the | |
| TSIG signature algorithm, and the TSIG digest context. | |
| @rtype: (string, dns.tsig.HMACTSig or dns.tsig.GSSTSig object) | |
| @raises ValueError: I{other_data} is too long | |
| @raises NotImplementedError: I{algorithm} is not supported | |
| """ | |
| ctx = _digest(wire, key, rdata, time, request_mac, ctx, multi) | |
| mac = ctx.sign() | |
| tsig = rdata.replace(time_signed=time, mac=mac) | |
| return (tsig, _maybe_start_digest(key, mac, multi)) | |
| def validate( | |
| wire, key, owner, rdata, now, request_mac, tsig_start, ctx=None, multi=False | |
| ): | |
| """Validate the specified TSIG rdata against the other input parameters. | |
| @raises FormError: The TSIG is badly formed. | |
| @raises BadTime: There is too much time skew between the client and the | |
| server. | |
| @raises BadSignature: The TSIG signature did not validate | |
| @rtype: dns.tsig.HMACTSig or dns.tsig.GSSTSig object""" | |
| (adcount,) = struct.unpack("!H", wire[10:12]) | |
| if adcount == 0: | |
| raise dns.exception.FormError | |
| adcount -= 1 | |
| new_wire = wire[0:10] + struct.pack("!H", adcount) + wire[12:tsig_start] | |
| if rdata.error != 0: | |
| if rdata.error == dns.rcode.BADSIG: | |
| raise PeerBadSignature | |
| elif rdata.error == dns.rcode.BADKEY: | |
| raise PeerBadKey | |
| elif rdata.error == dns.rcode.BADTIME: | |
| raise PeerBadTime | |
| elif rdata.error == dns.rcode.BADTRUNC: | |
| raise PeerBadTruncation | |
| else: | |
| raise PeerError("unknown TSIG error code %d" % rdata.error) | |
| if abs(rdata.time_signed - now) > rdata.fudge: | |
| raise BadTime | |
| if key.name != owner: | |
| raise BadKey | |
| if key.algorithm != rdata.algorithm: | |
| raise BadAlgorithm | |
| ctx = _digest(new_wire, key, rdata, None, request_mac, ctx, multi) | |
| ctx.verify(rdata.mac) | |
| return _maybe_start_digest(key, rdata.mac, multi) | |
| def get_context(key): | |
| """Returns an HMAC context for the specified key. | |
| @rtype: HMAC context | |
| @raises NotImplementedError: I{algorithm} is not supported | |
| """ | |
| if key.algorithm == GSS_TSIG: | |
| return GSSTSig(key.secret) | |
| else: | |
| return HMACTSig(key.secret, key.algorithm) | |
| class Key: | |
| def __init__(self, name, secret, algorithm=default_algorithm): | |
| if isinstance(name, str): | |
| name = dns.name.from_text(name) | |
| self.name = name | |
| if isinstance(secret, str): | |
| secret = base64.decodebytes(secret.encode()) | |
| self.secret = secret | |
| if isinstance(algorithm, str): | |
| algorithm = dns.name.from_text(algorithm) | |
| self.algorithm = algorithm | |
| def __eq__(self, other): | |
| return ( | |
| isinstance(other, Key) | |
| and self.name == other.name | |
| and self.secret == other.secret | |
| and self.algorithm == other.algorithm | |
| ) | |
| def __repr__(self): | |
| r = f"<DNS key name='{self.name}', " + f"algorithm='{self.algorithm}'" | |
| if self.algorithm != GSS_TSIG: | |
| r += f", secret='{base64.b64encode(self.secret).decode()}'" | |
| r += ">" | |
| return r | |