Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trace: Filter #62

Open
wants to merge 81 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
81 commits
Select commit Hold shift + click to select a range
4193cda
Initial query code
hsirkar Feb 17, 2023
ca934cd
WIP
hsirkar Feb 18, 2023
fa32996
Add orderby support
hsirkar Feb 18, 2023
b2430eb
Rewrite everything
hsirkar Feb 18, 2023
398725a
Use abstract "apply" function
hsirkar Feb 18, 2023
a3978fc
Add comments/docstrings
hsirkar Feb 19, 2023
f7141b5
Rename classes to Filter, Sort
hsirkar Feb 19, 2023
1c2d183
Add select and exclude queries
hsirkar Feb 19, 2023
99c482d
Rename "apply" and "query"
hsirkar Feb 19, 2023
b697601
Merge remote-tracking branch 'origin/develop' into query
hsirkar Mar 3, 2023
3079224
Initial query code
hsirkar Feb 17, 2023
266ad31
WIP
hsirkar Feb 18, 2023
4426f3f
Add orderby support
hsirkar Feb 18, 2023
25beb9f
Rewrite everything
hsirkar Feb 18, 2023
d886dd2
Use abstract "apply" function
hsirkar Feb 18, 2023
0be2a36
Add comments/docstrings
hsirkar Feb 19, 2023
0fb8bb6
Rename classes to Filter, Sort
hsirkar Feb 19, 2023
7c402b1
Add select and exclude queries
hsirkar Feb 19, 2023
4e04e9c
Rename "apply" and "query"
hsirkar Feb 19, 2023
444c174
Merge branch 'query' of https://github.com/hpcgroup/pipit into query
hsirkar Mar 3, 2023
860079c
merge
hsirkar Mar 3, 2023
554fd92
Merge remote-tracking branch 'origin/develop' into query
hsirkar Mar 6, 2023
721f6d1
filter both events and cct
hsirkar Mar 7, 2023
2cbc8b9
rewrite api
hsirkar Mar 7, 2023
37d9162
fix flake8
hsirkar Mar 7, 2023
bee682a
Delete query.ipynb
hsirkar Mar 7, 2023
64ac87e
Delete query.py
hsirkar Mar 7, 2023
fa79ade
remove changes to cct
hsirkar Mar 11, 2023
34a7655
undo changes
hsirkar Mar 11, 2023
54a1c64
add docs
hsirkar Mar 12, 2023
1aa762f
Add tests
hsirkar Mar 12, 2023
77198aa
Implement _validate and keep_invalid
hsirkar Mar 12, 2023
6c78c95
add docs
hsirkar Mar 13, 2023
6cc91fa
docs
hsirkar Mar 13, 2023
4dab73f
Merge remote-tracking branch 'origin/develop' into query
hsirkar Mar 13, 2023
838d5dc
fix bug
hsirkar Mar 13, 2023
3162bbd
add tests
hsirkar Mar 13, 2023
6b2d42d
remove support for func argument
hsirkar Mar 17, 2023
0bf7b45
add support for value parsing
hsirkar Mar 19, 2023
39a46c9
wip
hsirkar Mar 19, 2023
659e60d
fix
hsirkar Mar 19, 2023
13de223
add parse_time to filter tests
hsirkar Mar 20, 2023
27b751b
indexing WIP
hsirkar Mar 20, 2023
aae8a31
minor
hsirkar Mar 20, 2023
758c128
remove indexing api
hsirkar Mar 23, 2023
ff00a5c
fix flake8
hsirkar Mar 23, 2023
c603062
keep_invalid --> validate
hsirkar Mar 24, 2023
4dca622
use eval instead of query
hsirkar Mar 26, 2023
e674d77
doc
hsirkar Mar 26, 2023
95c36c7
doc + comments
hsirkar Mar 26, 2023
4b712eb
use eval
hsirkar Mar 26, 2023
3955cc3
Unit tests WIP
hsirkar Mar 26, 2023
d45107b
Add unit tests
hsirkar Mar 27, 2023
d86ffb7
add expr to test
hsirkar Mar 27, 2023
7180933
doc
hsirkar Mar 27, 2023
e43eb3c
Make Trace.eval internal
hsirkar Mar 27, 2023
43d209c
Merge remote-tracking branch 'origin/develop' into query
hsirkar Mar 27, 2023
cfe9147
add validation code
hsirkar Mar 27, 2023
477877e
Get rid of confusing code for eval
hsirkar Mar 27, 2023
ac92ebb
Fix between bug
hsirkar Mar 28, 2023
dc4bc10
Update tests to new OTF2 trace
hsirkar Mar 28, 2023
5727b26
Merge remote-tracking branch 'origin/develop' into query
hsirkar Mar 30, 2023
f431029
Change validate implementation (much faster)
hsirkar Mar 30, 2023
0d0b3d5
minor bug
hsirkar Mar 30, 2023
ccc1aff
Add trim function
hsirkar Mar 30, 2023
7e00dba
fix bugs + doc
hsirkar Mar 30, 2023
6eb2e48
Update API
hsirkar Mar 31, 2023
8c90b3d
Go back to _eval implementation, add trim
hsirkar Apr 3, 2023
8b03cb3
Update doc
hsirkar Apr 3, 2023
f75d3db
Merge remote-tracking branch 'origin/develop' into query
hsirkar Apr 3, 2023
a1b44c8
Get rid of stuff we don't need
hsirkar Apr 3, 2023
31000e7
minor
hsirkar Apr 3, 2023
2490956
Rename selection >> filter
hsirkar Apr 3, 2023
0f9d03b
delete inc/exc time calculations
hsirkar Apr 5, 2023
6182c4c
minor
hsirkar Apr 5, 2023
cb7aa3c
minor
hsirkar Apr 5, 2023
f6240b4
Add docs, comments
hsirkar Apr 22, 2023
22e2e96
Merge remote-tracking branch 'origin/develop' into query
hsirkar Apr 22, 2023
92340b2
clean up
hsirkar Apr 22, 2023
9c6cd94
Rewrite Filter._eval logic
hsirkar Apr 26, 2023
8c95f7a
update functions to work with filtered traces
hsirkar May 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 216 additions & 0 deletions pipit/filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
from .util import parse_time


