TIG-1189 script to convert genny metrics to cedar format (mongodb#175)
* TIG-1189 add skeleton csv2 parser
* finish csv2 parser
* add more tests for csv2 parsing
* add csv processor
* split csv2 into multiple csv
* sort csv
* vendor csvsort
* modify csvsort
* add test
* format
* add thread count
* add unix epoch offset
* WIP
* finish implementation
* add end-to-end integration test
* add more tests
* add new shared fixture with cxx test
* modify existing fixtures to new format
* allow column order to change
* add header to intermediate csv format for readability
* allow intermediate csv columns to be reordered in the future
* better parsing of headers
* allow arbitrary time ordering
* implement datareader as context
* format
* compute total time
* store duration and total in NS
* fix total again
* remove testing of intermediate files
* fix total computation
* don't use unix time, use TS for total
* add test for remove
* format
Showing 18 changed files with 1,145 additions and 41 deletions.
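The converter is exposed through the `main__cedar` entry point added below. A minimal sketch of how it could be driven, assuming the new file is importable as `genny.cedar` (the module path is not shown in this diff) and using placeholder paths:

```
# Sketch only: the module path and the file paths are assumptions, not part of this commit.
from genny.cedar import main__cedar

# Split build/genny-metrics.csv by actor, sort each piece by (timestamp, thread),
# and write one Cedar BSON file per (actor, operation) into build/cedar-metrics/.
main__cedar(['build/genny-metrics.csv', 'build/cedar-metrics'])
```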
@@ -0,0 +1,238 @@
#!/usr/bin/env python3
#
# Copyright 2019-present MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import csv
import sys
from collections import OrderedDict, defaultdict
from datetime import datetime
from os.path import join as pjoin

from bson import BSON

from genny.csv2 import CSV2, IntermediateCSVColumns
from third_party.csvsort import csvsort

""" | ||
Convert raw genny csv output to a format expected by Cedar | ||
(a.k.a. MongoDB Evergreen Expanded Metrics) | ||
Sample input: | ||
``` | ||
Clocks | ||
SystemTime,100000000000000000 | ||
MetricsTime,86491166632088 | ||
OperationThreadCounts | ||
InsertRemove,Insert,5 | ||
Operations | ||
timestamp , actor , thread, operation, duration, outcome, n, ops, errors, size | ||
86491166632088, InsertRemove, 0 , Insert , 100 , 0 , 1, 6 , 2 , 40 | ||
86491166632403, InsertRemove, 0 , Insert , 310 , 0 , 1, 9 , 3 , 60 | ||
86491166632661, InsertRemove, 0 , Insert , 180 , 0 , 1, 8 , 6 , 20 | ||
86491166632088, InsertRemove, 1 , Insert , 280 , 1 , 1, 3 , 1 , 20 | ||
86491166632316, InsertRemove, 1 , Insert , 190 , 1 , 1, 4 , 1 , 30 | ||
86491166632088, InsertRemove, 2 , Insert , 30 , 0 , 1, 4 , 1 , 30 | ||
86491166632245, InsertRemove, 2 , Insert , 80 , 1 , 1, 8 , 7 , 10 | ||
86491166632088, InsertRemove, 3 , Insert , 110 , 0 , 1, 7 , 2 , 50 | ||
86491166632088, InsertRemove, 4 , Insert , 40 , 0 , 1, 9 , 0 , 90 | ||
``` | ||
Sample output: | ||
``` | ||
{ts:TS(0),id:0,counters:{n:1,ops:6,size:40,errors:2},timers:{duration:100,total:100},gauges:{workers:5}} | ||
{ts:TS(0),id:1,counters:{n:2,ops:9,size:60,errors:3},timers:{duration:380,total:380},gauges:{workers:5}} | ||
{ts:TS(0),id:2,counters:{n:3,ops:13,size:90,errors:4},timers:{duration:410,total:410},gauges:{workers:5}} | ||
{ts:TS(0),id:3,counters:{n:4,ops:20,size:140,errors:6},timers:{duration:520,total:520},gauges:{workers:5}} | ||
{ts:TS(0),id:4,counters:{n:5,ops:29,size:230,errors:6},timers:{duration:560,total:560},gauges:{workers:5}} | ||
{ts:TS(157),id:2,counters:{n:6,ops:37,size:240,errors:13},timers:{duration:640,total:717},gauges:{workers:5}} | ||
{ts:TS(228),id:1,counters:{n:7,ops:41,size:270,errors:14},timers:{duration:830,total:945},gauges:{workers:5}} | ||
{ts:TS(315),id:0,counters:{n:8,ops:50,size:330,errors:17},timers:{duration:1140,total:1260},gauges:{workers:5}} | ||
{ts:TS(573),id:0,counters:{n:9,ops:58,size:350,errors:23},timers:{duration:1320,total:1518},gauges:{workers:5}} | ||
``` | ||
""" | ||
|
||
|
||
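# Worked example, derived from the sample input/output in the docstring above:
# after sorting by (timestamp, thread), the first three rows are the thread-0,
# thread-1 and thread-2 rows at ts=86491166632088, so the third emitted document
# carries the cumulative counters
#     n      = 1 + 1 + 1    = 3
#     ops    = 6 + 3 + 4    = 13
#     size   = 40 + 20 + 30 = 90
#     errors = 2 + 1 + 1    = 4
# which matches {counters:{n:3,ops:13,size:90,errors:4}} in the sample output.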
class IntermediateCSVReader:
    """
    Class that reads IntermediateCSVColumns and outputs Cedar BSON
    """

    def __init__(self, reader):
        self.raw_reader = reader
        # dict(operation -> dict(metric_column -> cumulative_value))
        self.cumulatives_for_op = defaultdict(lambda: defaultdict(int))

        # dict(operation -> total_duration)
        self.total_for_op = defaultdict(int)

        # dict(thread -> prev_op_ts)
        self.prev_ts_for_thread = {}

    def __iter__(self):
        return self

    def __next__(self):
        return self._parse_into_cedar_format(next(self.raw_reader))

    def _compute_cumulatives(self, line):
        op = line[IntermediateCSVColumns.OPERATION]

        n = IntermediateCSVColumns.N
        ops = IntermediateCSVColumns.OPS
        size = IntermediateCSVColumns.SIZE
        err = IntermediateCSVColumns.ERRORS
        dur = IntermediateCSVColumns.DURATION
        for col in [n, ops, size, err, dur]:
            self.cumulatives_for_op[op][col] += line[col]

        ts_col = IntermediateCSVColumns.TS
        thread = line[IntermediateCSVColumns.THREAD]

        # For the first operation on each thread, total is just the duration,
        # because we don't track when each thread started.
        if thread in self.prev_ts_for_thread:
            cur_total = line[ts_col] - self.prev_ts_for_thread[thread]
        else:
            cur_total = line[dur]

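        # E.g., with the sample data from the module docstring: the thread-2 row
        # at ts=86491166632245 follows the thread-2 row at ts=86491166632088, so
        # cur_total = 157 and the cumulative total for Insert grows from 560 to
        # 717, matching {timers:{duration:640,total:717}} in the sample output.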
        self.total_for_op[op] += cur_total
        self.prev_ts_for_thread[thread] = line[ts_col]

    def _parse_into_cedar_format(self, line):
        # Compute all cumulative values for simplicity; not all values are used.
        self._compute_cumulatives(line)

        # milliseconds to seconds to datetime.datetime()
        ts = datetime.utcfromtimestamp(line[IntermediateCSVColumns.UNIX_TIME] / 1000)
        op = line[IntermediateCSVColumns.OPERATION]

        res = OrderedDict([
            ('ts', ts),
            ('id', int(line[IntermediateCSVColumns.THREAD])),
            ('counters', OrderedDict([
                ('n', int(self.cumulatives_for_op[op][IntermediateCSVColumns.N])),
                ('ops', int(self.cumulatives_for_op[op][IntermediateCSVColumns.OPS])),
                ('size', int(self.cumulatives_for_op[op][IntermediateCSVColumns.SIZE])),
                ('errors', int(self.cumulatives_for_op[op][IntermediateCSVColumns.ERRORS]))
            ])),
            ('timers', OrderedDict([
                ('duration', int(self.cumulatives_for_op[op][IntermediateCSVColumns.DURATION])),
                ('total', int(self.total_for_op[op]))
            ])),
            ('gauges', OrderedDict([
                ('workers', int(line[IntermediateCSVColumns.WORKERS]))
            ]))
        ])

        return res, op


def split_into_actor_operation_and_transform_to_cedar_format(file_name, out_dir):
    """
    Split up the IntermediateCSV format input into one or more BSON files in Cedar
    format; one file for each (actor, operation) pair.
    """

    # remove the ".csv" suffix to get the actor name.
    actor_name = file_name[:-4]

    out_files = {}
    with open(pjoin(out_dir, file_name)) as in_f:
        reader = csv.reader(in_f, quoting=csv.QUOTE_NONNUMERIC)
        next(reader)  # Ignore the header row.
        try:
            for ordered_dict, op in IntermediateCSVReader(reader):
                out_file_name = pjoin(out_dir, "{}-{}.bson".format(actor_name, op))
                if out_file_name not in out_files:
                    out_files[out_file_name] = open(out_file_name, 'wb')
                out_files[out_file_name].write(BSON.encode(ordered_dict))
        finally:
            for fh in out_files.values():
                fh.close()


def split_into_actor_csv_files(data_reader, out_dir):
    """
    Split up the monolithic genny metrics csv2 file into smaller [actor].csv files
    """
    cur_out_csv = None
    cur_out_fh = None
    cur_actor = (None, None)
    output_files = []

    for line, actor in data_reader:
        if actor != cur_actor:
            cur_actor = actor

            # Close out the old file.
            if cur_out_fh:
                cur_out_fh.close()

            # Open a new csv file.
            file_name = actor + '.csv'
            output_files.append(file_name)
            cur_out_fh = open(pjoin(out_dir, file_name), 'w', newline='')

            # Quote non-numeric values on write so that a QUOTE_NONNUMERIC reader
            # converts the unquoted (numeric) fields to float automatically.
            cur_out_csv = csv.writer(cur_out_fh, quoting=csv.QUOTE_NONNUMERIC)
            cur_out_csv.writerow(IntermediateCSVColumns.default_columns())

        cur_out_csv.writerow(line)

    if cur_out_fh:  # data_reader may have produced no rows.
        cur_out_fh.close()

    return output_files


def sort_csv_file(file_name, out_dir):
    # Sort on timestamp and thread.
    csvsort(pjoin(out_dir, file_name),
            [IntermediateCSVColumns.TS, IntermediateCSVColumns.THREAD],
            quoting=csv.QUOTE_NONNUMERIC,
            has_header=True,
            show_progress=True)


def parse_args(argv):
    parser = argparse.ArgumentParser(
        description='Convert Genny csv2 perf data to Cedar BSON format',
    )
    parser.add_argument('input_file', metavar='input-file', help='path to genny csv2 perf data')
    parser.add_argument('output_dir', metavar='output-dir',
                        help='directory to store output BSON files')

    return parser.parse_args(argv)


def main__cedar(argv=sys.argv[1:]):
    args = parse_args(argv)
    out_dir = args.output_dir

    # Read the csv2 file.
    my_csv2 = CSV2(args.input_file)

    with my_csv2.data_reader() as data_reader:
        # Split into one intermediate CSV file per actor.
        files = split_into_actor_csv_files(data_reader, out_dir)

        for f in files:
            # csvsort each intermediate file by (timestamp, thread).
            sort_csv_file(f, out_dir)

            # Compute cumulative values and stream the output to one BSON file
            # per (actor, operation) pair.
            split_into_actor_operation_and_transform_to_cedar_format(f, out_dir)
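For the sample data in the module docstring, this pipeline would write an intermediate `InsertRemove.csv`, sort it, and then emit `InsertRemove-Insert.bson`. A small sketch for spot-checking such an output file with the same `bson` package the script already imports (the path is a placeholder carried over from the invocation sketch above):

```
# Sketch only: the file path is an assumption, not something this commit produces verbatim.
from bson import decode_file_iter

with open('build/cedar-metrics/InsertRemove-Insert.bson', 'rb') as f:
    for doc in decode_file_iter(f):
        print(doc['ts'], doc['id'], doc['counters'], doc['timers'], doc['gauges'])
```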