Skip to content

Commit 400c8b8

Browse files
authored
Store NextToken and increase LRU size (#97)
1 parent 0dcb04f commit 400c8b8

File tree

1 file changed

+4
-3
lines changed

1 file changed

+4
-3
lines changed

lib/cwreader.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ func NewCloudwatchLogsReader(group string, streamPrefix string, start time.Time,
5454

5555
svc := cloudwatchlogs.NewFromConfig(cfg)
5656

57-
cache, err := lru.New[string, any](MaxEventsPerCall)
57+
// Twice the size of the MaxEventsPerCall to be on the safe side
58+
cache, err := lru.New[string, any](MaxEventsPerCall * 2)
5859
if err != nil {
5960
return nil, err
6061
}
@@ -219,7 +220,6 @@ func (c *CloudwatchLogsReader) StreamEvents(ctx context.Context, follow bool) <-
219220
}
220221

221222
func (c *CloudwatchLogsReader) pumpEvents(ctx context.Context, eventChan chan<- Event, follow bool) {
222-
223223
startTime := c.start.Unix() * 1e3
224224
params := &cloudwatchlogs.FilterLogEventsInput{
225225
Interleaved: aws.Bool(true),
@@ -254,8 +254,9 @@ func (c *CloudwatchLogsReader) pumpEvents(ctx context.Context, eventChan chan<-
254254
close(eventChan)
255255
return
256256
} else {
257+
params.NextToken = page.NextToken
257258
for _, event := range page.Events {
258-
if _, ok := c.eventCache.Peek(*event.EventId); !ok {
259+
if !c.eventCache.Contains(*event.EventId) {
259260
eventChan <- NewEvent(event, c.logGroupName)
260261
c.eventCache.Add(*event.EventId, nil)
261262
}

0 commit comments

Comments
 (0)