class Filter:
    """
    A filter that can be used to select a subset of events from a Trace instance
    based on a condition on a field, like `Name == "MPI_Init"` or `Process > 5`.

    Filter instances can be modified with the AND, OR, and NOT logical operators
    (`&`, `|` and `~`).
    """

    # Element-wise comparisons available for every field except "Timestamp (ns)",
    # keyed by the operator string accepted by the constructor.
    _COMPARISONS = {
        "==": lambda column, value: column == value,
        "!=": lambda column, value: column != value,
        "<": lambda column, value: column < value,
        "<=": lambda column, value: column <= value,
        ">": lambda column, value: column > value,
        ">=": lambda column, value: column >= value,
        "in": lambda column, value: column.isin(value),
        "not-in": lambda column, value: ~column.isin(value),
        "between": lambda column, value: (column >= value[0])
        & (column <= value[1]),
    }

    # Operators that describe a time range; these are the only ones accepted
    # when filtering on "Timestamp (ns)".
    _TIMESTAMP_OPERATORS = ("<", "<=", ">", ">=", "between")

    def __init__(
        self,
        field=None,
        operator=None,
        value=None,
        expr=None,
    ):
        """
        Args:
            field (str, optional): DataFrame column to filter on.

            operator (str, optional): The comparison operator to use for filtering.
                Available operators are `<`, `<=`, `==`, `>=`, `>`, `!=`, `in`,
                `not-in`, and `between`. When `field` is "Timestamp (ns)", only
                `<`, `<=`, `>`, `>=` and `between` are accepted.

            value (optional): The value to compare against when filtering. If
                operator is `in` or `not-in`, this must be a list of values. If
                operator is `between`, this must be a list of 2 elements,
                containing the start and end values (both inclusive).

            expr (str, optional): Pandas expression that can be provided as an
                alternative to the field, operator, and value parameters. When
                evaluated with `pandas.DataFrame.eval`, it should return a boolean
                mask indicating whether each event should be included in the
                filtered Trace.
                See https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.eval.html.  # noqa: E501
        """
        self.field = field
        self.operator = operator
        self.value = value
        self.expr = expr

    def __and__(self, other):
        """Returns a new Filter instance that combines this filter and another filter
        with the logical AND operator.
        """
        return And(self, other)

    def __or__(self, other):
        """Returns a new Filter instance that combines this filter and another filter
        with the logical OR operator.
        """
        return Or(self, other)

    def __invert__(self):
        """Returns a new Filter instance that negates this filter with the logical NOT
        operator.
        """
        return Not(self)

    def __repr__(self):
        """Returns a string representation of this filter."""
        if self.expr is not None:
            return f"Filter {self.expr!r}"

        return f"Filter {self.field!r} {self.operator} {self.value!r}"

    def _eval(self, trace):
        """Evaluates this filter on a Trace.

        Args:
            trace: Object exposing an `events` DataFrame and, for filters on
                "Timestamp (ns)", a `_match_events()` method.

        Returns:
            pd.Series: Boolean mask that indicates whether each event should be
            included in the filtered Trace.

        Raises:
            ValueError: If the operator is not supported for the field.
        """
        # If an expression is provided, evaluate it using pd.DataFrame.eval.
        # NOTE(review): the expression string is handed to pandas as-is, so it
        # should not come from untrusted input.
        if self.expr is not None:
            return trace.events.eval(self.expr)

        # Otherwise, evaluate the filter using specified field, operator, and value
        field, operator, value = self.field, self.operator, self.value

        # Let parse_time normalize the value when filtering on a time field
        # (e.g. "500 ns" style strings, see the tests for this module).
        if field and "time" in field.lower():
            value = parse_time(value)

        invalid_operator = (
            f'Invalid comparison operator "{operator}" for field "{field}"'
        )

        # If field is not Timestamp, then evaluation is a plain column comparison
        if field != "Timestamp (ns)":
            compare = (
                self._COMPARISONS.get(operator)
                if isinstance(operator, str)
                else None
            )
            if compare is None:
                raise ValueError(invalid_operator)
            return compare(trace.events[field], value)

        # Reject unsupported operators before doing any work on the trace.
        if operator not in self._TIMESTAMP_OPERATORS:
            raise ValueError(invalid_operator)

        # We need to ensure that if any part of a function's duration is in the
        # time range, then both Enter and Leave events are included in the mask.
        # Presumably this call populates the "_matching_timestamp" column read
        # below -- confirm against Trace._match_events.
        trace._match_events()

        # Translate the operator into a [start, end] time range
        start, end = float("-inf"), float("inf")
        if operator in ("<", "<="):
            end = value
        elif operator in (">", ">="):
            start = value
        else:  # "between"
            start, end = value

        # "<" and ">" exclude the bound itself; every other operator includes it.
        def not_before_start(column):
            return column > start if operator == ">" else column >= start

        def not_after_end(column):
            return column < end if operator == "<" else column <= end

        event_type = trace.events["Event Type"]
        timestamp = trace.events["Timestamp (ns)"]
        matching = trace.events["_matching_timestamp"]

        # Handle each event type separately. For Enter/Leave events, the pair is
        # kept when the interval spanned by the two timestamps overlaps the range.
        instant = (
            (event_type == "Instant")
            & not_before_start(timestamp)
            & not_after_end(timestamp)
        )
        enter = (
            (event_type == "Enter")
            & not_before_start(matching)
            & not_after_end(timestamp)
        )
        leave = (
            (event_type == "Leave")
            & not_before_start(timestamp)
            & not_after_end(matching)
        )

        return instant | enter | leave


