Skip to content

Commit fb47d78

Browse files
author
Doug Schmidt
authored
Merge pull request #77 from jessemcdowell-AI/BatchLargeAppends
Issue-73 Batch large appends
2 parents 1612962 + 6bca2f4 commit fb47d78

File tree

3 files changed

+75
-38
lines changed

3 files changed

+75
-38
lines changed

TimeSeries/PublicApis/SdkExamples/PointZilla/Context.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public class Context
3434
public Instant StartTime { get; set; } = Instant.FromDateTimeUtc(DateTime.UtcNow);
3535
public TimeSpan PointInterval { get; set; } = TimeSpan.FromMinutes(1);
3636
public int NumberOfPoints { get; set; } // 0 means "derive the point count from number of periods"
37+
public int BatchSize { get; set; } = 500_000;
3738
public double NumberOfPeriods { get; set; } = 1;
3839
public WaveformType WaveformType { get; set; } = WaveformType.SineWave;
3940
public double WaveformOffset { get; set; } = 0;

TimeSeries/PublicApis/SdkExamples/PointZilla/PointsAppender.cs

Lines changed: 73 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -60,60 +60,95 @@ public void AppendPoints()
6060
? $"Appending {Points.Count} {pointExtents} within TimeRange={GetTimeRange()} to {timeSeries.Identifier} ({timeSeries.TimeSeriesType}) ..."
6161
: $"Appending {Points.Count} {pointExtents} to {timeSeries.Identifier} ({timeSeries.TimeSeriesType}) ...");
6262

63+
var numberOfPointsAppended = 0;
64+
var numberOfPointsDeleted = 0;
65+
var batchCount = 0;
6366
var stopwatch = Stopwatch.StartNew();
6467

