Skip to content

Commit

Permalink
Replace distorm3 with Capstone
Browse files Browse the repository at this point in the history
  • Loading branch information
brkzlr committed Sep 21, 2024
1 parent cae0edf commit feb918b
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 79 deletions.
133 changes: 61 additions & 72 deletions libpince/gdb_python_scripts/gdbextensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import gdb, pickle, json, sys, re, struct, ctypes, os, shelve, distorm3, importlib
import gdb, pickle, sys, re, struct, ctypes, os, shelve, importlib
from capstone import Cs, CsError, CS_ARCH_X86, CS_MODE_32, CS_MODE_64
from collections import OrderedDict

# This is a crude hack
Expand Down Expand Up @@ -204,7 +205,9 @@ def invoke(self, arg, from_tty):
return
for index in range(int(4096 / chunk_size)):
current_offset = chunk_size * index
stack_indicator = hex(sp_address + current_offset) + "(" + stack_register + "+" + hex(current_offset) + ")"
stack_indicator = (
hex(sp_address + current_offset) + "(" + stack_register + "+" + hex(current_offset) + ")"
)
try:
FILE.seek(old_position)
read = FILE.read(chunk_size)
Expand Down Expand Up @@ -422,92 +425,78 @@ def is_memory_valid(self, int_address, discard_invalid_strings=False):

def invoke(self, arg, from_tty):
if gdbutils.current_arch == typedefs.INFERIOR_ARCH.ARCH_64:
disas_option = distorm3.Decode64Bits
disassembler = Cs(CS_ARCH_X86, CS_MODE_64)
else:
disas_option = distorm3.Decode32Bits
disassembler = Cs(CS_ARCH_X86, CS_MODE_32)
disassembler.skipdata = True
referenced_strings_dict = shelve.open(utils.get_referenced_strings_file(pid), writeback=True)
referenced_jumps_dict = shelve.open(utils.get_referenced_jumps_file(pid), writeback=True)
referenced_calls_dict = shelve.open(utils.get_referenced_calls_file(pid), writeback=True)
region_list, discard_invalid_strings = receive_from_pince()
dissect_code_status_file = utils.get_dissect_code_status_file(pid)
region_count = len(region_list)
self.memory = open(gdbutils.mem_file, "rb")
buffer = 0x100000
ref_str_count = len(referenced_strings_dict)
ref_jmp_count = len(referenced_jumps_dict)
ref_call_count = len(referenced_calls_dict)
for region_index, (start_addr, end_addr) in enumerate(region_list):
region_info = start_addr + "-" + end_addr, str(region_index + 1) + " / " + str(region_count)
start_addr = int(start_addr, 16) # Becomes address of the last disassembled instruction later on
end_addr = int(end_addr, 16)
region_finished = False
while not region_finished:
remaining_space = end_addr - start_addr
if remaining_space < buffer:
offset = remaining_space
region_finished = True
else:
offset = buffer
status_info = region_info + (
hex(start_addr)[2:] + "-" + hex(start_addr + offset)[2:],
ref_str_count,
ref_jmp_count,
ref_call_count,
)
pickle.dump(status_info, open(dissect_code_status_file, "wb"))
try:
self.memory.seek(start_addr)
except (OSError, ValueError):
break
code = self.memory.read(offset)
disas_data = distorm3.Decode(start_addr, code, disas_option)
if not region_finished:
last_disas_addr = disas_data[-4][0]
for index in range(4):
del disas_data[-1] # Get rid of last 4 instructions to ensure correct bytecode translation
status_info = region_info + (
hex(start_addr)[2:] + "-" + hex(end_addr)[2:],
ref_str_count,
ref_jmp_count,
ref_call_count,
)
pickle.dump(status_info, open(dissect_code_status_file, "wb"))
try:
self.memory.seek(start_addr)
except (OSError, ValueError):
break
buffer_size = end_addr - start_addr
code = self.memory.read(buffer_size)
try:
disas_data = disassembler.disasm_lite(code, start_addr)
except CsError as e:
print(e)
break
for instruction_addr, _, mnemonic, operands in disas_data:
instruction = f"{mnemonic} {operands}" if operands != "" else mnemonic
found = regexes.dissect_code_valid_address.search(instruction)
if not found:
continue
if instruction.startswith("j") or instruction.startswith("loop"):
referenced_address_str = regexes.hex_number.search(found.group(0)).group(0)
referenced_address_int = int(referenced_address_str, 16)
if self.is_memory_valid(referenced_address_int):
instruction_only = regexes.alphanumerics.search(instruction).group(0).casefold()
try:
referenced_jumps_dict[referenced_address_str][instruction_addr] = instruction_only
except KeyError:
referenced_jumps_dict[referenced_address_str] = {}
referenced_jumps_dict[referenced_address_str][instruction_addr] = instruction_only
ref_jmp_count += 1
elif instruction.startswith("call"):
referenced_address_str = regexes.hex_number.search(found.group(0)).group(0)
referenced_address_int = int(referenced_address_str, 16)
if self.is_memory_valid(referenced_address_int):
try:
referenced_calls_dict[referenced_address_str].add(instruction_addr)
except KeyError:
referenced_calls_dict[referenced_address_str] = set()
referenced_calls_dict[referenced_address_str].add(instruction_addr)
ref_call_count += 1
else:
last_disas_addr = 0
for instruction_offset, size, instruction, hexdump in disas_data:
if isinstance(instruction, bytes):
instruction = instruction.decode()
if instruction.startswith("J") or instruction.startswith("LOOP"):
found = regexes.dissect_code_valid_address.search(instruction)
if found:
referenced_address_str = regexes.hex_number.search(found.group(0)).group(0)
referenced_address_int = int(referenced_address_str, 16)
if self.is_memory_valid(referenced_address_int):
instruction_only = regexes.alphanumerics.search(instruction).group(0).casefold()
try:
referenced_jumps_dict[referenced_address_str][instruction_offset] = instruction_only
except KeyError:
referenced_jumps_dict[referenced_address_str] = {}
referenced_jumps_dict[referenced_address_str][instruction_offset] = instruction_only
ref_jmp_count += 1
elif instruction.startswith("CALL"):
found = regexes.dissect_code_valid_address.search(instruction)
if found:
referenced_address_str = regexes.hex_number.search(found.group(0)).group(0)
referenced_address_int = int(referenced_address_str, 16)
if self.is_memory_valid(referenced_address_int):
try:
referenced_calls_dict[referenced_address_str].add(instruction_offset)
except KeyError:
referenced_calls_dict[referenced_address_str] = set()
referenced_calls_dict[referenced_address_str].add(instruction_offset)
ref_call_count += 1
else:
found = regexes.dissect_code_valid_address.search(instruction)
if found:
referenced_address_str = regexes.hex_number.search(found.group(0)).group(0)
referenced_address_int = int(referenced_address_str, 16)
if self.is_memory_valid(referenced_address_int, discard_invalid_strings):
try:
referenced_strings_dict[referenced_address_str].add(instruction_offset)
except KeyError:
referenced_strings_dict[referenced_address_str] = set()
referenced_strings_dict[referenced_address_str].add(instruction_offset)
ref_str_count += 1
start_addr = last_disas_addr
referenced_address_str = regexes.hex_number.search(found.group(0)).group(0)
referenced_address_int = int(referenced_address_str, 16)
if self.is_memory_valid(referenced_address_int, discard_invalid_strings):
try:
referenced_strings_dict[referenced_address_str].add(instruction_addr)
except KeyError:
referenced_strings_dict[referenced_address_str] = set()
referenced_strings_dict[referenced_address_str].add(instruction_addr)
ref_str_count += 1
self.memory.close()


