diff --git a/scripts/fitdump b/scripts/fitdump index 5e4a4b4..f8b9ad4 100755 --- a/scripts/fitdump +++ b/scripts/fitdump @@ -37,6 +37,75 @@ def format_message(num, message, options): return "".join(s) +def sort_record_types(record_type: str = '') -> int: + # Header order + record_order = {'file_id': 0, + 'sport': 1, + 'workout': 2, + 'activity': 3, + 'session': 4, + 'lap': 5, + 'device_info': 10, + 'hr_zone': 11, + 'power_zone': 12, + 'record': 20, + 'event': 30, + 'field_description': 40, + 'developer_data_id': 100} + DEFAULT_ORDER = max(record_order.values()) + 1 + try: + return record_order[record_type] + except KeyError: + # Not found, last + return DEFAULT_ORDER + + +def field_header_format(field_data) -> str: + s = field_data.name + if field_data.units: + s += ' [{}]'.format(field_data.units) + return s + + +def field_data_format(messages, message_header): + for message in messages: + # Format this line + s = [''] * len(message_header) + for field_data in message: + s[message_header.index(field_data.name)] = str(field_data.value) + yield ','.join(s) + '\n' + + +def write_csv(options, records): + records = list(records) + # Collect all the message types in the file + record_types = set([message.name for message in records]) + # Two-stage sort, header info, then alphabetic + record_types = list(sorted(sorted(record_types), key=sort_record_types)) + + # Cache for all lines + all_lines = list() + + for write_type in record_types: + # Collect all messages of this type + write_messages = [message for message in records if message.name == write_type] + # Collect the data field information in alphabetic order + field_names_header = sorted(set([field_header_format(field_data) for message in write_messages for field_data in message])) + field_names = sorted(set(field_data.name for message in write_messages for field_data in message)) + # Write the overall header line + all_lines.append(write_type + '\n') + # Write the field names header line + all_lines.append(','.join(field_names_header) + '\n') + # Write the data + all_lines.extend(field_data_format(write_messages, field_names)) + # Write a blank line + all_lines.append('\n\n') + + # Write the actual file + options.output.writelines(all_lines) + pass + + def parse_args(args=None): parser = argparse.ArgumentParser( description='Dump .FIT files to various formats', @@ -48,8 +117,8 @@ def parse_args(args=None): help='File to output data into (defaults to stdout)', ) parser.add_argument( - # TODO: csv - '-t', '--type', choices=('readable', 'json', 'gpx'), default='readable', + '-t', '--type', choices=('readable', 'json', 'csv'), default='readable', + '-t', '--type', choices=('readable', 'json', 'gpx', 'csv'), default='readable', help='File type to output. (DEFAULT: %(default)s)', ) parser.add_argument( @@ -193,9 +262,10 @@ def main(args=None): if options.type == "json": json.dump(records, fp=options.output, cls=RecordJSONEncoder) elif options.type == "readable": - options.output.writelines( - format_message(n, record, options) for n, record in enumerate(records, 1) - ) + options.output.writelines(format_message(n, record, options) + for n, record in enumerate(records, 1)) + elif options.type == 'csv': + write_csv(options, records) elif options.type == "gpx": filename = getattr(options.infile, "name") if filename: @@ -207,6 +277,7 @@ def main(args=None): except IOError: pass + if __name__ == '__main__': try: main() diff --git a/tests/files/rollers.fit b/tests/files/rollers.fit new file mode 100644 index 0000000..61cbde3 Binary files /dev/null and b/tests/files/rollers.fit differ diff --git a/tests/test.py b/tests/test.py index 082e9f6..9d8c3da 100755 --- a/tests/test.py +++ b/tests/test.py @@ -413,6 +413,14 @@ def test_speed(self): avg_speed = list(f.get_messages('session'))[0].get_values().get('avg_speed') self.assertEqual(avg_speed, 5.86) + def test_rollers(self): + f = FitFile(testfile('rollers.fit')) + f.parse() + + def test_unterminated_file(self): + f = FitFile(testfile('nick.fit')) + f.parse() + def test_mismatched_field_size(self): f = FitFile(testfile('coros-pace-2-cycling-misaligned-fields.fit')) with warnings.catch_warnings(record=True) as w: