Skip to content

Commit

Permalink
Merge pull request #80 from rowingdude/dev-1
Browse files Browse the repository at this point in the history
Update version number to 2.1.1
  • Loading branch information
rowingdude authored Aug 12, 2024
2 parents 4aa75a0 + 218fd77 commit bbe09dc
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 156 deletions.
2 changes: 1 addition & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

This document lists the changes and version history for the AnalyzeMFT script and component scripts.

## Version 2.1 (2024-08-02)
## Version 2.1.1 (2024-08-02)

### Changes
- Updated to current PEP standards
Expand Down
2 changes: 1 addition & 1 deletion analyzeMFT.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Version 2.1
# Version 2.1.1
#
# Author: Benjamin Cance ([email protected])
#
Expand Down
2 changes: 1 addition & 1 deletion analyzemft/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python

# Version 2.1
# Version 2.1.1
#
# Author: Benjamin Cance ([email protected])
# Copyright Benjamin Cance 2024
Expand Down
291 changes: 141 additions & 150 deletions analyzemft/mft.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python

# Version 2.1
# Version 2.1.1.1
#
# Author: Benjamin Cance ([email protected])
# Copyright Benjamin Cance 2024
Expand All @@ -14,7 +14,26 @@
from argparse import ArgumentParser
from . import mftutils

unicodeHack = True # This one is for me
UNICODE_HACK = True

attribute_handlers = {
0x10: handle_standard_information,
0x20: handle_attribute_list,
0x30: handle_file_name,
0x40: handle_object_id,
0x50: handle_security_descriptor,
0x60: handle_volume_name,
0x70: handle_volume_information,
0x80: handle_data,
0x90: handle_index_root,
0xA0: handle_index_allocation,
0xB0: handle_bitmap,
0xC0: handle_reparse_point,
0xD0: handle_ea_information,
0xE0: handle_ea,
0xF0: handle_property_set,
0x100: handle_logged_utility_stream,
}

def set_default_options() -> ArgumentParser:
parser = ArgumentParser()
Expand Down Expand Up @@ -48,114 +67,24 @@ def parse_record(raw_record: bytes, options: Any) -> Dict[str, Any]:

read_ptr = record['attr_off']

while (read_ptr < 1024):

while read_ptr < 1024:
ATRrecord = decodeATRHeader(raw_record[read_ptr:])
if ATRrecord['type'] == 0xffffffff: # End of attributes
if ATRrecord['type'] == 0xffffffff:
break

if options.debug:
print(f"Attribute type: {ATRrecord['type']:x} Length: {ATRrecord['len']} Res: {ATRrecord['res']:x}")

if ATRrecord['type'] == 0x10: # Standard Information
if options.debug:
print(f"Standard Information:\n++Type: {hex(ATRrecord['type'])} Length: {ATRrecord['len']} Resident: {ATRrecord['res']} Name Len:{ATRrecord['nlen']} Name Offset: {ATRrecord['name_off']}")
SIrecord = decodeSIAttribute(raw_record[read_ptr+ATRrecord['soff']:], options.localtz)
record['si'] = SIrecord
if options.debug:
print(f"++CRTime: {SIrecord['crtime'].dtstr}\n++MTime: {SIrecord['mtime'].dtstr}\n++ATime: {SIrecord['atime'].dtstr}\n++EntryTime: {SIrecord['ctime'].dtstr}")

elif ATRrecord['type'] == 0x20: # Attribute list
if options.debug:
print("Attribute list")
if ATRrecord['res'] == 0:
ALrecord = decodeAttributeList(raw_record[read_ptr+ATRrecord['soff']:], record)
record['al'] = ALrecord
if options.debug:
print(f"Name: {ALrecord['name']}")
else:
if options.debug:
print("Non-resident Attribute List?")
record['al'] = None