class And(Filter):
    """Logical AND of several Filter objects: an event is selected only when
    every one of the combined filters selects it."""

    def __init__(self, *args):
        """Store the filters to combine; `args` are the Filter operands."""
        super().__init__()
        self.filters = args

    def _eval(self, trace):
        """Return the element-wise AND of the masks of all combined filters."""
        # Start from the mask of the first operand ...
        mask = self.filters[0]._eval(trace)

        # ... and narrow it down with each remaining operand in turn.
        for operand in self.filters[1:]:
            mask = mask & operand._eval(trace)

        return mask

    def __repr__(self):
        """Return the operands' representations, parenthesized and joined by And."""
        return " And ".join("(" + repr(operand) + ")" for operand in self.filters)


class Or(Filter):
    """Logical OR of several Filter objects: an event is selected as soon as
    any one of the combined filters selects it."""

    def __init__(self, *args):
        """Store the filters to combine; `args` are the Filter operands."""
        super().__init__()
        self.filters = args

    def _eval(self, trace):
        """Return the element-wise OR of the masks of all combined filters."""
        # Start from the mask of the first operand ...
        mask = self.filters[0]._eval(trace)

        # ... and widen it with each remaining operand in turn.
        for operand in self.filters[1:]:
            mask = mask | operand._eval(trace)

        return mask

    def __repr__(self):
        """Return the operands' representations, parenthesized and joined by Or."""
        return " Or ".join("(" + repr(operand) + ")" for operand in self.filters)


