diff --git a/libpince/gdb_python_scripts/gdbextensions.py b/libpince/gdb_python_scripts/gdbextensions.py index 9a89444a..d70d240d 100644 --- a/libpince/gdb_python_scripts/gdbextensions.py +++ b/libpince/gdb_python_scripts/gdbextensions.py @@ -15,7 +15,8 @@ You should have received a copy of the GNU General Public License along with this program. If not, see . """ -import gdb, pickle, json, sys, re, struct, ctypes, os, shelve, distorm3, importlib +import gdb, pickle, sys, re, struct, ctypes, os, shelve, importlib +from capstone import Cs, CsError, CS_ARCH_X86, CS_MODE_32, CS_MODE_64 from collections import OrderedDict # This is some retarded hack @@ -204,7 +205,9 @@ def invoke(self, arg, from_tty): return for index in range(int(4096 / chunk_size)): current_offset = chunk_size * index - stack_indicator = hex(sp_address + current_offset) + "(" + stack_register + "+" + hex(current_offset) + ")" + stack_indicator = ( + hex(sp_address + current_offset) + "(" + stack_register + "+" + hex(current_offset) + ")" + ) try: FILE.seek(old_position) read = FILE.read(chunk_size) @@ -422,9 +425,10 @@ def is_memory_valid(self, int_address, discard_invalid_strings=False): def invoke(self, arg, from_tty): if gdbutils.current_arch == typedefs.INFERIOR_ARCH.ARCH_64: - disas_option = distorm3.Decode64Bits + disassembler = Cs(CS_ARCH_X86, CS_MODE_64) else: - disas_option = distorm3.Decode32Bits + disassembler = Cs(CS_ARCH_X86, CS_MODE_32) + disassembler.skipdata = True referenced_strings_dict = shelve.open(utils.get_referenced_strings_file(pid), writeback=True) referenced_jumps_dict = shelve.open(utils.get_referenced_jumps_file(pid), writeback=True) referenced_calls_dict = shelve.open(utils.get_referenced_calls_file(pid), writeback=True) @@ -432,7 +436,6 @@ def invoke(self, arg, from_tty): dissect_code_status_file = utils.get_dissect_code_status_file(pid) region_count = len(region_list) self.memory = open(gdbutils.mem_file, "rb") - buffer = 0x100000 ref_str_count = len(referenced_strings_dict) ref_jmp_count = len(referenced_jumps_dict) ref_call_count = len(referenced_calls_dict) @@ -440,74 +443,60 @@ def invoke(self, arg, from_tty): region_info = start_addr + "-" + end_addr, str(region_index + 1) + " / " + str(region_count) start_addr = int(start_addr, 16) # Becomes address of the last disassembled instruction later on end_addr = int(end_addr, 16) - region_finished = False - while not region_finished: - remaining_space = end_addr - start_addr - if remaining_space < buffer: - offset = remaining_space - region_finished = True - else: - offset = buffer - status_info = region_info + ( - hex(start_addr)[2:] + "-" + hex(start_addr + offset)[2:], - ref_str_count, - ref_jmp_count, - ref_call_count, - ) - pickle.dump(status_info, open(dissect_code_status_file, "wb")) - try: - self.memory.seek(start_addr) - except (OSError, ValueError): - break - code = self.memory.read(offset) - disas_data = distorm3.Decode(start_addr, code, disas_option) - if not region_finished: - last_disas_addr = disas_data[-4][0] - for index in range(4): - del disas_data[-1] # Get rid of last 4 instructions to ensure correct bytecode translation + status_info = region_info + ( + hex(start_addr)[2:] + "-" + hex(end_addr)[2:], + ref_str_count, + ref_jmp_count, + ref_call_count, + ) + pickle.dump(status_info, open(dissect_code_status_file, "wb")) + try: + self.memory.seek(start_addr) + except (OSError, ValueError): + break + buffer_size = end_addr - start_addr + code = self.memory.read(buffer_size) + try: + disas_data = disassembler.disasm_lite(code, start_addr) + except CsError as e: + print(e) + break + for instruction_addr, _, mnemonic, operands in disas_data: + instruction = f"{mnemonic} {operands}" if operands != "" else mnemonic + found = regexes.dissect_code_valid_address.search(instruction) + if not found: + continue + if instruction.startswith("j") or instruction.startswith("loop"): + referenced_address_str = regexes.hex_number.search(found.group(0)).group(0) + referenced_address_int = int(referenced_address_str, 16) + if self.is_memory_valid(referenced_address_int): + instruction_only = regexes.alphanumerics.search(instruction).group(0).casefold() + try: + referenced_jumps_dict[referenced_address_str][instruction_addr] = instruction_only + except KeyError: + referenced_jumps_dict[referenced_address_str] = {} + referenced_jumps_dict[referenced_address_str][instruction_addr] = instruction_only + ref_jmp_count += 1 + elif instruction.startswith("call"): + referenced_address_str = regexes.hex_number.search(found.group(0)).group(0) + referenced_address_int = int(referenced_address_str, 16) + if self.is_memory_valid(referenced_address_int): + try: + referenced_calls_dict[referenced_address_str].add(instruction_addr) + except KeyError: + referenced_calls_dict[referenced_address_str] = set() + referenced_calls_dict[referenced_address_str].add(instruction_addr) + ref_call_count += 1 else: - last_disas_addr = 0 - for instruction_offset, size, instruction, hexdump in disas_data: - if isinstance(instruction, bytes): - instruction = instruction.decode() - if instruction.startswith("J") or instruction.startswith("LOOP"): - found = regexes.dissect_code_valid_address.search(instruction) - if found: - referenced_address_str = regexes.hex_number.search(found.group(0)).group(0) - referenced_address_int = int(referenced_address_str, 16) - if self.is_memory_valid(referenced_address_int): - instruction_only = regexes.alphanumerics.search(instruction).group(0).casefold() - try: - referenced_jumps_dict[referenced_address_str][instruction_offset] = instruction_only - except KeyError: - referenced_jumps_dict[referenced_address_str] = {} - referenced_jumps_dict[referenced_address_str][instruction_offset] = instruction_only - ref_jmp_count += 1 - elif instruction.startswith("CALL"): - found = regexes.dissect_code_valid_address.search(instruction) - if found: - referenced_address_str = regexes.hex_number.search(found.group(0)).group(0) - referenced_address_int = int(referenced_address_str, 16) - if self.is_memory_valid(referenced_address_int): - try: - referenced_calls_dict[referenced_address_str].add(instruction_offset) - except KeyError: - referenced_calls_dict[referenced_address_str] = set() - referenced_calls_dict[referenced_address_str].add(instruction_offset) - ref_call_count += 1 - else: - found = regexes.dissect_code_valid_address.search(instruction) - if found: - referenced_address_str = regexes.hex_number.search(found.group(0)).group(0) - referenced_address_int = int(referenced_address_str, 16) - if self.is_memory_valid(referenced_address_int, discard_invalid_strings): - try: - referenced_strings_dict[referenced_address_str].add(instruction_offset) - except KeyError: - referenced_strings_dict[referenced_address_str] = set() - referenced_strings_dict[referenced_address_str].add(instruction_offset) - ref_str_count += 1 - start_addr = last_disas_addr + referenced_address_str = regexes.hex_number.search(found.group(0)).group(0) + referenced_address_int = int(referenced_address_str, 16) + if self.is_memory_valid(referenced_address_int, discard_invalid_strings): + try: + referenced_strings_dict[referenced_address_str].add(instruction_addr) + except KeyError: + referenced_strings_dict[referenced_address_str] = set() + referenced_strings_dict[referenced_address_str].add(instruction_addr) + ref_str_count += 1 self.memory.close() diff --git a/libpince/utils.py b/libpince/utils.py index def25baa..72d2a6de 100644 --- a/libpince/utils.py +++ b/libpince/utils.py @@ -15,13 +15,18 @@ You should have received a copy of the GNU General Public License along with this program. If not, see . """ -import os, shutil, sys, binascii, pickle, json, traceback, re, pwd, pathlib, distorm3 +import os, shutil, sys, binascii, pickle, json, traceback, re, pwd, pathlib from . import typedefs, regexes +from capstone import Cs, CsError, CS_ARCH_X86, CS_MODE_32, CS_MODE_64 from keystone import Ks, KsError, KS_ARCH_X86, KS_MODE_32, KS_MODE_64 from collections import OrderedDict from importlib.machinery import SourceFileLoader from pygdbmi import gdbmiparser +# Capstone initialization +cs_32 = Cs(CS_ARCH_X86, CS_MODE_32) +cs_64 = Cs(CS_ARCH_X86, CS_MODE_64) + # Keystone initialization ks_32 = Ks(KS_ARCH_X86, KS_MODE_32) ks_64 = Ks(KS_ARCH_X86, KS_MODE_64) @@ -666,15 +671,19 @@ def get_opcodes(address, aob, inferior_arch): None: If there was an error """ if inferior_arch == typedefs.INFERIOR_ARCH.ARCH_64: - disas_option = distorm3.Decode64Bits - elif inferior_arch == typedefs.INFERIOR_ARCH.ARCH_32: - disas_option = distorm3.Decode32Bits + disassembler = cs_64 + else: + disassembler = cs_32 + disassembler.skipdata = True try: bytecode = bytes.fromhex(aob.replace(" ", "")) except ValueError: return - disas_data = distorm3.Decode(address, bytecode, disas_option) - return "; ".join([data[2] for data in disas_data]) + try: + disas_data = disassembler.disasm_lite(bytecode, address) + return "; ".join([f"{data[2]} {data[3]}" if data[3] != "" else data[2] for data in disas_data]) + except CsError as e: + print(e) def assemble(instructions, address, inferior_arch): diff --git a/requirements.txt b/requirements.txt index 8f0c4ee2..12699e07 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ PyQt6==6.6.0 PyQt6-Qt6==6.6.0 pexpect==4.9.0 -distorm3==3.5.2 +capstone==5.0.3 keystone-engine==0.9.2 pygdbmi==0.11.0.0 keyboard==0.13.5