elif ATRrecord['type'] == 0x30: # File name
if options.debug:
print("File name record")
FNrecord = decodeFNAttribute(raw_record[read_ptr+ATRrecord['soff']:], options.localtz, record)
record[('fn', record['fncnt'])] = FNrecord
if options.debug:
print(f"Name: {FNrecord['name']} ({record['fncnt']})")
record['fncnt'] += 1
if FNrecord['crtime'] != 0:
if options.debug:
print(f"\tCRTime: {FNrecord['crtime'].dtstr} MTime: {FNrecord['mtime'].dtstr} ATime: {FNrecord['atime'].dtstr} EntryTime: {FNrecord['ctime'].dtstr}")

elif ATRrecord['type'] == 0x40: # Object ID
ObjectIDRecord = decodeObjectID(raw_record[read_ptr+ATRrecord['soff']:])
record['objid'] = ObjectIDRecord
if options.debug: print (f"Object ID")

elif ATRrecord['type'] == 0x50: # Security descriptor
record['sd'] = True
if options.debug: print (f"Security descriptor")

elif ATRrecord['type'] == 0x60: # Volume name
record['volname'] = True
if options.debug: print (f"Volume name")

elif ATRrecord['type'] == 0x70: # Volume information
if options.debug: print (f"Volume info attribute")
VolumeInfoRecord = decodeVolumeInfo(raw_record[read_ptr+ATRrecord['soff']:],options)
record['volinfo'] = VolumeInfoRecord

elif ATRrecord['type'] == 0x80: # Data
record['data'] = True
if options.debug: print (f"Data attribute")

elif ATRrecord['type'] == 0x90: # Index root
record['indexroot'] = True
if options.debug: print (f"Index root")

elif ATRrecord['type'] == 0xA0: # Index allocation
record['indexallocation'] = True
if options.debug: print (f"Index allocation")

elif ATRrecord['type'] == 0xB0: # Bitmap
record['bitmap'] = True
if options.debug: print (f"Bitmap")

elif ATRrecord['type'] == 0xC0: # Reparse point
record['reparsepoint'] = True
if options.debug: print (f"Reparse point")

elif ATRrecord['type'] == 0xD0: # EA Information
record['eainfo'] = True
if options.debug: print (f"EA Information")

elif ATRrecord['type'] == 0xE0: # EA
record['ea'] = True
if options.debug: print (f"EA")

elif ATRrecord['type'] == 0xF0: # Property set
record['propertyset'] = True
if options.debug: print (f"Property set")

elif ATRrecord['type'] == 0x100: # Logged utility stream
record['loggedutility'] = True
if options.debug: print (f"Logged utility stream")

else:
if options.debug: print (f"Found an unknown attribute")
handler = attribute_handlers.get(ATRrecord['type'], handle_unknown_attribute)
handler(ATRrecord, raw_record[read_ptr:], record, options)

if ATRrecord['len'] > 0:
read_ptr = read_ptr + ATRrecord['len']
read_ptr += ATRrecord['len']
else:
if options.debug: print (f"ATRrecord->len < 0, exiting loop")
if options.debug:
print("ATRrecord->len <= 0, exiting loop")
break

return record


def mft_to_csv(record: Dict[str, Any], ret_header: bool) -> List[str]:
if ret_header:
Expand Down Expand Up @@ -314,9 +243,6 @@ def mft_to_body(record, full, std):

return (rec_bodyfile)

# l2t CSV output support
# date,time,timezone,MACB,source,sourcetype,type,user,host,short,desc,version,filename,inode,notes,format,extra
# http://code.google.com/p/log2timeline/wiki/l2t_csv

def mft_to_l2t(record):
' Return a MFT record in l2t CSV output format'
Expand Down Expand Up @@ -407,15 +333,6 @@ def decodeMFTmagic(record: Dict[str, Any]) -> str:
}
return magic_values.get(record['magic'], 'Unknown')

# decodeMFTisactive and decodeMFTrecordtype both look at the flags field in the MFT header.
# The first bit indicates if the record is active or inactive. The second bit indicates if it
# is a file or a folder.
#
# I had this coded incorrectly initially. Spencer Lynch identified and fixed the code. Many thanks!
#
# 02-August-2024 - These are now updated to current Python syntax
#

def decodeMFTisactive(record: Dict[str, Any]) -> str:
return 'Active' if record['flags'] & 0x0001 else 'Inactive'

