Skip to content

Commit

Permalink
feat: dedupe duplicate delete requests while loading them for process…
Browse files Browse the repository at this point in the history
…ing in compactor (#15852)
  • Loading branch information
sandeepsukhani authored Jan 22, 2025
1 parent ebc84ca commit 81940c8
Show file tree
Hide file tree
Showing 4 changed files with 235 additions and 0 deletions.
28 changes: 28 additions & 0 deletions pkg/compactor/deletion/delete_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"time"

"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

Expand Down Expand Up @@ -160,6 +161,33 @@ func (d *DeleteRequest) IsDeleted(entry retention.ChunkEntry) (bool, filter.Func
return true, ff
}

// IsDuplicate reports whether o duplicates d. Two requests are duplicates when
// they carry different RequestIDs but share the same UserID, StartTime and
// EndTime, and their log selector expressions render to the same string.
//
// If either request has no log selector expression yet, SetQuery is called on
// it with its own Query (presumably this populates logSelectorExpr — confirm
// against SetQuery). An error from SetQuery is returned wrapped with the
// offending request's ID and user.
func (d *DeleteRequest) IsDuplicate(o *DeleteRequest) (bool, error) {
	// A request is never considered a duplicate of itself.
	if d.RequestID == o.RequestID {
		return false, nil
	}

	sameScope := d.UserID == o.UserID && d.StartTime == o.StartTime && d.EndTime == o.EndTime
	if !sameScope {
		return false, nil
	}

	// Make sure both sides have a log selector expression before comparing;
	// the receiver is handled first, then the other request.
	for _, req := range []*DeleteRequest{d, o} {
		if req.logSelectorExpr != nil {
			continue
		}
		if err := req.SetQuery(req.Query); err != nil {
			return false, errors.Wrapf(err, "failed to init log selector expr for request_id=%s, user_id=%s", req.RequestID, req.UserID)
		}
	}

	// Compare the rendered expressions rather than the raw Query strings.
	return d.logSelectorExpr.String() == o.logSelectorExpr.String(), nil
}

func intervalsOverlap(interval1, interval2 model.Interval) bool {
if interval1.Start > interval2.End || interval2.Start > interval1.End {
return false
Expand Down
141 changes: 141 additions & 0 deletions pkg/compactor/deletion/delete_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,3 +432,144 @@ func TestDeleteRequest_FilterFunction(t *testing.T) {
require.Panics(t, func() { testutil.ToFloat64(dr.Metrics.deletedLinesTotal) })
})
}

// TestDeleteRequest_IsDuplicate checks DeleteRequest.IsDuplicate against pairs
// of requests that differ in exactly one attribute (user, request ID, start
// time, end time, query labels) as well as pairs that are expected to be
// reported as duplicates.
func TestDeleteRequest_IsDuplicate(t *testing.T) {
	query1 := `{foo="bar", fizz="buzz"} |= "foo"`
	query2 := `{foo="bar", fizz="buzz2"} |= "foo"`

	for _, tc := range []struct {
		name           string
		req1, req2     DeleteRequest
		expIsDuplicate bool
	}{
		{
			name: "not duplicate - different user id",
			req1: DeleteRequest{
				RequestID: "1",
				UserID:    user1,
				StartTime: now.Add(-12 * time.Hour),
				EndTime:   now.Add(-10 * time.Hour),
				Query:     query1,
			},
			// The request IDs must differ here; otherwise IsDuplicate returns
			// early on the same-request-ID guard and the user ID comparison
			// is never exercised.
			req2: DeleteRequest{
				RequestID: "2",
				UserID:    user2,
				StartTime: now.Add(-12 * time.Hour),
				EndTime:   now.Add(-10 * time.Hour),
				Query:     query1,
			},
			expIsDuplicate: false,
		},
		{
			name: "not duplicate - same request id",
			req1: DeleteRequest{
				RequestID: "1",
				UserID:    user1,
				StartTime: now.Add(-12 * time.Hour),
				EndTime:   now.Add(-10 * time.Hour),
				Query:     query1,
			},
			req2: DeleteRequest{
				RequestID: "1",
				UserID:    user1,
				StartTime: now.Add(-12 * time.Hour),
				EndTime:   now.Add(-10 * time.Hour),
				Query:     query1,
			},
			expIsDuplicate: false,
		},
		{
			name: "not duplicate - different start time",
			req1: DeleteRequest{
				RequestID: "1",
				UserID:    user1,
				StartTime: now.Add(-12 * time.Hour),
				EndTime:   now.Add(-10 * time.Hour),
				Query:     query1,
			},
			req2: DeleteRequest{
				RequestID: "2",
				UserID:    user1,
				StartTime: now.Add(-13 * time.Hour),
				EndTime:   now.Add(-10 * time.Hour),
				Query:     query1,
			},
		},
		{
			name: "not duplicate - different end time",
			req1: DeleteRequest{
				RequestID: "1",
				UserID:    user1,
				StartTime: now.Add(-12 * time.Hour),
				EndTime:   now.Add(-10 * time.Hour),
				Query:     query1,
			},
			req2: DeleteRequest{
				RequestID: "2",
				UserID:    user1,
				StartTime: now.Add(-12 * time.Hour),
				EndTime:   now.Add(-11 * time.Hour),
				Query:     query1,
			},
		},
		{
			name: "not duplicate - different labels",
			req1: DeleteRequest{
				RequestID: "1",
				UserID:    user1,
				StartTime: now.Add(-12 * time.Hour),
				EndTime:   now.Add(-10 * time.Hour),
				Query:     query1,
			},
			req2: DeleteRequest{
				RequestID: "2",
				UserID:    user1,
				StartTime: now.Add(-12 * time.Hour),
				EndTime:   now.Add(-10 * time.Hour),
				Query:     query2,
			},
		},
		{
			name: "duplicate - same request",
			req1: DeleteRequest{
				RequestID: "1",
				UserID:    user1,
				StartTime: now.Add(-12 * time.Hour),
				EndTime:   now.Add(-10 * time.Hour),
				Query:     query1,
			},
			req2: DeleteRequest{
				RequestID: "2",
				UserID:    user1,
				StartTime: now.Add(-12 * time.Hour),
				EndTime:   now.Add(-10 * time.Hour),
				Query:     query1,
			},
			expIsDuplicate: true,
		},
		{
			// Same selector and line filter as query1, written with a
			// different quoting style; expected to be reported as a duplicate.
			name: "duplicate - same request with irregularities in query",
			req1: DeleteRequest{
				RequestID: "1",
				UserID:    user1,
				StartTime: now.Add(-12 * time.Hour),
				EndTime:   now.Add(-10 * time.Hour),
				Query:     query1,
			},
			req2: DeleteRequest{
				RequestID: "2",
				UserID:    user1,
				StartTime: now.Add(-12 * time.Hour),
				EndTime:   now.Add(-10 * time.Hour),
				Query:     "{foo=\"bar\", fizz=`buzz`} |= `foo`",
			},
			expIsDuplicate: true,
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			isDuplicate, err := tc.req1.IsDuplicate(&tc.req2)
			require.NoError(t, err)
			require.Equal(t, tc.expIsDuplicate, isDuplicate)
		})
	}
}
27 changes: 27 additions & 0 deletions pkg/compactor/deletion/delete_requests_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type DeleteRequestsManager struct {

deleteRequestsToProcess map[string]*userDeleteRequests
deleteRequestsToProcessMtx sync.Mutex
duplicateRequests []DeleteRequest
metrics *deleteRequestsManagerMetrics
wg sync.WaitGroup
done chan struct{}
Expand Down Expand Up @@ -153,6 +154,23 @@ func (d *DeleteRequestsManager) loadDeleteRequestsToProcess() error {
continue
}
}
if ur, ok := d.deleteRequestsToProcess[deleteRequest.UserID]; ok {
for _, requestLoadedForProcessing := range ur.requests {
isDuplicate, err := requestLoadedForProcessing.IsDuplicate(&deleteRequest)
if err != nil {
return err
}
if isDuplicate {
level.Info(util_log.Logger).Log(
"msg", "found duplicate request of one of the requests loaded for processing",
"loaded_request_id", requestLoadedForProcessing.RequestID,
"duplicate_request_id", deleteRequest.RequestID,
"user", deleteRequest.UserID,
)
d.duplicateRequests = append(d.duplicateRequests, deleteRequest)
}
}
}
if reqCount >= d.batchSize {
logBatchTruncation(reqCount, len(deleteRequests))
break
Expand Down Expand Up @@ -366,6 +384,15 @@ func (d *DeleteRequestsManager) MarkPhaseFinished() {
d.markRequestAsProcessed(*deleteRequest)
}
}