class Not(Filter):
    """Logical NOT of a Filter object: an event is selected exactly when the
    wrapped filter does not select it."""

    def __init__(self, filter):
        """Store the filter whose result is to be negated."""
        super().__init__()
        self.filter = filter

    def _eval(self, trace):
        """Return the element-wise negation of the wrapped filter's mask."""
        inner_mask = self.filter._eval(trace)
        return ~inner_mask

    def __repr__(self):
        """Return the wrapped filter's representation prefixed with Not."""
        return "Not (" + repr(self.filter) + ")"
102 changes: 102 additions & 0 deletions pipit/tests/filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from pipit import Trace


def all_equal(*dfs):
    """Return True when every argument compares equal to the first one.

    Equality is decided by the first argument's ``equals`` method, so the
    arguments are expected to be pandas objects (DataFrame, Series, Index).
    """
    reference = dfs[0]
    return all(reference.equals(candidate) for candidate in dfs)


def test_eval(data_dir, ping_pong_otf2_trace):
    """Check Trace._eval against equivalent expressions and hand-written masks."""
    trace = Trace.from_otf2(str(ping_pong_otf2_trace))

    # Test each operator: the (field, operator, value) form, the expr form and
    # a mask built directly on the events DataFrame must all agree.

    # ==
    from_args = trace._eval("Process", "==", 0)
    from_expr = trace._eval(expr="`Process` == 0")
    by_hand = trace.events["Process"] == 0
    assert all_equal(from_args, from_expr, by_hand)

    # !=
    from_args = trace._eval("Name", "!=", "MPI_Init")
    from_expr = trace._eval(expr="`Name` != 'MPI_Init'")
    by_hand = trace.events["Name"] != "MPI_Init"
    assert all_equal(from_args, from_expr, by_hand)

    # <  (time given both as a human-readable string and as a number)
    from_text = trace._eval("Timestamp (ns)", "<", "500 ns")
    from_number = trace._eval("Timestamp (ns)", "<", 500)
    from_expr = trace._eval(expr="`Timestamp (ns)` < 500")
    by_hand = trace.events["Timestamp (ns)"] < 500
    assert all_equal(from_text, from_number, from_expr, by_hand)

    # >
    from_text = trace._eval("Timestamp (ns)", ">", "199.6 ms")
    from_number = trace._eval("Timestamp (ns)", ">", 1.996e8)
    from_expr = trace._eval(expr="`Timestamp (ns)` > 1.996e8")
    by_hand = trace.events["Timestamp (ns)"] > 1.996e8
    assert all_equal(from_text, from_number, from_expr, by_hand)

    # in
    from_args = trace._eval("Name", "in", ["MPI_Send", "MPI_Recv"])
    from_expr = trace._eval(expr="`Name`.isin(['MPI_Send', 'MPI_Recv'])")
    by_hand = trace.events["Name"].isin(["MPI_Send", "MPI_Recv"])
    assert all_equal(from_args, from_expr, by_hand)

    # not-in
    from_args = trace._eval("Name", "not-in", ["MPI_Send", "MPI_Recv"])
    from_expr = trace._eval(expr="~(`Name`.isin(['MPI_Send', 'MPI_Recv']))")
    by_hand = ~trace.events["Name"].isin(["MPI_Send", "MPI_Recv"])
    assert all_equal(from_args, from_expr, by_hand)

    # between
    from_args = trace._eval("Timestamp (ns)", "between", ["50 ns", "199.6 ms"])
    by_hand = (trace.events["Timestamp (ns)"] > 50) & (
        trace.events["Timestamp (ns)"] < 1.996e8
    )
    assert all_equal(from_args, by_hand)

    # Test logical operators NOT, AND, and OR
    from pipit.filter import Filter

    in_window = Filter("Timestamp (ns)", "between", ["130.52 ms", "136.57 ms"])
    send_or_recv = Filter("Name", "in", ["MPI_Send", "MPI_Recv"])
    on_process_zero = Filter("Process", "==", 0)

    negated = trace._eval(~on_process_zero)
    assert all_equal(negated, ~trace._eval(on_process_zero))

    conjunction = trace._eval(in_window & send_or_recv & on_process_zero)
    assert all_equal(
        conjunction,
        trace._eval(in_window)
        & trace._eval(send_or_recv)
        & trace._eval(on_process_zero),
    )

    disjunction = trace._eval(in_window | send_or_recv | on_process_zero)
    assert all_equal(
        disjunction,
        trace._eval(in_window)
        | trace._eval(send_or_recv)
        | trace._eval(on_process_zero),
    )