65-
AppendResponse appendResponse;
68+
foreach (var batch in GetPointBatches())
69+
{
70+
var result = AppendPointBatch(client, timeSeries, batch.Item1, batch.Item2, isReflected, hasTimeRange);
71+
numberOfPointsAppended += result.NumberOfPointsAppended;
72+
numberOfPointsDeleted += result.NumberOfPointsDeleted;
73+
batchCount++;
74+
75+
if (result.AppendStatus != AppendStatusCode.Completed)
76+
throw new ExpectedException($"Unexpected append status={result.AppendStatus}");
77+
}
78+
79+
var batchText = batchCount > 1 ? $" using {batchCount} appends" : "";
80+
Log.Info($"Appended {numberOfPointsAppended} points (deleting {numberOfPointsDeleted} points) in {stopwatch.ElapsedMilliseconds / 1000.0:F1} seconds{batchText}.");
81+
}
82+
}
83+
84+
private TimeSeriesAppendStatus AppendPointBatch(IAquariusClient client, TimeSeries timeSeries, List<ReflectedTimeSeriesPoint> points, Interval timeRange, bool isReflected, bool hasTimeRange)
85+
{
86+
AppendResponse appendResponse;
87+
88+
if (isReflected)
89+
{
90+
appendResponse = client.Acquisition.Post(new PostReflectedTimeSeries
91+
{
92+
UniqueId = timeSeries.UniqueId,
93+
TimeRange = timeRange,
94+
Points = points
95+
});
96+
}
97+
else
98+
{
99+
var basicPoints = points
100+
.Select(p => new TimeSeriesPoint
101+
{
102+
Time = p.Time,
103+
Value = p.Value
104+
})
105+
.ToList();
66106

67-
if (isReflected)
107+
if (hasTimeRange)
68108
{
69-
appendResponse = client.Acquisition.Post(new PostReflectedTimeSeries
109+
appendResponse = client.Acquisition.Post(new PostTimeSeriesOverwriteAppend
70110
{
71111
UniqueId = timeSeries.UniqueId,
72-
TimeRange = GetTimeRange(),
73-
Points = Points
112+
TimeRange = timeRange,
113+
Points = basicPoints
74114
});
75115
}
76116
else
77117
{
78-
var basicPoints = Points
79-
.Select(p => new TimeSeriesPoint
80-
{
81-
Time = p.Time,
82-
Value = p.Value
83-
})
84-
.ToList();
85-
86-
if (hasTimeRange)
87-
{
88-
appendResponse = client.Acquisition.Post(new PostTimeSeriesOverwriteAppend
89-
{
90-
UniqueId = timeSeries.UniqueId,
91-
TimeRange = GetTimeRange(),
92-
Points = basicPoints
93-
});
94-
}
95-
else
118+
appendResponse = client.Acquisition.Post(new PostTimeSeriesAppend
96119
{
97-
appendResponse = client.Acquisition.Post(new PostTimeSeriesAppend
98-
{
99-
UniqueId = timeSeries.UniqueId,
100-
Points = basicPoints
101-
});
102-
}
120+
UniqueId = timeSeries.UniqueId,
121+
Points = basicPoints
122+
});
103123
}
124+
}
104125

105-
var result = client.Acquisition.RequestAndPollUntilComplete(
106-
acquisition => appendResponse,
107-
(acquisition, response) => acquisition.Get(new GetTimeSeriesAppendStatus { AppendRequestIdentifier = response.AppendRequestIdentifier }),
108-
polledStatus => polledStatus.AppendStatus != AppendStatusCode.Pending,
109-
null,
110-
Context.AppendTimeout);
126+
return client.Acquisition.RequestAndPollUntilComplete(
127+
acquisition => appendResponse,
128+
(acquisition, response) => acquisition.Get(new GetTimeSeriesAppendStatus { AppendRequestIdentifier = response.AppendRequestIdentifier }),
129+
polledStatus => polledStatus.AppendStatus != AppendStatusCode.Pending,
130+
null,
131+
Context.AppendTimeout);
132+
}
133+
134+
private IEnumerable<Tuple<List<ReflectedTimeSeriesPoint>, Interval>> GetPointBatches()
135+
{
136+
var points = GetPoints();
137+
var remainingTimeRange = GetTimeRange();
111138

112-
if (result.AppendStatus != AppendStatusCode.Completed)
113-
throw new ExpectedException($"Unexpected append status={result.AppendStatus}");
139+
var index = 0;
140+
while (points.Count - index > Context.BatchSize)
141+
{
142+
var batchPoints = points.Skip(index).Take(Context.BatchSize).ToList();
143+
var batchTimeRange = new Interval(remainingTimeRange.Start, batchPoints.Last().Time.GetValueOrDefault().PlusTicks(1));
144+
remainingTimeRange = new Interval(batchTimeRange.End, remainingTimeRange.End);
145+
146+
yield return new Tuple<List<ReflectedTimeSeriesPoint>, Interval>(batchPoints, batchTimeRange);
114147

115-
Log.Info($"Appended {result.NumberOfPointsAppended} points (deleting {result.NumberOfPointsDeleted} points) in {stopwatch.ElapsedMilliseconds / 1000.0:F1} seconds.");
148+
index += Context.BatchSize;
116149
}
150+
151+
yield return new Tuple<List<ReflectedTimeSeriesPoint>, Interval>(points.Skip(index).ToList(), remainingTimeRange);
117152
}
118153

119154
private List<ReflectedTimeSeriesPoint> GetPoints()

TimeSeries/PublicApis/SdkExamples/PointZilla/Program.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ private static Context ParseArgs(string[] args)
9797

9898
new Option {Key = nameof(context.Wait), Setter = value => context.Wait = bool.Parse(value), Getter = () => context.Wait.ToString(), Description = "Wait for the append request to complete"},
9999
new Option {Key = nameof(context.AppendTimeout), Setter = value => context.AppendTimeout = TimeSpan.Parse(value), Getter = () => context.AppendTimeout.ToString(), Description = "Timeout period for append completion, in .NET TimeSpan format."},
100+
new Option {Key = nameof(context.BatchSize), Setter = value => context.BatchSize = int.Parse(value), Getter = () => context.BatchSize.ToString(), Description = "Maximum number of points to send in a single append request"},
100101

101102
new Option(), new Option {Description = "Time-series options:"},
102103
new Option {Key = nameof(context.TimeSeries), Setter = value => context.TimeSeries = value, Getter = () => context.TimeSeries, Description = "Target time-series identifier or unique ID"},

0 commit comments

Comments
 (0)