for _, req := range d.duplicateRequests {
level.Info(util_log.Logger).Log("msg", "marking duplicate delete request as processed",
"delete_request_id", req.RequestID,
"sequence_num", req.SequenceNum,
"user", req.UserID,
)
d.markRequestAsProcessed(req)
}
}

func (d *DeleteRequestsManager) IntervalMayHaveExpiredChunks(_ model.Interval, userID string) bool {
Expand Down
39 changes: 39 additions & 0 deletions pkg/compactor/deletion/delete_requests_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
expectedResp resp
expectedDeletionRangeByUser map[string]model.Interval
expectedRequestsMarkedAsProcessed []int
expectedDuplicateRequestsCount int
}{
{
name: "no delete requests",
Expand Down Expand Up @@ -895,6 +896,43 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
expectedRequestsMarkedAsProcessed: []int{0, 1},
},
{
name: "duplicate delete request marked as processed with loaded request",
deletionMode: deletionmode.FilterAndDelete,
batchSize: 1,
deleteRequestsFromStore: []DeleteRequest{
{
RequestID: "1",
UserID: testUserID,
Query: streamSelectorWithLineFilters,
StartTime: now.Add(-24 * time.Hour),
EndTime: now,
Status: StatusReceived,
},
{
RequestID: "2",
UserID: testUserID,
Query: streamSelectorWithLineFilters,
StartTime: now.Add(-24 * time.Hour),
EndTime: now,
Status: StatusReceived,
},
},
expectedResp: resp{
isExpired: true,
expectedFilter: func(_ time.Time, s string, _ ...labels.Label) bool {
return strings.Contains(s, "fizz")
},
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
Start: now.Add(-24 * time.Hour),
End: now,
},
},
expectedRequestsMarkedAsProcessed: []int{0, 1},
expectedDuplicateRequestsCount: 1,
},
} {
t.Run(tc.name, func(t *testing.T) {
mockDeleteRequestsStore := &mockDeleteRequestsStore{deleteRequests: tc.deleteRequestsFromStore}
Expand Down Expand Up @@ -947,6 +985,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
for i, reqIdx := range tc.expectedRequestsMarkedAsProcessed {
require.True(t, requestsAreEqual(tc.deleteRequestsFromStore[reqIdx], processedRequests[i]))
}
require.Len(t, mgr.duplicateRequests, tc.expectedDuplicateRequestsCount)
})
}
}
Expand Down

0 comments on commit 81940c8

Please sign in to comment.