Skip to content

Commit

Permalink
Merge pull request #114 from sdementen/add_sample_interval
Browse files Browse the repository at this point in the history
Add updated sample interval
  • Loading branch information
stringertheory authored Feb 5, 2024
2 parents a9b2768 + c3ffaaa commit 378a43c
Show file tree
Hide file tree
Showing 2 changed files with 238 additions and 1 deletion.
111 changes: 110 additions & 1 deletion tests/test_traces.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import pickle
from datetime import datetime

from pandas.util.testing import assert_series_equal

from traces import TimeSeries
import pytest

from traces import TimeSeries
Expand Down Expand Up @@ -76,7 +79,6 @@ def test_merge():
ts_c = TimeSeries.merge([])
assert list(ts_c.items()) == []


def test_set_interval():
ts = TimeSeries()

Expand Down Expand Up @@ -140,6 +142,7 @@ def test_set_interval_datetime():
]



def test_remove_points_from_interval():
ts = TimeSeries(default=0)
ts[0] = 0
Expand Down Expand Up @@ -167,6 +170,111 @@ def test_remove_points_from_interval():
assert ts[5] == 0


def test_sample_interval_days():
    """Check daily mean sampling when every value change falls on midnight.

    Three overlapping interval assignments are applied; the last one
    (value 0) overrides parts of the two earlier ones, so each sampled
    day holds exactly one constant value.
    """
    # Local imports: the module header only imports ``datetime``.
    from datetime import timedelta

    import pandas as pd

    # ``sample_interval`` is a TimeSeries method; Domain is neither
    # imported in this module nor provides it.
    ts = TimeSeries([(datetime(2012, 1, 1), 400),
                     (datetime(2012, 3, 1), 400)])

    ts[datetime(2012, 1, 4):datetime(2012, 1, 20)] = 10
    ts[datetime(2012, 1, 25):datetime(2012, 2, 7)] = 50
    # Assigned last, so it wins over both assignments above on Jan 19-26.
    ts[datetime(2012, 1, 19):datetime(2012, 1, 27)] = 0

    sr = ts.sample_interval(sampling_period=timedelta(days=1),
                            end=datetime(2012, 2, 1))

    # One expected value per day of January 2012, in calendar order.
    daily_values = (
        [400.0] * 3      # Jan 1-3
        + [10.0] * 15    # Jan 4-18
        + [0.0] * 8      # Jan 19-26
        + [50.0] * 5     # Jan 27-31
    )
    expected = [(pd.Timestamp(datetime(2012, 1, day)), value)
                for day, value in enumerate(daily_values, start=1)]

    # Series.items() replaces Series.iteritems(), removed in pandas 2.0.
    assert list(sr.items()) == expected


def test_sample_interval_hours():
    """Check daily mean/max/min sampling when values change mid-day.

    The value changes at 12:00 on Jan 4, 20:00 on Jan 6 and 09:00 on
    Jan 7, so those three days mix two values and exercise the
    time-weighted aggregation.
    """
    # Local imports: the module header only imports ``datetime``.
    from datetime import timedelta

    import pandas as pd

    # ``sample_interval`` is a TimeSeries method; Domain is neither
    # imported in this module nor provides it.
    ts = TimeSeries([(datetime(2012, 1, 1), 400),
                     (datetime(2012, 1, 10), 400)])

    ts[datetime(2012, 1, 4, 12):datetime(2012, 1, 6, 20)] = 10
    ts[datetime(2012, 1, 7, 9):datetime(2012, 1, 10)] = 50

    days = [pd.Timestamp(datetime(2012, 1, day)) for day in range(1, 10)]

    def expected(values):
        # Pair the nine sampled days (Jan 1-9) with their expected values.
        assert len(values) == len(days)
        return list(zip(days, values))

    # Series.items() replaces Series.iteritems(), removed in pandas 2.0.
    sr = ts.sample_interval(sampling_period=timedelta(days=1))
    assert list(sr.items()) == expected([
        400.0, 400.0, 400.0,
        205.0,     # Jan 4: 12h at 400 + 12h at 10
        10.0,
        75.0,      # Jan 6: 20h at 10 + 4h at 400
        181.25,    # Jan 7: 9h at 400 + 15h at 50
        50.0, 50.0,
    ])

    sr = ts.sample_interval(sampling_period=timedelta(days=1),
                            operation="max")
    assert list(sr.items()) == expected([
        400.0, 400.0, 400.0,
        400.0,
        10.0,
        400.0,
        400.0,
        50.0, 50.0,
    ])

    sr = ts.sample_interval(sampling_period=timedelta(days=1),
                            operation="min")
    assert list(sr.items()) == expected([
        400.0, 400.0, 400.0,
        10.0,
        10.0,
        10.0,
        50.0,
        50.0, 50.0,
    ])


