diff --git a/THIRD-PARTY-NOTICES.txt b/THIRD-PARTY-NOTICES.txt index 2dac54eeb6..01d43433b3 100644 --- a/THIRD-PARTY-NOTICES.txt +++ b/THIRD-PARTY-NOTICES.txt @@ -242,3 +242,23 @@ License Notice for the following: of your accepting any such warranty or additional liability. END OF TERMS AND CONDITIONS + + +6) License Notice for the csvsort Library +----------------------------- + +Copyright (c) 2018 Richard Penman + +This library is free software; you can redistribute it and/or modify it +under the terms of the GNU Lesser General Public License as published by +the Free Software Foundation; either version 2.1 of the License, or (at +your option) any later version. + +This library is distributed in the hope that it will be useful, but +WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +License for more details. + +You should have received a copy of the GNU Lesser General Public License +along with this library; if not, write to the Free Software Foundation, +Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA diff --git a/src/python/genny/__init__.py b/src/python/genny/__init__.py index 0b40b31d23..00b366bf58 100644 --- a/src/python/genny/__init__.py +++ b/src/python/genny/__init__.py @@ -11,4 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - diff --git a/src/python/genny/cedar.py b/src/python/genny/cedar.py new file mode 100644 index 0000000000..6967d17eb5 --- /dev/null +++ b/src/python/genny/cedar.py @@ -0,0 +1,238 @@ +#!/usr/bin/env python3 +# +# Copyright 2019-present MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import csv +import sys +from collections import OrderedDict, defaultdict +from datetime import datetime +from os.path import join as pjoin + +from bson import BSON + +from genny.csv2 import CSV2, IntermediateCSVColumns +from third_party.csvsort import csvsort + +""" +Convert raw genny csv output to a format expected by Cedar +(a.k.a. 
MongoDB Evergreen Expanded Metrics) + +Sample input: +``` +Clocks +SystemTime,100000000000000000 +MetricsTime,86491166632088 + +OperationThreadCounts +InsertRemove,Insert,5 + +Operations +timestamp , actor , thread, operation, duration, outcome, n, ops, errors, size +86491166632088, InsertRemove, 0 , Insert , 100 , 0 , 1, 6 , 2 , 40 +86491166632403, InsertRemove, 0 , Insert , 310 , 0 , 1, 9 , 3 , 60 +86491166632661, InsertRemove, 0 , Insert , 180 , 0 , 1, 8 , 6 , 20 +86491166632088, InsertRemove, 1 , Insert , 280 , 1 , 1, 3 , 1 , 20 +86491166632316, InsertRemove, 1 , Insert , 190 , 1 , 1, 4 , 1 , 30 +86491166632088, InsertRemove, 2 , Insert , 30 , 0 , 1, 4 , 1 , 30 +86491166632245, InsertRemove, 2 , Insert , 80 , 1 , 1, 8 , 7 , 10 +86491166632088, InsertRemove, 3 , Insert , 110 , 0 , 1, 7 , 2 , 50 +86491166632088, InsertRemove, 4 , Insert , 40 , 0 , 1, 9 , 0 , 90 +``` + +Sample output: +``` +{ts:TS(0),id:0,counters:{n:1,ops:6,size:40,errors:2},timers:{duration:100,total:100},gauges:{workers:5}} +{ts:TS(0),id:1,counters:{n:2,ops:9,size:60,errors:3},timers:{duration:380,total:380},gauges:{workers:5}} +{ts:TS(0),id:2,counters:{n:3,ops:13,size:90,errors:4},timers:{duration:410,total:410},gauges:{workers:5}} +{ts:TS(0),id:3,counters:{n:4,ops:20,size:140,errors:6},timers:{duration:520,total:520},gauges:{workers:5}} +{ts:TS(0),id:4,counters:{n:5,ops:29,size:230,errors:6},timers:{duration:560,total:560},gauges:{workers:5}} +{ts:TS(157),id:2,counters:{n:6,ops:37,size:240,errors:13},timers:{duration:640,total:717},gauges:{workers:5}} +{ts:TS(228),id:1,counters:{n:7,ops:41,size:270,errors:14},timers:{duration:830,total:945},gauges:{workers:5}} +{ts:TS(315),id:0,counters:{n:8,ops:50,size:330,errors:17},timers:{duration:1140,total:1260},gauges:{workers:5}} +{ts:TS(573),id:0,counters:{n:9,ops:58,size:350,errors:23},timers:{duration:1320,total:1518},gauges:{workers:5}} +``` +""" + + +class IntermediateCSVReader: + """ + Class that reads IntermediateCSVColumns and outputs Cedar BSON + """ + + def __init__(self, reader): + self.raw_reader = reader + # dict(operation -> dict(metric_column -> cumulative_value)) + self.cumulatives_for_op = defaultdict(lambda: defaultdict(int)) + + # dict(operation -> total_duration) + self.total_for_op = defaultdict(int) + + # dict(thread -> prev_op_ts)) + self.prev_ts_for_thread = {} + + def __iter__(self): + return self + + def __next__(self): + return self._parse_into_cedar_format(next(self.raw_reader)) + + def _compute_cumulatives(self, line): + op = line[IntermediateCSVColumns.OPERATION] + + n = IntermediateCSVColumns.N + ops = IntermediateCSVColumns.OPS + size = IntermediateCSVColumns.SIZE + err = IntermediateCSVColumns.ERRORS + dur = IntermediateCSVColumns.DURATION + for col in [n, ops, size, err, dur]: + self.cumulatives_for_op[op][col] += line[col] + + ts_col = IntermediateCSVColumns.TS + thread = line[IntermediateCSVColumns.THREAD] + + # total_duration is duration for the first operation on each thread + # because we don't track when each thread starts. + if thread in self.prev_ts_for_thread: + cur_total = line[ts_col] - self.prev_ts_for_thread[thread] + else: + cur_total = line[dur] + + self.total_for_op[op] += cur_total + self.prev_ts_for_thread[thread] = line[ts_col] + + def _parse_into_cedar_format(self, line): + # Compute all cumulative values for simplicity; Not all values are used. 
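+        # Note: counters (n, ops, size, errors) and duration accumulate per operation across every thread in this actor's file, which is why the counters in the sample output above only ever increase.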
+ self._compute_cumulatives(line) + + # milliseconds to seconds to datetime.datetime() + ts = datetime.utcfromtimestamp(line[IntermediateCSVColumns.UNIX_TIME] / 1000) + op = line[IntermediateCSVColumns.OPERATION] + + res = OrderedDict([ + ('ts', ts), + ('id', int(line[IntermediateCSVColumns.THREAD])), + ('counters', OrderedDict([ + ('n', int(self.cumulatives_for_op[op][IntermediateCSVColumns.N])), + ('ops', int(self.cumulatives_for_op[op][IntermediateCSVColumns.OPS])), + ('size', int(self.cumulatives_for_op[op][IntermediateCSVColumns.SIZE])), + ('errors', int(self.cumulatives_for_op[op][IntermediateCSVColumns.ERRORS])) + ])), + ('timers', OrderedDict([ + ('duration', int(self.cumulatives_for_op[op][IntermediateCSVColumns.DURATION])), + ('total', int(self.total_for_op[op])) + ])), + ('gauges', OrderedDict([ + ('workers', int(line[IntermediateCSVColumns.WORKERS])) + ])) + ]) + + return res, op + + +def split_into_actor_operation_and_transform_to_cedar_format(file_name, out_dir): + """ + Split up the IntermediateCSV format input into one or more BSON files in Cedar + format; one file for each (actor, operation) pair. + """ + + # remove the ".csv" suffix to get the actor name. + actor_name = file_name[:-4] + + out_files = {} + with open(pjoin(out_dir, file_name)) as in_f: + reader = csv.reader(in_f, quoting=csv.QUOTE_NONNUMERIC) + next(reader) # Ignore the header row. + try: + for ordered_dict, op in IntermediateCSVReader(reader): + out_file_name = pjoin(out_dir, "{}-{}.bson".format(actor_name, op)) + if out_file_name not in out_files: + out_files[out_file_name] = open(out_file_name, 'wb') + out_files[out_file_name].write(BSON.encode(ordered_dict)) + finally: + for fh in out_files.values(): + fh.close() + + +def split_into_actor_csv_files(data_reader, out_dir): + """ + Split up the monolithic genny metrics csv2 file into smaller [actor].csv files + """ + cur_out_csv = None + cur_out_fh = None + cur_actor = (None, None) + output_files = [] + + for line, actor in data_reader: + if actor != cur_actor: + cur_actor = actor + + # Close out old file. + if cur_out_fh: + cur_out_fh.close() + + # Open new csv file. + file_name = actor + '.csv' + output_files.append(file_name) + cur_out_fh = open(pjoin(out_dir, file_name), 'w', newline='') + + # Quote non-numeric values so they get converted to float automatically + cur_out_csv = csv.writer(cur_out_fh, quoting=csv.QUOTE_NONNUMERIC) + cur_out_csv.writerow(IntermediateCSVColumns.default_columns()) + + cur_out_csv.writerow(line) + + cur_out_fh.close() + + return output_files + + +def sort_csv_file(file_name, out_dir): + # Sort on Timestamp and Thread. 
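+    # csvsort (vendored under third_party) sorts on disk: it splits the file into bounded-size chunks (100 MB by default), sorts each chunk in memory, then merge-sorts the chunks, so large per-actor files never need to fit in memory at once.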
+ csvsort(pjoin(out_dir, file_name), + [IntermediateCSVColumns.TS, IntermediateCSVColumns.THREAD], + quoting=csv.QUOTE_NONNUMERIC, + has_header=True, + show_progress=True) + + +def parse_args(argv): + parser = argparse.ArgumentParser( + description='Convert Genny csv2 perf data to Cedar BSON format', + ) + parser.add_argument('input_file', metavar='input-file', help='path to genny csv2 perf data') + parser.add_argument('output_dir', metavar='output-dir', + help='directory to store output BSON files') + + return parser.parse_args(argv) + + +def main__cedar(argv=sys.argv[1:]): + args = parse_args(argv) + out_dir = args.output_dir + + # Read CSV2 file + my_csv2 = CSV2(args.input_file) + + with my_csv2.data_reader() as data_reader: + # Separate into actor-operation + files = split_into_actor_csv_files(data_reader, out_dir) + + for f in files: + # csvsort by timestamp, thread + sort_csv_file(f, out_dir) + + # compute cumulative and stream output to bson file + split_into_actor_operation_and_transform_to_cedar_format(f, out_dir) diff --git a/src/python/genny/csv2.py b/src/python/genny/csv2.py new file mode 100644 index 0000000000..7fa3e3f0b9 --- /dev/null +++ b/src/python/genny/csv2.py @@ -0,0 +1,262 @@ +# Copyright 2019-present MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import contextlib +import csv + + +class _Dialect(csv.unix_dialect): + strict = True + skipinitialspace = True + quoting = csv.QUOTE_MINIMAL # Expect no quotes in csv2. + # We don't allow commas in actor names at the moment, so no need + # to escape the delimiter. + # escapechar = '\\' + + +class CSV2ParsingError(BaseException): + pass + + +class CSVColumns(object): + """ + Object oriented access to csv header/column mapping. + """ + _COLUMNS = None + + @classmethod + def add_columns(cls, columns): + for i in range(len(columns)): + cls.add_column(columns[i], i) + + @classmethod + def add_column(cls, col_name, col_index): + upper_col_name = col_name.upper() # Python class constants should be uppercase. + + if not hasattr(cls, upper_col_name): + raise CSV2ParsingError('%s doesn\'t have column %s', cls.__name__, col_name) + + if not isinstance(cls._COLUMNS, set): + raise ValueError('Subclass must have the class property `_COLUMN = set()`') + + setattr(cls, upper_col_name, col_index) + cls._COLUMNS.add(upper_col_name) + + @classmethod + def validate(cls, expected_cols_set): + val = cls._COLUMNS == expected_cols_set + return val + + +class _DataReader: + """ + Thin wrapper around csv.DictReader() that eagerly reads the list of operations + from csv2 and converts any digits to native Python integers and massages + output into IntermediateCSV format. + """ + + def __init__(self, csv_reader_at_op, thread_count_map, ts_offset): + """ + :param csv_reader_at_op: A CSV reader with its cursor on the first operation line. 
+ :param thread_count_map: + :param ts_offset: + """ + self.raw_reader = csv_reader_at_op + self.tc_map = thread_count_map + self.unix_time_offset = ts_offset + + def __iter__(self): + return self + + def __next__(self): + return self._parse_into_intermediate_csv(next(self.raw_reader)) + + def _parse_into_intermediate_csv(self, line): + for i in range(len(line)): + + # Strip spaces from lines + line[i] = line[i].strip() + + # Convert string digits to ints + if line[i].isdigit(): + line[i] = int(line[i]) + + # Eagerly error if OUTCOME is > 1 + if line[_OpColumns.OUTCOME] > 1: + raise CSV2ParsingError('Unexpected outcome on line %d: %s', self.raw_reader.line_num, + line) + + op = line[_OpColumns.OPERATION] + actor = line[_OpColumns.ACTOR] + thread = line[_OpColumns.THREAD] + ts = line[_OpColumns.TIMESTAMP] + duration = line[_OpColumns.DURATION] + # Convert timestamp from ns to ms and add offset. + unix_time = (ts + self.unix_time_offset) / (1000 * 1000) + + # Transform output into IntermediateCSV format. + out = [None for _ in range(len(IntermediateCSVColumns.default_columns()))] + out[IntermediateCSVColumns.UNIX_TIME] = unix_time + out[IntermediateCSVColumns.TS] = ts + out[IntermediateCSVColumns.THREAD] = thread + out[IntermediateCSVColumns.OPERATION] = op + out[IntermediateCSVColumns.DURATION] = duration + out[IntermediateCSVColumns.OUTCOME] = line[_OpColumns.OUTCOME] + out[IntermediateCSVColumns.N] = line[_OpColumns.N] + out[IntermediateCSVColumns.OPS] = line[_OpColumns.OPS] + out[IntermediateCSVColumns.ERRORS] = line[_OpColumns.ERRORS] + out[IntermediateCSVColumns.SIZE] = line[_OpColumns.SIZE] + out[IntermediateCSVColumns.WORKERS] = self.tc_map[(actor, op)] + + return out, actor + + +class _OpColumns(CSVColumns): + _COLUMNS = set() + + TIMESTAMP = None + ACTOR = None + THREAD = None + OPERATION = None + DURATION = None + OUTCOME = None + N = None + OPS = None + ERRORS = None + SIZE = None + + +class _ClockColumns(CSVColumns): + _COLUMNS = set() + + CLOCK = None + NANOSECONDS = None + + +# OperationThreadCount columns. +class _TCColumns(CSVColumns): + _COLUMNS = set() + + ACTOR = None + OPERATION = None + WORKERS = None + + +class CSV2: + """ + Thin object wrapper around a genny metrics' "csv2" file with metadata stored + as native Python variables. + """ + + def __init__(self, csv2_file_name): + # The number to add to the metrics timestamp (a.k.a. c++ system_time) to get + # the UNIX time. + self._unix_epoch_offset_ns = None + + # Map of (actor, operation) to thread count. + self._operation_thread_count_map = {} + + # Store a reader that starts at the first line of actual csv data after the headers + # have been parsed. + self._data_reader = None + + # file handle to raw CSV file; lifecycle is shared with this CSV2 object. + self._csv2_file_name = csv2_file_name + + @contextlib.contextmanager + def data_reader(self): + # parsers for newline-delimited sections in genny's csv2 file. 
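+        # Each parser consumes its own section; a True return value ("should_stop") means the reader is now positioned on the operation rows and _DataReader takes over from there.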
+ header_parsers = { + 'Clocks': self._parse_clocks, + 'OperationThreadCounts': self._parse_thread_count, + 'Operations': self._parse_operations + } + + with open(self._csv2_file_name, 'r') as csv2_file: + try: + reader = csv.reader(csv2_file, dialect=_Dialect) + while True: + title = next(reader)[0] + if title not in header_parsers: + raise CSV2ParsingError('Unknown csv2 section title %s', title) + should_stop = header_parsers[title](reader) + if should_stop: + break + except (IndexError, ValueError) as e: + raise CSV2ParsingError('Error parsing CSV file: ', self._csv2_file_name) from e + + yield self._data_reader + + def _parse_clocks(self, reader): + _ClockColumns.add_columns([header.strip() for header in next(reader)]) + + line = next(reader) + + times = { + 'SystemTime': None, + 'MetricsTime': None + } + + while line: + times[line[_ClockColumns.CLOCK]] = int(line[_ClockColumns.NANOSECONDS]) + line = next(reader) + + self._unix_epoch_offset_ns = times['SystemTime'] - times['MetricsTime'] + + return False + + def _parse_thread_count(self, reader): + _TCColumns.add_columns([h.strip() for h in next(reader)]) + + line = next(reader) + while line: + actor = line[0] + op = line[1] + thread_count = int(line[2]) + self._operation_thread_count_map[(actor, op)] = thread_count + line = next(reader) + + return False + + def _parse_operations(self, reader): + _OpColumns.add_columns([h.strip() for h in next(reader)]) + self._data_reader = _DataReader(reader, self._operation_thread_count_map, + self._unix_epoch_offset_ns) + + return True + + +class IntermediateCSVColumns(CSVColumns): + _COLUMNS = set() + + # Declare an explicit default ordering here since this script is writing the intermediate CSV. + UNIX_TIME = 0 + TS = 1 + THREAD = 2 + OPERATION = 3 + DURATION = 4 + OUTCOME = 5 + N = 6 + OPS = 7 + ERRORS = 8 + SIZE = 9 + WORKERS = 10 + + @classmethod + def default_columns(cls): + """ + Ordered list of default columns to write to the CSV, must match the column names in + the class attributes. + """ + return ['unix_time', 'ts', 'thread', 'operation', 'duration', 'outcome', 'n', + 'ops', 'errors', 'size', 'workers'] diff --git a/src/python/genny/metrics_output_parser.py b/src/python/genny/metrics_output_parser.py index 6e2b40a49b..006cf3dab5 100644 --- a/src/python/genny/metrics_output_parser.py +++ b/src/python/genny/metrics_output_parser.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python +# # Copyright 2019-present MongoDB Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -12,9 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -#!/usr/bin/env python import json + """ Genny's CSV-ish metrics output looks like the following: @@ -169,7 +171,7 @@ def _system_time(self, metrics_time, file_name, line_number): return metrics_time + self.clock_delta if 'Clocks' not in self.sections: - msg = "Can only call _system_time after we've seen the Clocks section. " +\ + msg = "Can only call _system_time after we've seen the Clocks section. 
" + \ "We've seen sections {}".format(set(self.sections.keys())) raise ParseError(msg, file_name, line_number) diff --git a/src/python/setup.py b/src/python/setup.py index 2976503ffa..68b0e00bd5 100644 --- a/src/python/setup.py +++ b/src/python/setup.py @@ -20,6 +20,7 @@ install_requires=[ 'nose==1.3.7', 'yapf==0.24.0', + 'pymongo==3.7.2' ], setup_requires=[ 'nose==1.3.7' @@ -28,6 +29,7 @@ 'console_scripts': [ 'genny-metrics-summarize = genny.metrics_output_parser:main__sumarize', 'genny-metrics-to-perf-json = genny.perf_json:main__summarize_translate', + 'genny-metrics-to-cedar = genny.metrics.main__cedar' ] }, ) diff --git a/src/python/tests/__init__.py b/src/python/tests/__init__.py index 0b40b31d23..00b366bf58 100644 --- a/src/python/tests/__init__.py +++ b/src/python/tests/__init__.py @@ -11,4 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - diff --git a/src/python/tests/cedar_test.py b/src/python/tests/cedar_test.py new file mode 100644 index 0000000000..6133b6acb1 --- /dev/null +++ b/src/python/tests/cedar_test.py @@ -0,0 +1,244 @@ +# Copyright 2019-present MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import csv +import os +import tempfile +import unittest +from collections import OrderedDict +from datetime import datetime +from os.path import join as pjoin + +from bson import CodecOptions, decode_file_iter + +import genny.csv2 +from genny import cedar + + +def _get_fixture(*csv_file_path): + return pjoin('tests', 'fixtures', *csv_file_path) + + +class CedarTest(unittest.TestCase): + + def test_split_csv2(self): + large_precise_float = 10 ** 15 + num_cols = len(genny.csv2.IntermediateCSVColumns.default_columns()) + mock_data_reader = [ + (['first' for _ in range(num_cols)], 'a1'), + ([1 for _ in range(num_cols)], 'a1'), + # Store a large number to make sure precision is not lost. Python csv converts + # numbers to floats by default, which has 2^53 or 10^15 precision. Unix time in + # milliseconds is currently 10^13. 
+ ([large_precise_float for _ in range(num_cols)], 'a2') + + ] + with tempfile.TemporaryDirectory() as output_dir: + output_files = cedar.split_into_actor_csv_files(mock_data_reader, output_dir) + self.assertEqual(output_files, ['a1.csv', 'a2.csv']) + + a1o1 = pjoin(output_dir, output_files[0]) + self.assertTrue(os.path.isfile(a1o1)) + with open(a1o1) as f: + ll = list(csv.reader(f, quoting=csv.QUOTE_NONNUMERIC)) + self.assertEqual(len(ll), 3) + self.assertEqual(len(ll[0]), 11) + + a2o2 = pjoin(output_dir, output_files[1]) + self.assertTrue(os.path.isfile(a2o2)) + with open(a2o2) as f: + ll = list(csv.reader(f, quoting=csv.QUOTE_NONNUMERIC)) + self.assertEqual(len(ll), 2) + self.assertEqual(ll[1][0], large_precise_float) + self.assertEqual(len(ll[0]), 11) + + +class CedarIntegrationTest(unittest.TestCase): + def verify_output(self, bson_metrics_file_name, expected_results, check_last_row_only=False): + """ + :param bson_metrics_file_name: + :param expected_results: + :param check_last_row_only: Check that the last row is correct. Since the results are + cumulative, this likely means previous rows are all correct as well. + :return: + """ + with open(bson_metrics_file_name, 'rb') as f: + options = CodecOptions(document_class=OrderedDict) + index = 0 + if check_last_row_only: + decoded_bson = list(decode_file_iter(f, options)) + self.assertEqual(expected_results, decoded_bson[-1]) + else: + for doc in decode_file_iter(f, options): + self.assertEqual(doc, expected_results[index]) + index += 1 + + def test_cedar_main(self): + expected_result_insert = OrderedDict([ + ('ts', datetime(1970, 1, 1, 2, 46, 40)), + ('id', 0), + ('counters', OrderedDict([ + ('n', 9), + ('ops', 58), + ('size', 350), + ('errors', 23) + ])), + ('timers', OrderedDict([ + ('duration', 1320), + ('total', 1518) + ])), + ('gauges', OrderedDict([('workers', 5)])) + ]) + + expected_result_remove = OrderedDict([ + ('ts', datetime(1970, 1, 1, 2, 46, 40)), + ('id', 0), + ('counters', OrderedDict([ + ('n', 9), + ('ops', 58), + ('size', 257), + ('errors', 25) + ])), + ('timers', OrderedDict([ + ('duration', 1392), + ('total', 0) + ])), + ('gauges', OrderedDict([('workers', 5)])) + ]) + + with tempfile.TemporaryDirectory() as output_dir: + args = [ + _get_fixture('cedar', 'two_op.csv'), + output_dir + ] + + cedar.main__cedar(args) + + self.verify_output( + pjoin(output_dir, 'InsertRemove-Insert.bson'), + expected_result_insert, + check_last_row_only=True + ) + + self.verify_output( + pjoin(output_dir, 'InsertRemove-Remove.bson'), + expected_result_remove, + check_last_row_only=True + ) + + def test_cedar_main_2(self): + expected_result_greetings = OrderedDict([ + # The operation duration can be ignored because they're a few ns. 
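+            # 42 ms is the fixture's SystemTime (42000000 ns); the ns-scale metrics timestamps disappear at datetime's microsecond resolution, so every expected ts is utcfromtimestamp(42 / 1000).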
+ ('ts', datetime.utcfromtimestamp(42 / 1000)), + ('id', 3), + ('counters', OrderedDict([ + ('n', 2), + ('ops', 0), + ('size', 0), + ('errors', 0) + ])), + ('timers', OrderedDict([ + ('duration', 13), + ('total', 13) + ])), + ('gauges', OrderedDict([('workers', 1)])) + ]) + + expected_result_insert = [ + OrderedDict([ + ('ts', datetime.utcfromtimestamp(42 / 1000)), + ('id', 1), + ('counters', OrderedDict([ + ('n', 1), + ('ops', 9), + ('size', 300), + ('errors', 0) + ])), + ('timers', OrderedDict([ + ('duration', 23), + ('total', 23) + ])), + ('gauges', OrderedDict([('workers', 2)])) + ]), + OrderedDict([ + ('ts', datetime.utcfromtimestamp(42 / 1000)), + ('id', 2), + ('counters', OrderedDict([ + ('n', 2), + ('ops', 17), + ('size', 500), + ('errors', 0) + ])), + ('timers', OrderedDict([ + ('duration', 43), + ('total', 43) + ])), + ('gauges', OrderedDict([('workers', 2)])) + ]), + ] + + expected_result_remove = [ + OrderedDict([ + ('ts', datetime.utcfromtimestamp(42 / 1000)), + ('id', 2), + ('counters', OrderedDict([ + ('n', 1), + ('ops', 7), + ('size', 30), + ('errors', 0) + ])), + ('timers', OrderedDict([ + ('duration', 10), + ('total', 12) + ])), + ('gauges', OrderedDict([('workers', 2)])) + ]), + OrderedDict([ + ('ts', datetime.utcfromtimestamp(42 / 1000)), + ('id', 1), + ('counters', OrderedDict([ + ('n', 2), + ('ops', 13), + ('size', 70), + ('errors', 0) + ])), + ('timers', OrderedDict([ + ('duration', 27), + ('total', 29) + ])), + ('gauges', OrderedDict([('workers', 2)])) + ]), + ] + + with tempfile.TemporaryDirectory() as output_dir: + args = [ + _get_fixture('cedar', 'shared_with_cxx_metrics_test.csv'), + output_dir + ] + + cedar.main__cedar(args) + + self.verify_output( + pjoin(output_dir, 'HelloWorld-Greetings.bson'), + [expected_result_greetings], + ) + + self.verify_output( + pjoin(output_dir, 'InsertRemove-Insert.bson'), + expected_result_insert, + ) + + self.verify_output( + pjoin(output_dir, 'InsertRemove-Remove.bson'), + expected_result_remove, + ) diff --git a/src/python/tests/csv2_test.py b/src/python/tests/csv2_test.py new file mode 100644 index 0000000000..e65a0b3d22 --- /dev/null +++ b/src/python/tests/csv2_test.py @@ -0,0 +1,38 @@ +import os +import unittest + +from genny import csv2 + + +class CSV2Test(unittest.TestCase): + + @staticmethod + def get_fixture(*file_path): + return os.path.join('tests', 'fixtures', 'cedar', *file_path) + + def test_basic_parsing(self): + test_csv = csv2.CSV2(self.get_fixture('barebones.csv')) + with test_csv.data_reader() as _: + self.assertEqual(test_csv._unix_epoch_offset_ns, 90 * (10 ** 9)) + op_map = test_csv._operation_thread_count_map + self.assertDictEqual(op_map, {('MyActor', 'MyOperation'): 2}) + + def test_data_reader(self): + test_csv = csv2.CSV2(self.get_fixture('barebones.csv')) + with test_csv.data_reader() as dr: + self.assertEqual(next(dr), + ([102345.0, 12345000000, 0, 'MyOperation', 100, 0, 1, 6, 2, 40, 2], + 'MyActor')) + + def test_error_outcome(self): + test_csv = csv2.CSV2(self.get_fixture('error_outcome.csv')) + with test_csv.data_reader() as dr: + next(dr) + with self.assertRaisesRegex(csv2.CSV2ParsingError, 'Unexpected outcome on line'): + next(dr) + next(dr) + + def test_missing_clock_header(self): + with self.assertRaisesRegex(csv2.CSV2ParsingError, 'Unknown csv2 section title'): + with csv2.CSV2(self.get_fixture('invalid_title.csv')).data_reader() as _: + pass diff --git a/src/python/tests/fixtures/cedar/barebones.csv b/src/python/tests/fixtures/cedar/barebones.csv new file mode 100644 index 
0000000000..a3854dd7c0 --- /dev/null +++ b/src/python/tests/fixtures/cedar/barebones.csv @@ -0,0 +1,12 @@ +Clocks +clock,nanoseconds +SystemTime,100014000000 +MetricsTime,10014000000 + +OperationThreadCounts +actor,operation,workers +MyActor,MyOperation,2 + +Operations +timestamp, actor , thread, operation, duration, outcome, n, ops, errors, size +12345000000 , MyActor, 0 , MyOperation , 100 , 0 , 1, 6 , 2 , 40 diff --git a/src/python/tests/fixtures/cedar/error_outcome.csv b/src/python/tests/fixtures/cedar/error_outcome.csv new file mode 100644 index 0000000000..f8483120ea --- /dev/null +++ b/src/python/tests/fixtures/cedar/error_outcome.csv @@ -0,0 +1,14 @@ +Clocks +clock,nanoseconds +SystemTime,100014000000 +MetricsTime,10014000000 + +OperationThreadCounts +actor,operation,workers +MyActor,MyOperation,2 + +Operations +timestamp, actor , thread, operation, duration, outcome, n, ops, errors, size +12345000000 , MyActor, 0 , MyOperation , 100 , 0 , 1, 6 , 2 , 40 +12345000000 , MyActor, 0 , MyOperation , 100 , 2 , 1, 6 , 2 , 40 +12345000000 , MyActor, 0 , MyOperation , 100 , 0 , 1, 6 , 2 , 40 diff --git a/src/python/tests/fixtures/cedar/invalid_title.csv b/src/python/tests/fixtures/cedar/invalid_title.csv new file mode 100644 index 0000000000..f266a7125a --- /dev/null +++ b/src/python/tests/fixtures/cedar/invalid_title.csv @@ -0,0 +1 @@ +BAD_TITLE diff --git a/src/python/tests/fixtures/cedar/shared_with_cxx_metrics_test.csv b/src/python/tests/fixtures/cedar/shared_with_cxx_metrics_test.csv new file mode 100644 index 0000000000..58088d4f3b --- /dev/null +++ b/src/python/tests/fixtures/cedar/shared_with_cxx_metrics_test.csv @@ -0,0 +1,18 @@ +Clocks +clock,nanoseconds +SystemTime,42000000 +MetricsTime,45 + +OperationThreadCounts +actor,operation,workers +HelloWorld,Greetings,1 +InsertRemove,Remove,2 +InsertRemove,Insert,2 + +Operations +timestamp,actor,thread,operation,duration,outcome,n,ops,errors,size +26,HelloWorld,3,Greetings,13,0,2,0,0,0 +42,InsertRemove,2,Remove,10,0,1,7,0,30 +45,InsertRemove,1,Remove,17,0,1,6,0,40 +28,InsertRemove,1,Insert,23,0,1,9,0,300 +30,InsertRemove,2,Insert,20,0,1,8,0,200 diff --git a/src/python/tests/fixtures/cedar/two_op.csv b/src/python/tests/fixtures/cedar/two_op.csv new file mode 100644 index 0000000000..e0bbe1c397 --- /dev/null +++ b/src/python/tests/fixtures/cedar/two_op.csv @@ -0,0 +1,30 @@ +Clocks +clock,nanoseconds +SystemTime,10000000000000, +MetricsTime,66632088, + +OperationThreadCounts +actor,operation,workers +InsertRemove,Insert,5 +InsertRemove,Remove,5 + +Operations +timestamp , actor , thread, operation, duration, outcome, n, ops, errors, size +66632088, InsertRemove, 0 , Insert , 100 , 0 , 1, 6 , 2 , 40 +66632403, InsertRemove, 0 , Insert , 310 , 0 , 1, 9 , 3 , 60 +66632661, InsertRemove, 0 , Insert , 180 , 0 , 1, 8 , 6 , 20 +66632088, InsertRemove, 1 , Insert , 280 , 1 , 1, 3 , 1 , 20 +66632316, InsertRemove, 1 , Insert , 190 , 1 , 1, 4 , 1 , 30 +66632088, InsertRemove, 2 , Insert , 30 , 0 , 1, 4 , 1 , 30 +66632245, InsertRemove, 2 , Insert , 80 , 1 , 1, 8 , 7 , 10 +66632088, InsertRemove, 3 , Insert , 110 , 0 , 1, 7 , 2 , 50 +66632088, InsertRemove, 4 , Insert , 40 , 0 , 1, 9 , 0 , 90 +66632088, InsertRemove, 0 , Remove , 100 , 0 , 1, 6 , 2 , 40 +66632403, InsertRemove, 0 , Remove , 410 , 0 , 1, 9 , 3 , 60 +66632661, InsertRemove, 0 , Remove , 80 , 0 , 1, 8 , 6 , 21 +66632088, InsertRemove, 4 , Remove , 280 , 1 , 1, 3 , 2 , 24 +66632316, InsertRemove, 1 , Remove , 123 , 1 , 1, 4 , 2 , 35 +66632088, InsertRemove, 3 , Remove , 130 , 0 , 1, 4 , 1 
, 36 +66632245, InsertRemove, 2 , Remove , 139 , 1 , 1, 8 , 7 , 10 +66632088, InsertRemove, 2 , Remove , 110 , 0 , 1, 7 , 2 , 12 +66632088, InsertRemove, 1 , Remove , 20 , 0 , 1, 9 , 0 , 19 diff --git a/src/python/tests/metrics_output_parser_test.py b/src/python/tests/metrics_output_parser_test.py index 674b9fcd9c..f70ddcf562 100644 --- a/src/python/tests/metrics_output_parser_test.py +++ b/src/python/tests/metrics_output_parser_test.py @@ -141,7 +141,6 @@ def test_fixture2(self): def test_fixture3(self): actual = test_lib.parse_fixture('csvoutput3').timers() - print(actual) self.assertEqual( actual, { 'InsertRemoveTest.remove.op-time': { diff --git a/src/python/tests/perf_json_test.py b/src/python/tests/perf_json_test.py index 16b322664e..619c633714 100644 --- a/src/python/tests/perf_json_test.py +++ b/src/python/tests/perf_json_test.py @@ -63,30 +63,30 @@ def test_fixture2(self): } } }, - { - 'name': 'InsertRemoveTest.insert', - 'workload': 'InsertRemoveTest.insert', - 'start': 15402330730.74953, - 'end': 15402333807.63649, - 'results': { - 100: { - 'ops_per_sec': 8656706.69744836, - 'ops_per_sec_values': [8656706.69744836] - } - } - }, - { - 'name': 'Genny.Setup', - 'workload': 'Genny.Setup', - 'start': 15402330355.93684, - 'end': 15402330442.88445, - 'results': { - 1: { - 'ops_per_sec': 8694761.0, - 'ops_per_sec_values': [8694761.0] - } - } - }] + { + 'name': 'InsertRemoveTest.insert', + 'workload': 'InsertRemoveTest.insert', + 'start': 15402330730.74953, + 'end': 15402333807.63649, + 'results': { + 100: { + 'ops_per_sec': 8656706.69744836, + 'ops_per_sec_values': [8656706.69744836] + } + } + }, + { + 'name': 'Genny.Setup', + 'workload': 'Genny.Setup', + 'start': 15402330355.93684, + 'end': 15402330442.88445, + 'results': { + 1: { + 'ops_per_sec': 8694761.0, + 'ops_per_sec_values': [8694761.0] + } + } + }] }) def test_fixture1(self): @@ -106,16 +106,16 @@ def test_fixture1(self): } } }, - { - 'name': 'HelloTest.output', - 'workload': 'HelloTest.output', - 'start': 15378141410.61476, - 'end': 15378141434.57943, - 'results': { - 2: { - 'ops_per_sec': 55527.25, - 'ops_per_sec_values': [55527.25] - } - } - }] + { + 'name': 'HelloTest.output', + 'workload': 'HelloTest.output', + 'start': 15378141410.61476, + 'end': 15378141434.57943, + 'results': { + 2: { + 'ops_per_sec': 55527.25, + 'ops_per_sec_values': [55527.25] + } + } + }] }) diff --git a/src/python/third_party/__init__.py b/src/python/third_party/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/python/third_party/csvsort.py b/src/python/third_party/csvsort.py new file mode 100644 index 0000000000..bb6ca4d946 --- /dev/null +++ b/src/python/third_party/csvsort.py @@ -0,0 +1,226 @@ +# -*- coding: utf-8 -*- + +""" +This file is originally from the csvsort project: +https://bitbucket.org/richardpenman/csvsort + +MongoDB Modifications: +1. add the quoting=quoting argument to csv.reader() +""" + +import csv +import heapq +import logging +import os +import sys +import tempfile +from optparse import OptionParser + +csv.field_size_limit(sys.maxsize) + + +class CsvSortError(Exception): + pass + + +def csvsort(input_filename, + columns, + output_filename=None, + max_size=100, + has_header=True, + delimiter=',', + show_progress=False, + quoting=csv.QUOTE_MINIMAL): + """Sort the CSV file on disk rather than in memory. + + The merge sort algorithm is used to break the file into smaller sub files + + Args: + input_filename: the CSV filename to sort. 
+ columns: a list of columns to sort on (can be 0 based indices or header + keys). + output_filename: optional filename for sorted file. If not given then + input file will be overriden. + max_size: the maximum size (in MB) of CSV file to load in memory at + once. + has_header: whether the CSV contains a header to keep separated from + sorting. + delimiter: character used to separate fields, default ','. + show_progress (Boolean): A flag whether or not to show progress. + The default is False, which does not print any merge information. + quoting: How much quoting is needed in the final CSV file. Default is + csv.QUOTE_MINIMAL. + """ + + with open(input_filename) as input_fp: + reader = csv.reader(input_fp, delimiter=delimiter, quoting=quoting) + if has_header: + header = next(reader) + else: + header = None + + columns = parse_columns(columns, header) + + filenames = csvsplit(reader, max_size, quoting) + if show_progress: + logging.info('Merging %d splits' % len(filenames)) + for filename in filenames: + memorysort(filename, columns, quoting) + sorted_filename = mergesort(filenames, columns, quoting) + + # XXX make more efficient by passing quoting, delimiter, and moving result + # generate the final output file + with open(output_filename or input_filename, 'w') as output_fp: + writer = csv.writer(output_fp, delimiter=delimiter, quoting=quoting) + if header: + writer.writerow(header) + with open(sorted_filename) as sorted_fp: + for row in csv.reader(sorted_fp, quoting=quoting): + writer.writerow(row) + + os.remove(sorted_filename) + + +def parse_columns(columns, header): + """check the provided column headers + """ + for i, column in enumerate(columns): + if isinstance(column, int): + if header: + if column >= len(header): + raise CsvSortError( + 'Column index is out of range: "{}"'.format(column)) + else: + # find index of column from header + if header is None: + raise CsvSortError( + 'CSV needs a header to find index of this column name:' + + ' "{}"'.format(column)) + else: + if column in header: + columns[i] = header.index(column) + else: + raise CsvSortError( + 'Column name is not in header: "{}"'.format(column)) + return columns + + +def csvsplit(reader, max_size, quoting): + """Split into smaller CSV files of maximum size and return the filenames. 
+ """ + max_size = max_size * 1024 * 1024 # convert to bytes + writer = None + current_size = 0 + split_filenames = [] + + # break CSV file into smaller merge files + for row in reader: + if writer is None: + ntf = tempfile.NamedTemporaryFile(delete=False, mode='w') + writer = csv.writer(ntf, quoting=quoting) + split_filenames.append(ntf.name) + + writer.writerow(row) + current_size += sys.getsizeof(row) + if current_size > max_size: + writer = None + current_size = 0 + return split_filenames + + +def memorysort(filename, columns, quoting): + """Sort this CSV file in memory on the given columns + """ + with open(filename) as input_fp: + rows = [row for row in csv.reader(input_fp, quoting=quoting)] + rows.sort(key=lambda row: get_key(row, columns)) + with open(filename, 'w') as output_fp: + writer = csv.writer(output_fp, quoting=quoting) + for row in rows: + writer.writerow(row) + + +def get_key(row, columns): + """Get sort key for this row + """ + return [row[column] for column in columns] + + +def decorated_csv(filename, columns, quoting): + """Iterator to sort CSV rows + """ + with open(filename) as fp: + for row in csv.reader(fp, quoting=quoting): + yield get_key(row, columns), row + + +def mergesort(sorted_filenames, columns, quoting, nway=2): + """Merge these 2 sorted csv files into a single output file + """ + merge_n = 0 + while len(sorted_filenames) > 1: + merge_filenames, sorted_filenames = \ + sorted_filenames[:nway], sorted_filenames[nway:] + + with tempfile.NamedTemporaryFile(delete=False, mode='w') as output_fp: + writer = csv.writer(output_fp, quoting=quoting) + merge_n += 1 + for _, row in heapq.merge(*[decorated_csv(filename, columns, quoting) + for filename in merge_filenames]): + writer.writerow(row) + + sorted_filenames.append(output_fp.name) + + for filename in merge_filenames: + os.remove(filename) + return sorted_filenames[0] + + +def main(): + parser = OptionParser() + parser.add_option( + '-c', + '--column', + dest='columns', + action='append', + help='column of CSV to sort on') + parser.add_option( + '-s', + '--size', + dest='max_size', + type='float', + default=100, + help='maximum size of each split CSV file in MB (default 100)') + parser.add_option( + '-n', + '--no-header', + dest='has_header', + action='store_false', + default=True, + help='set CSV file has no header') + parser.add_option( + '-d', + '--delimiter', + default=',', + help='set CSV delimiter (default ",")') + args, input_files = parser.parse_args() + + if not input_files: + parser.error('What CSV file should be sorted?') + elif not args.columns: + parser.error('Which columns should be sorted on?') + else: + # escape backslashes + args.delimiter = args.delimiter.decode('string_escape') + args.columns = [int(column) if column.isdigit() else column + for column in args.columns] + csvsort( + input_files[0], + columns=args.columns, + max_size=args.max_size, + has_header=args.has_header, + delimiter=args.delimiter) + + +if __name__ == '__main__': + main()