Skip to content

Commit

Permalink
MDF scramble static method: scramble text blocks and keep original fi…
Browse files Browse the repository at this point in the history
…le structure (useful for privacy concerns when sending mdf files for debug purpose)
  • Loading branch information
danielhrisca committed Jul 22, 2018
1 parent e590f00 commit 284cbc4
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 11 deletions.
4 changes: 4 additions & 0 deletions ISSUE_TEMPLATE.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,8 @@ except ImportError:
_please write here the error traceback_

# Description

The fastest way to debug is to have the original file. For data protection you can use the static
method _scramble_ to scramble all text blocks, and send the scrambled file by e-mail.

_Please describe the issue here._
142 changes: 142 additions & 0 deletions asammdf/mdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,16 @@
MDF3_VERSIONS,
MDF4_VERSIONS,
SUPPORTED_VERSIONS,
UINT64,
randomized_string,
)
from .v2_v3_blocks import Channel as ChannelV3
from .v2_v3_blocks import HeaderBlock as HeaderV3
from .v4_blocks import SourceInformation
from .v4_blocks import ChannelConversion as ChannelConversionV4
from .v4_blocks import Channel as ChannelV4
from .v4_blocks import HeaderBlock as HeaderV4
from .v4_blocks import TextBlock as TextBlockV4
from .v4_blocks import ChannelArrayBlock, EventBlock
from . import v4_constants as v4c

Expand Down Expand Up @@ -2599,6 +2604,143 @@ def whereis(self, channel):
else:
return tuple()

@staticmethod
def scramble(name, memory='low'):
""" scramble text blocks and keep original file strcuture
Parameters
----------
name : str
file name
memory : str
memory option; default *'low'*
"""

memory = validate_memory_argument(memory)
mdf = MDF(name, memory=memory)
texts = {}

if mdf.version >= '4.00':
Channel = ChannelV4
ChannelConversion = ChannelConversionV4
TextBlock = TextBlockV4

stream = mdf._file

if mdf.header['comment_addr']:
stream.seek(mdf.header['comment_addr'] + 8)
size = UINT64(stream.read(8))[0] - 24
texts[mdf.header['comment_addr']] = randomized_string(size)

for fh in mdf.file_history:
addr = fh['comment_addr']
if addr and addr not in texts:
stream.seek(addr + 8)
size = UINT64(stream.read(8))[0] - 24
texts[addr] = randomized_string(size)

for ev in mdf.events:
for addr in (ev['comment_addr'], ev['name_addr']):
if addr and addr not in texts:
stream.seek(addr + 8)
size = UINT64(stream.read(8))[0] - 24
texts[addr] = randomized_string(size)

for gp in mdf.groups:

addr = gp['data_group']['comment_addr']
if addr and addr not in texts:
stream.seek(addr + 8)
size = UINT64(stream.read(8))[0] - 24
texts[addr] = randomized_string(size)

cg = gp['channel_group']
for addr in (
cg['acq_name_addr'],
cg['comment_addr']):
if cg['flags'] & v4c.FLAG_CG_BUS_EVENT:
continue

if addr and addr not in texts:
stream.seek(addr + 8)
size = UINT64(stream.read(8))[0] - 24
texts[addr] = randomized_string(size)

source = cg['acq_source_addr']
if source:
source = SourceInformation(address=source, stream=stream)
for addr in (
source['name_addr'],
source['path_addr'],
source['comment_addr']):
if addr and addr not in texts:
stream.seek(addr + 8)
size = UINT64(stream.read(8))[0] - 24
texts[addr] = randomized_string(size)

for ch in gp['channels']:
if mdf.memory == 'minimum':
ch = Channel(address=ch, stream=stream,
load_metadata=False)

for addr in (
ch['name_addr'],
ch['unit_addr'],
ch['comment_addr']):
if addr and addr not in texts:
stream.seek(addr + 8)
size = UINT64(stream.read(8))[0] - 24
texts[addr] = randomized_string(size)

source = ch['source_addr']
if source:
source = SourceInformation(address=source, stream=stream)
for addr in (
source['name_addr'],
source['path_addr'],
source['comment_addr']):
if addr and addr not in texts:
stream.seek(addr + 8)
size = UINT64(stream.read(8))[0] - 24
texts[addr] = randomized_string(size)

conv = ch['conversion_addr']
if conv:
conv = ChannelConversion(address=conv, stream=stream)
for addr in (
conv['name_addr'],
conv['unit_addr'],
conv['comment_addr']):
if addr and addr not in texts:
stream.seek(addr + 8)
size = UINT64(stream.read(8))[0] - 24
texts[addr] = randomized_string(size)
if conv['conversion_type'] == v4c.CONVERSION_TYPE_ALG:
addr = conv['formula_addr']
if addr and addr not in texts:
stream.seek(addr + 8)
size = UINT64(stream.read(8))[0] - 24
texts[addr] = randomized_string(size)

for key, block in conv.referenced_blocks.items():
if block:
if block['id'] == b'##TX':
addr = block.address
if addr not in texts:
stream.seek(addr + 8)
size = block['block_len'] - 24
texts[addr] = randomized_string(size)
mdf.close()

with open(name, 'rb+') as mdf:
for addr, bts in texts.items():
mdf.seek(addr + 24)
mdf.write(bts)

else:
raise NotImplementedError()


if __name__ == '__main__':
pass
1 change: 1 addition & 0 deletions asammdf/mdf_v4.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
from .signal import Signal
from .conversion_utils import conversion_transfer
from .utils import (
UINT64,
CHANNEL_COUNT,
CONVERT_LOW,
CONVERT_MINIMUM,
Expand Down
44 changes: 33 additions & 11 deletions asammdf/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import xml.etree.ElementTree as ET

from collections import namedtuple
from random import randint
from struct import Struct
from warnings import warn

Expand Down Expand Up @@ -122,17 +123,18 @@ class MdfException(Exception):
pass


# pylint: disable=W0622
def bytes(obj):
""" Python 2 compatibility function """
try:
return obj.__bytes__()
except AttributeError:
if isinstance(obj, str):
return obj
else:
raise
# pylint: enable=W0622
if PYVERSION < 3:
# pylint: disable=W0622
def bytes(obj):
""" Python 2 compatibility function """
try:
return obj.__bytes__()
except AttributeError:
if isinstance(obj, str):
return obj
else:
raise
# pylint: enable=W0622


def extract_cncomment_xml(comment):
Expand Down Expand Up @@ -916,3 +918,23 @@ def add(self, channel_name, group_index, channel_index):
self[channel_name].append(
entry
)


def randomized_string(size):
""" get a \0 terminated string of size length
Parameters
----------
size : int
target string length
Returns
-------
string : bytes
randomized string
"""
if PYVERSION >= 3:
return bytes(randint(65, 90) for _ in range(size - 1)) + b'\0'
else:
return ''.join(chr(randint(65, 90)) for _ in range(size - 1)) + '\0'

0 comments on commit 284cbc4

Please sign in to comment.