def test_sample_interval_index():
    """Check that sampling on an explicit index matches period sampling.

    Calling ``sample_interval`` with a daily ``DatetimeIndex`` spanning
    the series must give the same result as calling it with a one-day
    ``sampling_period`` over the same span.
    """
    # Local imports: the module header only imports ``datetime``, and
    # ``pandas.util.testing`` no longer exists in pandas 2.0; the public
    # home of assert_series_equal is ``pandas.testing``.
    from datetime import timedelta

    import pandas as pd
    from pandas.testing import assert_series_equal

    start = datetime(2012, 1, 1)
    end = datetime(2012, 1, 10)

    # ``sample_interval`` is a TimeSeries method; Domain is neither
    # imported in this module nor provides it.
    ts = TimeSeries([(start, 400),
                     (end, 400)])

    ts[datetime(2012, 1, 4, 12):datetime(2012, 1, 6, 20)] = 10
    ts[datetime(2012, 1, 7, 9):datetime(2012, 1, 10)] = 50

    idx = pd.date_range(start, end, freq="D")
    sr = ts.sample_interval(sampling_period=timedelta(days=1))
    sr2 = ts.sample_interval(idx=idx)

    assert_series_equal(sr, sr2)


def test_pickle():
ts = TimeSeries(default=False)
ts[1] = True
Expand Down Expand Up @@ -256,3 +364,4 @@ def test_convenience_access_methods():
assert ts.last_item() == (8, 4)
assert ts.get_item_by_index(0) == (1, 2)
assert ts.get_item_by_index(-1) == (8, 4)

128 changes: 128 additions & 0 deletions traces/timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import sortedcontainers
from infinity import inf


from . import histogram, operations, plot, utils

NotGiven = object()
Expand Down Expand Up @@ -343,6 +344,7 @@ def iterperiods(self, start=None, end=None, value=None):
value_function = self._value_function(value)

# get start index and value

