OTF2 reader using core reader #143

Open
wants to merge 15 commits into develop
173 changes: 173 additions & 0 deletions pipit/readers/core_reader.py
@@ -0,0 +1,173 @@
from typing import Dict, List

import pandas

from pipit.trace import Trace


class CoreTraceReader:
    """
    Helper object used to read traces from different sources and convert them
    into a common format.
    """

    def __init__(self, start: int = 0, stride: int = 1):
        """
        Should be called by each process to create an empty per-process trace
        in the reader. Creates the following data structures to represent an
        empty trace:
        - events: Dict[int, Dict[int, List[Dict]]]
        - stacks: Dict[int, Dict[int, List[int]]]
        """
        # how much the unique id is incremented for each new event
        self.stride = stride

        # track a unique id for each event; start one stride back so that the
        # first call to __get_unique_id returns `start`
        self.unique_id = start - self.stride

        # events are indexed by process number, then thread number;
        # stores a list of events
        self.events: Dict[int, Dict[int, List[Dict]]] = {}

        # stacks are indexed by process number, then thread number;
        # stores indices of events in the event list
        self.stacks: Dict[int, Dict[int, List[int]]] = {}

    def add_event(self, event: Dict) -> None:
        """
        Should be called once per event to add it to the trace. Updates the
        event lists and stacks accordingly.
        """
        # get process and thread numbers -- default to 0 if not present
        process = event.get("Process", 0)
        thread = event.get("Thread", 0)

        # assign a unique id to the event
        event["unique_id"] = self.__get_unique_id()

        # get the event list for this process/thread, creating it if needed
        if process not in self.events:
            self.events[process] = {}
        if thread not in self.events[process]:
            self.events[process][thread] = []
        event_list = self.events[process][thread]

        # get the call stack for this process/thread, creating it if needed
        if process not in self.stacks:
            self.stacks[process] = {}
        if thread not in self.stacks[process]:
            self.stacks[process][thread] = []
        stack: List[int] = self.stacks[process][thread]

        # if the event is an enter or instant event, update the parent-child
        # relationships (only enter events are pushed onto the stack)
        if event["Event Type"] == "Enter":
            self.__update_parent_child_relationships(event, stack, event_list, False)
        elif event["Event Type"] == "Instant":
            self.__update_parent_child_relationships(event, stack, event_list, True)
        # if the event is a leave event, update the matching enter event and
        # pop it from the stack
        elif event["Event Type"] == "Leave":
            self.__update_match_event(event, stack, event_list)

        # finally, add the event to the event list
        event_list.append(event)

    def finalize(self):
        """
        Converts the events data structure into a pandas DataFrame and returns
        it.
        """
        all_events = []
        for process in self.events:
            for thread in self.events[process]:
                all_events.extend(self.events[process][thread])

        # create a dataframe
        trace_df = pandas.DataFrame(all_events)

        # use -1 to mark events with no match or parent (e.g. Instant events)
        trace_df["_matching_event"] = trace_df["_matching_event"].fillna(-1)
        trace_df["_parent"] = trace_df["_parent"].fillna(-1)
        trace_df["_matching_timestamp"] = trace_df["_matching_timestamp"].fillna(-1)

        # categoricals for memory savings; _matching_timestamp is cast to
        # int64, since nanosecond timestamps overflow int32
        trace_df = trace_df.astype(
            {
                "Name": "category",
                "Event Type": "category",
                "Process": "category",
                "_matching_event": "int32",
                "_parent": "int32",
                "_matching_timestamp": "int64",
            }
        )
        return trace_df
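
    # Illustrative finalize() output for the two example events above (columns
    # abridged; the actual column order depends on insertion order):
    #
    #   Name      Event Type  Timestamp (ns)  unique_id  _parent  _matching_event
    #   MPI_Send  Enter       100             0          -1       1
    #   MPI_Send  Leave       250             1          -1       0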

    def __update_parent_child_relationships(
        self, event: Dict, stack: List[int], event_list: List[Dict], is_instant: bool
    ) -> None:
        """
        This method can be thought of as the update upon an "Enter" event. It
        adds to the stack and to the CCT.
        """
        if len(stack) == 0:
            # root event
            event["_parent"] = -1
        else:
            parent_event = event_list[stack[-1]]
            event["_parent"] = parent_event["unique_id"]

        # update the stack (instant events cannot have children, so they are
        # never pushed)
        if not is_instant:
            stack.append(len(event_list))

    def __update_match_event(
        self, leave_event: Dict, stack: List[int], event_list: List[Dict]
    ) -> None:
        """
        This method can be thought of as the update upon a "Leave" event. It
        pops from the stack and updates the event list. We should look into
        using this function to add artificial "Leave" events for unmatched
        "Enter" events.
        """
        while len(stack) > 0:
            # pop (potentially unmatched) enter events from the stack
            enter_event = event_list[stack.pop()]

            if enter_event["Name"] == leave_event["Name"]:
                # matching event found

                # update matching event ids
                leave_event["_matching_event"] = enter_event["unique_id"]
                enter_event["_matching_event"] = leave_event["unique_id"]

                # update matching timestamps
                leave_event["_matching_timestamp"] = enter_event["Timestamp (ns)"]
                enter_event["_matching_timestamp"] = leave_event["Timestamp (ns)"]

                break

    def __get_unique_id(self) -> int:
        self.unique_id += self.stride
        return self.unique_id


def concat_trace_data(data_list):
    """
    Concatenates the data from multiple trace readers into a single Trace.
    """
    trace_data = pandas.concat(data_list, ignore_index=True)

    # index by unique_id, then sort by timestamp; the index must be preserved
    # while sorting, since _parent and _matching_event refer to unique ids
    trace_data.set_index("unique_id", inplace=True)
    trace_data.sort_values(by="Timestamp (ns)", axis=0, ascending=True, inplace=True)
    return Trace(None, trace_data, None)
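
# Sketch of the intended multi-rank flow (the `read_rank` helper is
# hypothetical; in this PR, the OTF2 reader's events_reader plays that role):
#
#     with mp.Pool(size) as pool:
#         data_list = pool.map(read_rank, range(size))  # each returns finalize()
#     trace = concat_trace_data(data_list)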
89 changes: 42 additions & 47 deletions pipit/readers/otf2_reader.py
@@ -8,6 +8,7 @@
import pandas as pd
import multiprocessing as mp
import pipit.trace
from pipit.readers.core_reader import CoreTraceReader, concat_trace_data


class OTF2Reader:
@@ -162,6 +163,9 @@ def events_reader(self, rank_size):
locations = list(trace.definitions._locations)
num_locations = len(locations)

# start a core reader for this rank; ids start at `rank` and stride by `size`
# so they stay globally unique across ranks
core_reader = CoreTraceReader(rank, size)
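# e.g. with size = 4: rank 0 assigns ids 0, 4, 8, ...; rank 1 assigns 1, 5, 9, ...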

# base number of locations read by each process
per_process = int(num_locations // size)

@@ -190,11 +194,6 @@ def events_reader(self, rank_size):
# select the locations to read based on above calculations
loc_events = list(trace.events(locations[begin_int:end_int]).__iter__())

# columns of the DataFrame
timestamps, event_types, event_attributes, names = [], [], [], []

# note: the below lists are for storing logical ids
process_ids, thread_ids = [], []

"""
Relevant Documentation for Metrics:
@@ -227,7 +226,7 @@ def events_reader(self, rank_size):
)

# maps each metric to its most recently read value
metrics_dict = {metric_name: [] for metric_name in metric_names}
metrics_dict = {metric_name: float("nan") for metric_name in metric_names}

# used to keep track of the time that
# the most recent metrics were read at
@@ -239,6 +238,10 @@
# location could be thread, process, etc
loc, event = loc_event[0], loc_event[1]

timestamp, event_t, event_attribute, name = None, None, None, None

process_id, thread_id = None, None

# To Do:
# Support for GPU events has to be
# added and unified across readers.
@@ -254,54 +257,56 @@
)
metric_values = event.values

# append the values for the metrics
# to their appropriate lists
# Set the values for the metrics
for i in range(len(metrics)):
metrics_dict[metrics[i]].append(metric_values[i])
metrics_dict[metrics[i]] = metric_values[i]

# store the metrics and their timestamp
prev_metric_time = event.time
else:
new_event = {}
# MetricClass metric events are synchronous
# and coupled with an enter or leave event that
# has the same timestamp
if event.time != prev_metric_time:
# if the event is not paired with any metric, then
# add placeholders for all the metric lists
# add placeholder
for metric in metric_names:
metrics_dict[metric].append(float("nan"))
new_event[metric] = float("nan")
else:
for metric, metric_value in metrics_dict.items():
new_event[metric] = metric_value


# reset this as a metric event was not read
prev_metric_time = -1
metrics_dict = {metric_name: float("nan") for metric_name in metric_names}

"""
Below is code to read the primary information about the
non-metric event, such as location, attributes, etc.
"""

process_id = loc.group._ref
process_ids.append(process_id)

# subtract the minimum location number of a process
# from the location number to get threads numbered
# 0 to (num_threads per process - 1) for each process.
thread_ids.append(
loc._ref - self.process_threads_map[process_id]
)
thread_id = loc._ref - self.process_threads_map[process_id]

# type of event - enter, leave, or other types
event_type = str(type(event))[20:-2]
if event_type == "Enter" or event_type == "Leave":
event_types.append(event_type)
event_t = event_type
else:
event_types.append("Instant")
event_t = "Instant"

if event_type in ["Enter", "Leave"]:
names.append(event.region.name)
name = event.region.name
else:
names.append(event_type)
name = event_type

timestamps.append(event.time)
timestamp = event.time

# only add attributes for non-leave rows so that
# there aren't duplicate attributes for a single event
@@ -319,34 +324,28 @@
attributes_dict[self.field_to_val(key)] = (
self.handle_data(value)
)
event_attributes.append(attributes_dict)
event_attribute = attributes_dict
else:
# nan attributes for leave rows
# attributes column is of object dtype
event_attributes.append(None)
event_attribute = None

columns = {
"Name": name,
"Event Type": event_t,
"Timestamp (ns)": timestamp,
"Thread": thread_id,
"Process": process_id,
"Attributes": event_attribute,
}

new_event.update(columns)
core_reader.add_event(new_event)

trace.close() # close event files

# returns dataframe with all events and their fields
trace_df = pd.DataFrame(
{
"Timestamp (ns)": timestamps,
"Event Type": event_types,
"Name": names,
"Thread": thread_ids,
"Process": process_ids,
"Attributes": event_attributes,
}
)

for metric, metric_values in metrics_dict.items():
# only add columns of metrics which are populated with
# some values (sometimes a metric could be defined but not
# appear in the trace itself)
if not np.isnan(metric_values).all():
trace_df[metric] = metric_values

return trace_df
return core_reader.finalize()

def read_definitions(self, trace):
"""
@@ -452,8 +451,8 @@ def read_events(self):
pool.close()

# merge the per-rank trace data into a single events dataframe
events_dataframe = pd.concat(events_dataframes)
del events_dataframes
trace = concat_trace_data(events_dataframes)
events_dataframe = trace.events

# accessing the clock properties of the trace using the definitions
clock_properties = self.definitions.loc[
@@ -470,10 +469,6 @@
events_dataframe["Timestamp (ns)"] -= offset
events_dataframe["Timestamp (ns)"] *= (10**9) / resolution

# ensures the DataFrame is in order of increasing timestamp
events_dataframe.sort_values(
by="Timestamp (ns)", axis=0, ascending=True, inplace=True, ignore_index=True
)

# convert these to ints
# (sometimes they get converted to floats