Expand Down
21 changes: 15 additions & 6 deletions libpince/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import os, shutil, sys, binascii, pickle, json, traceback, re, pwd, pathlib, distorm3
import os, shutil, sys, binascii, pickle, json, traceback, re, pwd, pathlib
from . import typedefs, regexes
from capstone import Cs, CsError, CS_ARCH_X86, CS_MODE_32, CS_MODE_64
from keystone import Ks, KsError, KS_ARCH_X86, KS_MODE_32, KS_MODE_64
from collections import OrderedDict
from importlib.machinery import SourceFileLoader
from pygdbmi import gdbmiparser

# Capstone initialization
cs_32 = Cs(CS_ARCH_X86, CS_MODE_32)
cs_64 = Cs(CS_ARCH_X86, CS_MODE_64)

# Keystone initialization
ks_32 = Ks(KS_ARCH_X86, KS_MODE_32)
ks_64 = Ks(KS_ARCH_X86, KS_MODE_64)
Expand Down Expand Up @@ -666,15 +671,19 @@ def get_opcodes(address, aob, inferior_arch):
None: If there was an error
"""
if inferior_arch == typedefs.INFERIOR_ARCH.ARCH_64:
disas_option = distorm3.Decode64Bits
elif inferior_arch == typedefs.INFERIOR_ARCH.ARCH_32:
disas_option = distorm3.Decode32Bits
disassembler = cs_64
else:
disassembler = cs_32
disassembler.skipdata = True
try:
bytecode = bytes.fromhex(aob.replace(" ", ""))
except ValueError:
return
disas_data = distorm3.Decode(address, bytecode, disas_option)
return "; ".join([data[2] for data in disas_data])
try:
disas_data = disassembler.disasm_lite(bytecode, address)
return "; ".join([f"{data[2]} {data[3]}" if data[3] != "" else data[2] for data in disas_data])
except CsError as e:
print(e)


def assemble(instructions, address, inferior_arch):
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PyQt6==6.6.0
PyQt6-Qt6==6.6.0
pexpect==4.9.0
distorm3==3.5.2
capstone==5.0.3
keystone-engine==0.9.2
pygdbmi==0.11.0.0
keyboard==0.13.5
Expand Down

0 comments on commit feb918b

Please sign in to comment.