def test_filter(data_dir, ping_pong_otf2_trace):
    """Check that Trace.filter keeps definitions and selects the right events."""
    trace = Trace.from_otf2(str(ping_pong_otf2_trace))

    # Filtering events must leave the definitions untouched
    filtered_definitions = trace.filter("Process", "==", 0).definitions
    assert all_equal(filtered_definitions, trace.definitions)

    # The filtered events must match plain boolean indexing on the DataFrame
    filtered_events = trace.filter("Process", "==", 0).events
    expected_events = trace.events[trace.events["Process"] == 0]
    assert all_equal(filtered_events, expected_events)


def test_slice(data_dir, ping_pong_otf2_trace):
    """Check Trace.slice with and without clipping against Trace.filter."""
    trace = Trace.from_otf2(str(ping_pong_otf2_trace))

    # Slicing without bounds returns every event
    unbounded = trace.slice().events
    assert all_equal(unbounded, trace.events)

    first_ns = trace.slice(0, 1).events
    assert all_equal(first_ns, trace.events[trace.events["Timestamp (ns)"] == 0])

    filtered = trace.filter("Timestamp (ns)", "between", [1e5, 1e6]).events
    sliced_not_clipped = trace.slice(1e5, 1e6, clip_values=False).events
    sliced = trace.slice(1e5, 1e6).events

    # Without clipping, slice selects the same rows and values as the filter;
    # with clipping, the same rows are selected but the values differ.
    assert all_equal(filtered, sliced_not_clipped)
    assert all_equal(filtered.index, sliced.index)
    assert not all_equal(filtered, sliced)

    # After clipping, both timestamp columns lie inside the requested range
    lower, upper = 1e5, 1e6
    assert sliced["Timestamp (ns)"].min() >= lower
    assert sliced["_matching_timestamp"].min() >= lower
    assert sliced["Timestamp (ns)"].max() <= upper
    assert sliced["_matching_timestamp"].max() <= upper
30 changes: 30 additions & 0 deletions pipit/tests/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from pipit.util import parse_time
import numpy as np


def test_parse_time():
    """Check parse_time on human-readable durations spanning 0.01 ns to ~142 days."""
    # 1.23456789 scaled by successive powers of ten, expressed in nanoseconds
    exponents = np.arange(-2, 17)
    expected_ns = [1.23456789 * 10.0**exponent for exponent in exponents]

    # The same durations written in human-readable form, one per exponent
    human_readable = [
        "0.01 ns",
        "0.12 ns",
        "1.23 ns",
        "12.35 ns",
        "123.46 ns",
        "1.23 us",
        "12.35 us",
        "123.46 us",
        "1.23 ms",
        "12.35 ms",
        "123.46 ms",
        "1.23 s",
        "12.35 s",
        "123.46 s",
        "20m 34s",
        "3hr 25m",
        "34hr 17m",
        "14d 6hr",
        "142d 21hr",
    ]

    # Only one significant digit is compared because the strings are rounded
    for text, nanoseconds in zip(human_readable, expected_ns):
        np.testing.assert_approx_equal(parse_time(text), nanoseconds, significant=1)
Loading