Skip to content

Commit

Permalink
Merge pull request #1 from pecigonzalo/fix/stream-detection
Browse files Browse the repository at this point in the history
Use LastIngestionTime as LastEventTimestamp has eventual consistency
  • Loading branch information
pecigonzalo authored Jan 14, 2019
2 parents 6d04f79 + 98e9d68 commit 0640385
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 11 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.1.0
0.1.1
4 changes: 2 additions & 2 deletions cmd/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func streams(cmd *cobra.Command, args []string) error {
}

sort.Slice(streams, func(i, j int) bool {
return *streams[i].LastEventTimestamp > *streams[j].LastEventTimestamp
return *streams[i].LastIngestionTime > *streams[j].LastIngestionTime
})

w := tabwriter.NewWriter(os.Stdout, 0, 8, 2, '\t', 0)
Expand All @@ -59,7 +59,7 @@ func streams(cmd *cobra.Command, args []string) error {
for _, stream := range streams {
fmt.Fprintf(w, "%s\t%s\t%s\n",
*stream.LogStreamName,
lib.ParseAWSTimestamp(stream.LastEventTimestamp).Local().Format(lib.ShortTimeFormat),
lib.ParseAWSTimestamp(stream.LastIngestionTime).Local().Format(lib.ShortTimeFormat),
lib.ParseAWSTimestamp(stream.CreationTime).Local().Format(lib.ShortTimeFormat),
)
}
Expand Down
14 changes: 6 additions & 8 deletions lib/cwreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/hashicorp/golang-lru"
lru "github.com/hashicorp/golang-lru"
)

const (
Expand Down Expand Up @@ -150,9 +150,9 @@ func (c *CloudwatchLogsReader) getLogStreams() ([]*cloudwatchlogs.LogStream, err
if len(streams) >= MaxStreams {
return false
}
if s.LastEventTimestamp == nil {
if s.LastIngestionTime == nil {
// treat nil timestamps as 0
s.LastEventTimestamp = aws.Int64(0)
s.LastIngestionTime = aws.Int64(0)
}

// if we are sorting by time, we can do some shortcuts to end
Expand All @@ -162,7 +162,7 @@ func (c *CloudwatchLogsReader) getLogStreams() ([]*cloudwatchlogs.LogStream, err
if s.CreationTime != nil && *s.CreationTime > endTimestamp {
continue
}
if *s.LastEventTimestamp < startTimestamp {
if *s.LastIngestionTime < startTimestamp {
pastWindow = true
break
}
Expand All @@ -172,7 +172,7 @@ func (c *CloudwatchLogsReader) getLogStreams() ([]*cloudwatchlogs.LogStream, err
// otherwise we have to check all pages, but there are fewer because
// we are prefix matching
if s.CreationTime != nil && *s.CreationTime < endTimestamp &&
*s.LastEventTimestamp > startTimestamp {
*s.LastIngestionTime > startTimestamp {
streams = append(streams, s)
}
}
Expand All @@ -187,7 +187,7 @@ func (c *CloudwatchLogsReader) getLogStreams() ([]*cloudwatchlogs.LogStream, err
}); err != nil {
return nil, err
}
sort.Slice(streams[:], func(i, j int) bool { return *streams[i].LastEventTimestamp > *streams[j].LastEventTimestamp })
sort.Slice(streams[:], func(i, j int) bool { return *streams[i].LastIngestionTime > *streams[j].LastIngestionTime })
if len(streams) == 0 {
if c.streamPrefix != "" {
return nil, fmt.Errorf("No log streams found matching task prefix '%s' in your time window. Consider adjusting your time window with --since and/or --until", c.streamPrefix)
Expand Down Expand Up @@ -270,11 +270,9 @@ func (c *CloudwatchLogsReader) Error() error {
}

func streamsToNames(streams []*cloudwatchlogs.LogStream) []*string {
fmt.Println(streams)
names := make([]*string, 0, len(streams))
for _, s := range streams {
names = append(names, s.LogStreamName)
}
fmt.Println(names)
return names
}

0 comments on commit 0640385

Please sign in to comment.