Expand Down Expand Up @@ -493,23 +410,7 @@ def decodeFNAttribute(s, localtz, record):
d['nlen'] = struct.unpack("B",s[64])[0]
d['nspace'] = struct.unpack("B",s[65])[0]

# The $MFT string is stored as \x24\x00\x4D\x00\x46\x00\x54. Ie, the first character is a single
# byte and the remaining characters are two bytes with the first byte a null.
# Note: Actually, it can be stored in several ways and the nspace field tells me which way.
#
# I found the following:
#
# NTFS allows any sequence of 16-bit values for name encoding (file names, stream names, index names,
# etc.). This means UTF-16 codepoints are supported, but the file system does not check whether a
# sequence is valid UTF-16 (it allows any sequence of short values, not restricted to those in the
# Unicode standard).
#
# If true, lovely. But that would explain what I am seeing.
#
# I just ran across an example of "any sequence of ..." - filenames with backspaces and newlines
# in them. Thus, the "isalpha" check. I really need to figure out how to handle Unicode better.

if (unicodeHack):
if UNICODE_HACK:
d['name'] = ''
for i in range(66, 66 + d['nlen']*2):
if s[i] != '\x00': # Just skip over nulls
Expand All @@ -519,23 +420,9 @@ def decodeFNAttribute(s, localtz, record):
d['name'] = "%s0x%02s" % (d['name'], s[i].encode("hex"))
hexFlag = True

# This statement produces a valid unicode string, I just cannot get it to print correctly
# so I'm temporarily hacking it with the if (unicodeHack) above.
else:
d['name'] = s[66:66+d['nlen']*2]

# This didn't work
# d['name'] = struct.pack("\u
# for i in range(0, d['nlen']*2, 2):
# d['name']=d['name'] + struct.unpack("<H",s[66+i:66+i+1])

# What follows is ugly. I'm trying to deal with the filename in Unicode and not doing well.
# This solution works, though it is printing nulls between the characters. It'll do for now.
# d['name'] = struct.unpack("<%dH" % (int(d['nlen'])*2),s[66:66+(d['nlen']*2)])
# d['name'] = s[66:66+(d['nlen']*2)]
# d['decname'] = unicodedata.normalize('NFKD', d['name']).encode('ASCII','ignore')
# d['decname'] = unicode(d['name'],'iso-8859-1','ignore')

if hexFlag:
add_note(record, 'Filename - chars converted to hex')

Expand All @@ -554,7 +441,7 @@ def decodeAttributeList(s, record):
d['file_ref'] = struct.unpack("<Lxx",s[16:22])[0] # 6
d['seq'] = struct.unpack("<H",s[22:24])[0] # 2
d['id'] = struct.unpack("<H",s[24:26])[0] # 4
if (unicodeHack):
if (UNICODE_HACK):
d['name'] = ''
for i in range(26, 26 + d['nlen']*2):
if s[i] != '\x00': # Just skip over nulls
Expand All @@ -580,13 +467,13 @@ def decodeVolumeInfo(s,options):
d['flags'] = struct.unpack("<H",s[10:12])[0] # 2
d['f2'] = struct.unpack("<I",s[12:16])[0] # 4

if (options.debug):
print (f"+Volume Info")
print (f"++F1%d" % d['f1'])
print (f"++Major Version: %d" % d['maj_ver'])
print (f"++Minor Version: %d" % d['min_ver'])
print (f"++Flags: %d" % d['flags'])
print (f"++F2: %d" % d['f2'])
if options.debug:
print(f"+Volume Info")
print(f"++F1%d" % d['f1'])
print(f"++Major Version: %d" % d['maj_ver'])
print(f"++Minor Version: %d" % d['min_ver'])
print(f"++Flags: %d" % d['flags'])
print(f"++F2: %d" % d['f2'])

return d

Expand All @@ -604,3 +491,107 @@ def ObjectID(s: bytes) -> str:
if s == b'\x00' * 16:
return 'Undefined'
return f"{s[:4].hex()}-{s[4:6].hex()}-{s[6:8].hex()}-{s[8:10].hex()}-{s[10:16].hex()}"

