Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions impacket/dcerpc/v5/dcomrt.py
Original file line number Diff line number Diff line change
Expand Up @@ -1249,6 +1249,17 @@ def get_cinstance(self):

def set_cinstance(self, cinstance):
self.__cinstance = cinstance

def is_target_loopback(self):
"""
Detect all loopback IP addresses.
This is assuming 127.0.0.0/8 is IPv4 loopback.
Also accounting for IPv6 loopback addresses.
This should only be used to filter user input and automatically redirect any DCOM connections
from local machines to correct string-bindings on the local endpoint mapper.
"""
return self.get_target().startswith('127.') \
or self.get_target() == '::1' or self.get_target() == '0:0:0:0:0:0:0:1' # IPv6 loopback addresses.

def is_fqdn(self):
# I will assume the following
Expand All @@ -1257,7 +1268,7 @@ def is_fqdn(self):
# an FQDN with ':'
# Is it isn't both, then it is a FQDN
try:
socket.inet_aton(self.__target)
socket.inet_aton(self.get_target())
except:
# Not an IPv4
try:
Expand All @@ -1268,9 +1279,8 @@ def is_fqdn(self):
return False

def connect(self, iid = None):
if (self.__target in INTERFACE.CONNECTIONS) is True:
if current_thread().name in INTERFACE.CONNECTIONS[self.__target] and \
(self.__oxid in INTERFACE.CONNECTIONS[self.__target][current_thread().name]) is True:
if self.__target in INTERFACE.CONNECTIONS:
if current_thread().name in INTERFACE.CONNECTIONS[self.__target] and self.__oxid in INTERFACE.CONNECTIONS[self.__target][current_thread().name]:
dce = INTERFACE.CONNECTIONS[self.__target][current_thread().name][self.__oxid]['dce']
currentBinding = INTERFACE.CONNECTIONS[self.__target][current_thread().name][self.__oxid]['currentBinding']
if currentBinding == iid:
Expand All @@ -1281,6 +1291,7 @@ def connect(self, iid = None):
INTERFACE.CONNECTIONS[self.__target][current_thread().name][self.__oxid]['dce'] = newDce
INTERFACE.CONNECTIONS[self.__target][current_thread().name][self.__oxid]['currentBinding'] = iid
else:
# No existing connection, create a new one.
stringBindings = self.get_cinstance().get_string_bindings()
# No OXID present, we should create a new connection and store it
stringBinding = None
Expand All @@ -1291,6 +1302,8 @@ def connect(self, iid = None):
# 1) it's an IPv4 address
# 2) it's an IPv6 address
# 3) it's a NetBios Name
# 4) it's 127.0.0.1 - not resolved via epmapper.
# if it really is a loopback address - return true on any stringbinding.
# we should handle all this cases accordingly
# Does this match exactly what get_target() returns?
LOG.debug('StringBinding: %s' % strBinding['aNetworkAddr'])
Expand All @@ -1303,9 +1316,20 @@ def connect(self, iid = None):
binding = strBinding['aNetworkAddr']
bindingPort = ''

if self.is_target_loopback():
# Accept the first matched string-binding.
# This is because we know we want to connect no matter what.
LOG.debug('Detected loopback DCOM connection attempt, using first available StringBinding.')
stringBinding = 'ncacn_ip_tcp:' + strBinding['aNetworkAddr'][:-1]
# TODO: Decide if we also want to change our self.__target accordingly...?
# Notice - it breaks stuff.
# self.__target = binding
break

if binding.upper().find(self.get_target().upper()) >= 0:
stringBinding = 'ncacn_ip_tcp:' + strBinding['aNetworkAddr'][:-1]
break

# If get_target() is a FQDN, does it match the hostname?
elif isTargetFQDN and binding.upper().find(self.get_target().upper().partition('.')[0]) >= 0:
# Here we replace the aNetworkAddr with self.get_target()
Expand Down