start_index = self._d.bisect_right(start)
if start_index:
_, start_value = self._d.peekitem(start_index - 1)
Expand Down Expand Up @@ -444,6 +446,132 @@ def sample(
current_time += sampling_period
return result

def sample_interval(self, sampling_period=None,
                    start=None, end=None,
                    idx=None,
                    operation="mean"):
    """Aggregate the series over consecutive intervals.

    The method can be called either with ``sampling_period`` (and
    optionally ``start`` / ``end``) or with ``idx``, a pandas
    ``DatetimeIndex`` holding the interval boundaries. Interval ``i``
    spans ``[idx[i], idx[i + 1])``; the last entry of the index only
    closes the final interval. The returned pandas Series is therefore
    indexed on ``pandas.date_range(start, end, freq=sampling_period)``
    or on ``idx``, in both cases without the last entry.

    :param sampling_period: the sampling period (ignored when ``idx``
        is given)
    :param start: the start time of the sampling (ignored when ``idx``
        is given)
    :param end: the end time of the sampling (ignored when ``idx``
        is given)
    :param idx: a DatetimeIndex with the start times of the intervals,
        followed by the end time of the last interval
    :param operation: "mean" (time-weighted average), "max" or "min"
    :return: a pandas Series with the series aggregated per interval
    :raises ImportError: if pandas is not installed
    :raises KeyError: if ``operation`` is not a supported name
    :raises ValueError: if fewer than two index points are available,
        so that no interval can be formed
    """

    try:
        import pandas as pd
    except ImportError as err:
        msg = "sample_interval need pandas to be installed"
        raise ImportError(msg) from err

    if idx is None:
        start, end, _ = self._check_boundaries(start, end)
        sampling_period = self._check_regularization(start, end,
                                                     sampling_period)
        # Index over [start, end], both ends included. This is the
        # default of date_range; the former ``closed=None`` argument
        # was removed in pandas 2.0.
        idx = pd.date_range(start, end, freq=sampling_period)
    else:
        start, end, _ = self._check_boundaries(idx[0], idx[-1])

    if len(idx) < 2:
        msg = ("sample_interval needs at least two index points "
               "to form an interval")
        raise ValueError(msg)

    # numpy array of the boundaries, for fast positional access
    idx_list = idx.values

    def items_in_horizon():
        # yields all items strictly between start and end, framed by
        # the values of the series at start and at end
        yield (start, self[start])
        for t, v in self.items():
            if t <= start:
                continue
            if t >= end:
                break
            yield t, v
        yield (end, self[end])

    inflexion_times, inflexion_values = zip(*items_in_horizon())
    inflexion_times = pd.DatetimeIndex(inflexion_times)

    # For every inflexion point, the position i of the interval
    # [idx[i], idx[i + 1]) that contains it, i.e. the last boundary
    # that is <= the point. get_indexer does this lookup for all
    # points at once; the per-point ``get_loc(t, method="ffill")``
    # it replaces is no longer available in pandas 2.0.
    inflexion_intervals = idx.get_indexer(
        inflexion_times, method="ffill").tolist()

    last_position = len(idx_list) - 1

    def as_nanoseconds(values):
        # Integer nanoseconds make the interval arithmetic cheap (no
        # total_seconds). Python ints are used rather than numpy int64
        # so that ``duration * value`` cannot silently overflow for
        # long intervals combined with large integer values.
        return values.astype("datetime64[ns]").astype("int64").tolist()

    inflexion_times = as_nanoseconds(inflexion_times.values)
    idx_times = as_nanoseconds(idx_list)

    # Marker for "no value seen yet in this interval" (max / min).
    nothing_yet = object()

    def keep_extreme(choose):
        # Build the update step for "max" / "min". Segments of zero
        # duration are ignored: when the value changes exactly on an
        # interval boundary, the previous value does not belong to the
        # new interval and must not influence its extreme.
        def update(agg, t0, t1, v):
            if t1 <= t0:
                return agg
            return v if agg is nothing_yet else choose(agg, v)
        return update

    # init, update and finish functions of the aggregation operator:
    #   init(t, v)                 -> accumulator for a new interval
    #   update(agg, t0, t1, v)     -> accumulator after value v held
    #                                 on [t0, t1)
    #   finish(agg, t_start, t_end) -> final value of the interval
    init, update, finish = {
        "mean": (
            lambda t, v: 0.0,
            lambda agg, t0, t1, v: agg + (t1 - t0) * v,
            lambda agg, t_start, t_end: agg / (t_end - t_start),
        ),
        "max": (
            lambda t, v: nothing_yet,
            keep_extreme(max),
            lambda agg, t_start, t_end: agg,
        ),
        "min": (
            lambda t, v: nothing_yet,
            keep_extreme(min),
            lambda agg, t_start, t_end: agg,
        ),
    }[operation]

    # initialise first interval
    t_start, t_end = idx_times[0:2]
    i0, t0, v0 = 0, t_start, self[start]
    agg = init(t0, v0)

    result = []
    for i1, t1, v1 in zip(inflexion_intervals,
                          inflexion_times,
                          inflexion_values):
        if i0 != i1:
            # change of interval

            # finish previous interval: v0 holds until its end
            agg = update(agg, t0, t_end, v0)
            agg = finish(agg, t_start, t_end)
            result.append((idx_list[i0], agg))

            # Intervals skipped between i0 and i1 are constant at v0.
            # Only the first one is recorded; the forward fill at the
            # end propagates it to the others.
            if i1 != i0 + 1:
                result.append((idx_list[i0 + 1], v0))

            # the last boundary only closes the final interval
            if i1 == last_position:
                break

            # set up new interval
            t_start, t_end = idx_times[i1:i1 + 2]
            i0, t0 = i1, t_start
            agg = init(t0, v0)

        # v0 held from t0 up to this inflexion point
        agg = update(agg, t0, t1, v0)

        i0, t0, v0 = i1, t1, v1

    df = pd.DataFrame.from_records(result)
    # Align on the interval start times and fill the skipped
    # (constant) intervals from the preceding recorded value.
    return df.set_index(0).iloc[:, 0].reindex(idx[:-1]).ffill()

def moving_average( # noqa: C901
self,
sampling_period,
Expand Down

0 comments on commit 378a43c

Please sign in to comment.