def handle_standard_information(ATRrecord, raw_record, record, options):
if options.debug:
print(f"Standard Information:\n++Type: {hex(ATRrecord['type'])} Length: {ATRrecord['len']} Resident: {ATRrecord['res']} Name Len: {ATRrecord['nlen']} Name Offset: {ATRrecord['name_off']}")
SIrecord = decodeSIAttribute(raw_record[ATRrecord['soff']:], options.localtz)
record['si'] = SIrecord
if options.debug:
print(f"++CRTime: {SIrecord['crtime'].dtstr}\n++MTime: {SIrecord['mtime'].dtstr}\n++ATime: {SIrecord['atime'].dtstr}\n++EntryTime: {SIrecord['ctime'].dtstr}")

def handle_attribute_list(ATRrecord, raw_record, record, options):
if options.debug:
print("Attribute list")
if ATRrecord['res'] == 0:
ALrecord = decodeAttributeList(raw_record[ATRrecord['soff']:], record)
record['al'] = ALrecord
if options.debug:
print(f"Name: {ALrecord['name']}")
else:
if options.debug:
print("Non-resident Attribute List?")
record['al'] = None

def handle_file_name(ATRrecord, raw_record, record, options):
if options.debug:
print("File name record")
FNrecord = decodeFNAttribute(raw_record[ATRrecord['soff']:], options.localtz, record)
record[('fn', record['fncnt'])] = FNrecord
if options.debug:
print(f"Name: {FNrecord['name']} ({record['fncnt']})")
record['fncnt'] += 1
if FNrecord['crtime'] != 0:
if options.debug:
print(f"\tCRTime: {FNrecord['crtime'].dtstr} MTime: {FNrecord['mtime'].dtstr} ATime: {FNrecord['atime'].dtstr} EntryTime: {FNrecord['ctime'].dtstr}")

def handle_object_id(ATRrecord, raw_record, record, options):
ObjectIDRecord = decodeObjectID(raw_record[ATRrecord['soff']:])
record['objid'] = ObjectIDRecord
if options.debug:
print("Object ID")

def handle_security_descriptor(ATRrecord, raw_record, record, options):
record['sd'] = True
if options.debug:
print("Security descriptor")

def handle_volume_name(ATRrecord, raw_record, record, options):
record['volname'] = True
if options.debug:
print("Volume name")

def handle_volume_information(ATRrecord, raw_record, record, options):
if options.debug:
print("Volume info attribute")
VolumeInfoRecord = decodeVolumeInfo(raw_record[ATRrecord['soff']:], options)
record['volinfo'] = VolumeInfoRecord

def handle_data(ATRrecord, raw_record, record, options):
record['data'] = True
if options.debug:
print("Data attribute")

def handle_index_root(ATRrecord, raw_record, record, options):
record['indexroot'] = True
if options.debug:
print("Index root")

def handle_index_allocation(ATRrecord, raw_record, record, options):
record['indexallocation'] = True
if options.debug:
print("Index allocation")

def handle_bitmap(ATRrecord, raw_record, record, options):
record['bitmap'] = True
if options.debug:
print("Bitmap")

def handle_reparse_point(ATRrecord, raw_record, record, options):
record['reparsepoint'] = True
if options.debug:
print("Reparse point")

def handle_ea_information(ATRrecord, raw_record, record, options):
record['eainfo'] = True
if options.debug:
print("EA Information")

def handle_ea(ATRrecord, raw_record, record, options):
record['ea'] = True
if options.debug:
print("EA")

def handle_property_set(ATRrecord, raw_record, record, options):
record['propertyset'] = True
if options.debug:
print("Property set")

def handle_logged_utility_stream(ATRrecord, raw_record, record, options):
record['loggedutility'] = True
if options.debug:
print("Logged utility stream")

def handle_unknown_attribute(ATRrecord, raw_record, record, options):
if options.debug:
print(f"Found an unknown attribute type: {ATRrecord['type']:x}")
Loading

0 comments on commit bbe09dc

Please sign in to comment.