From 20d0fa127b52ab0711a32b71c5a4231ec6386985 Mon Sep 17 00:00:00 2001 From: James Scott Date: Wed, 3 Dec 2025 22:02:30 +0000 Subject: [PATCH] feat(spanner): add SavedSearchState table and locking logic This commit introduces the schema and client logic for managing the state of Saved Search notifications, specifically focusing on the ingestion worker locking mechanism. Changes: - Infrastructure: Added migration 000026.sql to create the `SavedSearchState` table with `SavedSearchId` and `SnapshotType` composite PK. - Client: Added `entityMutator` generic helper to `client.go` to support transactional "Read-Inspect-Write" patterns required for conditional locking. - Client: Added `SetTimeNowForTesting` to allow deterministic TTL expiration testing. - Feature: Implemented `saved_search_state.go` containing: - `TryAcquireSavedSearchStateWorkerLock` - `ReleaseSavedSearchStateWorkerLock` - `GetSavedSearchState` - `UpdateSavedSearchStateLastKnownStateBlobPath` - Tests: Added `saved_search_state_test.go` covering lock acquisition, expiration, re-acquisition, and release scenarios. --- infra/storage/spanner/migrations/000026.sql | 27 ++ lib/gcpspanner/client.go | 84 ++++ lib/gcpspanner/saved_search_state.go | 183 +++++++++ lib/gcpspanner/saved_search_state_test.go | 405 ++++++++++++++++++++ 4 files changed, 699 insertions(+) create mode 100644 infra/storage/spanner/migrations/000026.sql create mode 100644 lib/gcpspanner/saved_search_state.go create mode 100644 lib/gcpspanner/saved_search_state_test.go diff --git a/infra/storage/spanner/migrations/000026.sql b/infra/storage/spanner/migrations/000026.sql new file mode 100644 index 000000000..13a6664c7 --- /dev/null +++ b/infra/storage/spanner/migrations/000026.sql @@ -0,0 +1,27 @@ +-- Copyright 2025 Google LLC +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +-- SavedSearchState +-- This table tracks the "Last Known State" for the diff engine and handles worker locking. +CREATE TABLE IF NOT EXISTS SavedSearchState ( + SavedSearchId STRING(36) NOT NULL, + SnapshotType STRING(MAX) NOT NULL, -- Enum: IMMEDIATE, WEEKLY, MONTHLY + LastKnownStateBlobPath STRING(MAX), + WorkerLockId STRING(MAX), + WorkerLockExpiresAt TIMESTAMP, + + CONSTRAINT FK_SavedSearchState_SavedSearch + FOREIGN KEY (SavedSearchId) REFERENCES SavedSearches(ID) ON DELETE CASCADE + +) PRIMARY KEY (SavedSearchId, SnapshotType); diff --git a/lib/gcpspanner/client.go b/lib/gcpspanner/client.go index 5875029e1..f6cdd12e5 100644 --- a/lib/gcpspanner/client.go +++ b/lib/gcpspanner/client.go @@ -84,6 +84,11 @@ type Client struct { batchWriter batchSize int batchWriters int + timeNow func() time.Time +} + +func (c *Client) setTimeNowForTesting(timeNow func() time.Time) { + c.timeNow = timeNow } type batchWriter interface { @@ -217,6 +222,7 @@ func NewSpannerClient(projectID string, instanceID string, name string) (*Client bw, defaultBatchSize, defaultBatchWriters, + time.Now, }, nil } @@ -990,6 +996,84 @@ func (c *entityRemover[M, ExternalStruct, SpannerStruct, Key]) removeWithTransac return nil } +// entityMutator is a client for performing transactional Read-Inspect-Write operations. +type entityMutator[ + M readOneMapper[Key], + SpannerStruct any, + Key comparable, +] struct { + *Client +} + +func newEntityMutator[ + M readOneMapper[Key], + SpannerStruct any, + Key comparable, +](c *Client) *entityMutator[M, SpannerStruct, Key] { + return &entityMutator[M, SpannerStruct, Key]{c} +} + +// readInspectMutateWithTransaction is a generic helper for "Read-Modify-Write" transactions. +func (c *entityMutator[M, SpannerStruct, Key]) readInspectMutateWithTransaction( + ctx context.Context, + key Key, + // The callback receives the pointer to the struct (or nil if not found). + // It returns a Mutation to apply, or an error to Abort the transaction. + logicFn func(ctx context.Context, existing *SpannerStruct) (*spanner.Mutation, error), + txn *spanner.ReadWriteTransaction, +) error { + var mapper M + stmt := mapper.SelectOne(key) + + // 1. READ + it := txn.Query(ctx, stmt) + defer it.Stop() + row, err := it.Next() + + var existing *SpannerStruct + if err == nil { + // Found the row + existing = new(SpannerStruct) + if err := row.ToStruct(existing); err != nil { + return errors.Join(ErrInternalQueryFailure, err) + } + } else if errors.Is(err, iterator.Done) { + // Row not found (valid for Insert scenarios) + existing = nil + } else { + return errors.Join(ErrInternalQueryFailure, err) + } + + // 2. INSPECT & LOGIC + mutation, err := logicFn(ctx, existing) + if err != nil { + return err // This aborts the transaction + } + + // 3. MUTATE (If logic didn't error, apply the change) + if mutation != nil { + return txn.BufferWrite([]*spanner.Mutation{mutation}) + } + + return nil +} + +// readInspectMutate is a generic helper for "Read-Modify-Write" transactions. +// It reads the row (if it exists), passes it to your callback logic, and applies the returned mutation. +func (c *entityMutator[M, SpannerStruct, Key]) readInspectMutate( + ctx context.Context, + key Key, + // The callback receives the pointer to the struct (or nil if not found). + // It returns a Mutation to apply, or an error to Abort the transaction. + logicFn func(ctx context.Context, existing *SpannerStruct) (*spanner.Mutation, error), +) error { + _, err := c.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error { + return c.readInspectMutateWithTransaction(ctx, key, logicFn, txn) + }) + + return err +} + // allByKeysEntityReader handles the reading of a Spanner table with a set of key(s). type allByKeysEntityReader[ M readAllByKeysMapper[KeysContainer], diff --git a/lib/gcpspanner/saved_search_state.go b/lib/gcpspanner/saved_search_state.go new file mode 100644 index 000000000..d2ac701bf --- /dev/null +++ b/lib/gcpspanner/saved_search_state.go @@ -0,0 +1,183 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gcpspanner + +import ( + "context" + "errors" + "time" + + "cloud.google.com/go/spanner" +) + +const savedSearchStateTableName = "SavedSearchState" + +type savedSearchStateMapper struct{} + +type savedSearchStateKey struct { + SavedSearchID string + SnapshotType SavedSearchSnapshotType +} + +type SavedSearchSnapshotType string + +const ( + SavedSearchSnapshotTypeImmediate SavedSearchSnapshotType = "IMMEDIATE" + SavedSearchSnapshotTypeWeekly SavedSearchSnapshotType = "WEEKLY" + SavedSearchSnapshotTypeMonthly SavedSearchSnapshotType = "MONTHLY" +) + +type SavedSearchState struct { + SavedSearchID string `spanner:"SavedSearchId"` + SnapshotType SavedSearchSnapshotType `spanner:"SnapshotType"` + LastKnownStateBlobPath *string `spanner:"LastKnownStateBlobPath"` + WorkerLockID *string `spanner:"WorkerLockId"` + WorkerLockExpiresAt *time.Time `spanner:"WorkerLockExpiresAt"` +} + +func (m savedSearchStateMapper) SelectOne(key savedSearchStateKey) spanner.Statement { + return spanner.Statement{ + SQL: "SELECT * FROM SavedSearchState WHERE SavedSearchId = @SavedSearchId AND SnapshotType = @SnapshotType", + Params: map[string]any{"SavedSearchId": key.SavedSearchID, "SnapshotType": key.SnapshotType}, + } +} + +var ( + ErrAlreadyLocked = errors.New("resource already locked by another worker") + ErrLockNotOwned = errors.New("cannot release lock not owned by worker") +) + +// TryAcquireSavedSearchStateWorkerLock attempts to acquire a worker lock for the given saved search and snapshot type. +// If the lock is already held by another worker and is still active, ErrAlreadyLocked is returned. +// A caller can re-acquire a lock it already holds it (thereby extending the expiration). +func (c *Client) TryAcquireSavedSearchStateWorkerLock( + ctx context.Context, + savedSearchID string, + snapshotType SavedSearchSnapshotType, + workerID string, + ttl time.Duration) (bool, error) { + writer := newEntityMutator[savedSearchStateMapper, SavedSearchState](c) + key := savedSearchStateKey{SavedSearchID: savedSearchID, SnapshotType: snapshotType} + + err := writer.readInspectMutate(ctx, key, + func(_ context.Context, existing *SavedSearchState) (*spanner.Mutation, error) { + now := c.timeNow() + + // If row exists, is it locked by someone else not the caller? + if existing != nil { + isLocked := existing.WorkerLockID != nil && *existing.WorkerLockID != workerID + isActive := existing.WorkerLockExpiresAt != nil && existing.WorkerLockExpiresAt.After(now) + + if isLocked && isActive { + return nil, ErrAlreadyLocked + } + } + + expiration := now.Add(ttl) + + // We can take the lock. + newState := SavedSearchState{ + SavedSearchID: savedSearchID, + SnapshotType: snapshotType, + WorkerLockID: &workerID, + WorkerLockExpiresAt: &expiration, + LastKnownStateBlobPath: nil, + } + if existing != nil { + newState.LastKnownStateBlobPath = existing.LastKnownStateBlobPath + } + + return spanner.InsertOrUpdateStruct(savedSearchStateTableName, newState) + }) + + if err != nil { + return false, err + } + + return true, nil +} + +// ReleaseSavedSearchStateWorkerLock releases the worker lock for the given saved search and snapshot type. +// The caller must own the lock. If not, ErrLockNotOwned is returned. +func (c *Client) ReleaseSavedSearchStateWorkerLock( + ctx context.Context, + savedSearchID string, + snapshotType SavedSearchSnapshotType, + workerID string) error { + mutator := newEntityMutator[savedSearchStateMapper, SavedSearchState](c) + key := savedSearchStateKey{SavedSearchID: savedSearchID, SnapshotType: snapshotType} + + return mutator.readInspectMutate(ctx, key, + func(_ context.Context, existing *SavedSearchState) (*spanner.Mutation, error) { + // If row is gone, nothing to release + if existing == nil { + return nil, nil + } + + // Verify the caller owns this lock + if existing.WorkerLockID == nil || *existing.WorkerLockID != workerID { + return nil, ErrLockNotOwned + } + + newState := SavedSearchState{ + SavedSearchID: savedSearchID, + SnapshotType: snapshotType, + // Release the lock + WorkerLockID: nil, + WorkerLockExpiresAt: nil, + LastKnownStateBlobPath: nil, + } + + // Preserve the existing blob path + newState.LastKnownStateBlobPath = existing.LastKnownStateBlobPath + + return spanner.InsertOrUpdateStruct(savedSearchStateTableName, newState) + }) +} + +// GetSavedSearchState retrieves the SavedSearchState for the given saved search and snapshot type. +// If no such row exists, ErrQueryReturnedNoResults is returned. +func (c *Client) GetSavedSearchState( + ctx context.Context, + savedSearchID string, + snapshotType SavedSearchSnapshotType) (*SavedSearchState, error) { + r := newEntityReader[savedSearchStateMapper, SavedSearchState, savedSearchStateKey](c) + key := savedSearchStateKey{SavedSearchID: savedSearchID, SnapshotType: snapshotType} + + return r.readRowByKey(ctx, key) +} + +// UpdateSavedSearchStateLastKnownStateBlobPath updates the LastKnownStateBlobPath +// for the given saved search and snapshot type. +// The row must already exist. Else, ErrQueryReturnedNoResults is returned. +func (c *Client) UpdateSavedSearchStateLastKnownStateBlobPath( + ctx context.Context, + savedSearchID string, + snapshotType SavedSearchSnapshotType, + blobPath string) error { + mutator := newEntityMutator[savedSearchStateMapper, SavedSearchState](c) + key := savedSearchStateKey{SavedSearchID: savedSearchID, SnapshotType: snapshotType} + + return mutator.readInspectMutate(ctx, key, + func(_ context.Context, existing *SavedSearchState) (*spanner.Mutation, error) { + if existing == nil { + return nil, ErrQueryReturnedNoResults + } + // Update existing row + existing.LastKnownStateBlobPath = &blobPath + + return spanner.UpdateStruct(savedSearchStateTableName, *existing) + }) +} diff --git a/lib/gcpspanner/saved_search_state_test.go b/lib/gcpspanner/saved_search_state_test.go new file mode 100644 index 000000000..6e1e39d3a --- /dev/null +++ b/lib/gcpspanner/saved_search_state_test.go @@ -0,0 +1,405 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gcpspanner + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/google/go-cmp/cmp" +) + +const worker1 = "worker-1" +const worker2 = "worker-2" +const snapshotType = "compat-stats" + +func noopSavedSearchStateHelper(t *testing.T, _ string) { + t.Helper() +} + +func createSavedSearchForSavedSearchStateTests(ctx context.Context, t *testing.T) string { + t.Helper() + id, err := spannerClient.CreateNewUserSavedSearch(ctx, CreateUserSavedSearchRequest{ + Name: "test search", + Query: "group:test", + OwnerUserID: "owner-1", + Description: nil, + }) + if err != nil { + t.Fatalf("CreateNewUserSavedSearch() returned unexpected error: %v", err) + } + + return *id +} + +// Asserts for TestTryAcquireSavedSearchStateWorkerLock. +func assertAbleToAcquireLock(ctx context.Context, t *testing.T, savedSearchID string, + snapshotType SavedSearchSnapshotType, workerID string, + initialTime time.Time, ttl time.Duration) { + state, err := spannerClient.GetSavedSearchState(ctx, savedSearchID, snapshotType) + if err != nil { + t.Fatalf("GetSavedSearchState() got unexpected error = %v", err) + } + if state == nil { + t.Fatal("GetSavedSearchState() state was nil") + } + if *state.WorkerLockID != workerID { + t.Errorf("WorkerLockID mismatch: got %s, want %s", *state.WorkerLockID, workerID) + } + expectedExpiration := initialTime.Add(ttl) + if !expectedExpiration.Equal(*state.WorkerLockExpiresAt) { + t.Errorf("WorkerLockExpiresAt mismatch: got %v, want %v", *state.WorkerLockExpiresAt, + expectedExpiration) + } +} + +func TestTryAcquireSavedSearchStateWorkerLock(t *testing.T) { + ctx := context.Background() + + // Fixed time for deterministic tests + fixedTime := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC) + + ttl := 10 * time.Minute + + testCases := []struct { + name string + setup func(t *testing.T, savedSearchID string) + snapshotType SavedSearchSnapshotType + workerID string + ttl time.Duration + expectedSuccess bool + expectedErr error + assertAfterAction func(t *testing.T, savedSearchID string) + }{ + { + name: "acquire lock when none exists", + snapshotType: snapshotType, + workerID: worker1, + ttl: ttl, + expectedSuccess: true, + expectedErr: nil, + setup: noopSavedSearchStateHelper, + assertAfterAction: func(t *testing.T, savedSearchID string) { + t.Helper() + assertAbleToAcquireLock(ctx, t, savedSearchID, snapshotType, worker1, fixedTime, ttl) + }, + }, + { + name: "re-acquire existing lock", + snapshotType: snapshotType, + workerID: worker1, + ttl: 20 * time.Minute, // Extend TTL + setup: func(t *testing.T, savedSearchID string) { + t.Helper() + // Pre-acquire the lock + _, err := spannerClient.TryAcquireSavedSearchStateWorkerLock(ctx, savedSearchID, snapshotType, + worker1, ttl) + if err != nil { + t.Fatalf("setup: TryAcquireSavedSearchStateWorkerLock failed: %v", err) + } + }, + expectedSuccess: true, + expectedErr: nil, + assertAfterAction: func(t *testing.T, savedSearchID string) { + t.Helper() + assertAbleToAcquireLock(ctx, t, savedSearchID, snapshotType, worker1, fixedTime, + // New TTL + 20*time.Minute) + }, + }, + { + name: "fail to acquire lock held by another active worker", + snapshotType: snapshotType, + workerID: worker2, + ttl: ttl, + setup: func(t *testing.T, savedSearchID string) { + t.Helper() + // worker1 acquires the lock + _, err := spannerClient.TryAcquireSavedSearchStateWorkerLock(ctx, savedSearchID, snapshotType, worker1, + ttl) + if err != nil { + t.Fatalf("setup: TryAcquireSavedSearchStateWorkerLock failed: %v", err) + } + }, + expectedSuccess: false, + expectedErr: ErrAlreadyLocked, + assertAfterAction: func(t *testing.T, savedSearchID string) { + t.Helper() + // State should be unchanged + state, err := spannerClient.GetSavedSearchState(ctx, savedSearchID, snapshotType) + if err != nil { + t.Fatalf("GetSavedSearchState() got unexpected error = %v", err) + } + if state == nil { + t.Fatal("GetSavedSearchState() state was nil") + } + if *state.WorkerLockID != worker1 { + t.Errorf("WorkerLockID mismatch: got %s, want %s", *state.WorkerLockID, worker1) + } + }, + }, + { + name: "acquire lock held by another worker but expired", + snapshotType: snapshotType, + workerID: worker2, + ttl: ttl, + setup: func(t *testing.T, savedSearchID string) { + t.Helper() + // worker1 acquires the lock + _, err := spannerClient.TryAcquireSavedSearchStateWorkerLock(ctx, savedSearchID, snapshotType, worker1, + ttl) + if err != nil { + t.Fatalf("setup: TryAcquireSavedSearchStateWorkerLock failed: %v", err) + } + // Time moves forward, making the lock expire + spannerClient.setTimeNowForTesting(func() time.Time { return fixedTime.Add(15 * time.Minute) }) + }, + expectedSuccess: true, + expectedErr: nil, + assertAfterAction: func(t *testing.T, savedSearchID string) { + t.Helper() + assertAbleToAcquireLock(ctx, t, savedSearchID, snapshotType, worker2, + // Current time is 15 minutes later. The new lock should reflect that. + fixedTime.Add(15*time.Minute), + // TTL is the same. + ttl) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + restartDatabaseContainer(t) + savedSearchID := createSavedSearchForSavedSearchStateTests(ctx, t) + spannerClient.setTimeNowForTesting(func() time.Time { return fixedTime }) + + tc.setup(t, savedSearchID) + + success, err := spannerClient.TryAcquireSavedSearchStateWorkerLock( + ctx, savedSearchID, tc.snapshotType, tc.workerID, tc.ttl) + + if success != tc.expectedSuccess { + t.Errorf("TryAcquireSavedSearchStateWorkerLock() success = %v, want %v", success, tc.expectedSuccess) + } + if !errors.Is(err, tc.expectedErr) { + t.Errorf("TryAcquireSavedSearchStateWorkerLock() error = %v, want %v", err, tc.expectedErr) + } + + tc.assertAfterAction(t, savedSearchID) + }) + } +} + +func TestReleaseSavedSearchStateWorkerLock(t *testing.T) { + ctx := context.Background() + + ttl := 10 * time.Minute + + testCases := []struct { + name string + setup func(t *testing.T, savedSearchID string) + otherSavedSearchID *string + snapshotType SavedSearchSnapshotType + workerID string + expectedErr error + assertAfterAction func(t *testing.T, savedSearchID string) + }{ + { + name: "successfully release an owned lock", + snapshotType: snapshotType, + workerID: worker1, + setup: func(t *testing.T, savedSearchID string) { + t.Helper() + _, err := spannerClient.TryAcquireSavedSearchStateWorkerLock(ctx, savedSearchID, snapshotType, worker1, + ttl) + if err != nil { + t.Fatalf("setup: TryAcquireSavedSearchStateWorkerLock failed: %v", err) + } + }, + expectedErr: nil, + otherSavedSearchID: nil, + assertAfterAction: func(t *testing.T, savedSearchID string) { + t.Helper() + state, err := spannerClient.GetSavedSearchState(ctx, savedSearchID, snapshotType) + if err != nil { + t.Fatalf("GetSavedSearchState() got unexpected error = %v", err) + } + if state == nil { + t.Fatal("GetSavedSearchState() state was nil") + } + if state.WorkerLockID != nil { + t.Errorf("WorkerLockID should be nil, got %s", *state.WorkerLockID) + } + if state.WorkerLockExpiresAt != nil { + t.Errorf("WorkerLockExpiresAt should be nil, got %v", *state.WorkerLockExpiresAt) + } + }, + }, + { + name: "fail to release a lock owned by another worker", + snapshotType: snapshotType, + workerID: worker2, + setup: func(t *testing.T, savedSearchID string) { + t.Helper() + _, err := spannerClient.TryAcquireSavedSearchStateWorkerLock(ctx, savedSearchID, snapshotType, worker1, + ttl) + if err != nil { + t.Fatalf("setup: TryAcquireSavedSearchStateWorkerLock failed: %v", err) + } + }, + expectedErr: ErrLockNotOwned, + otherSavedSearchID: nil, + assertAfterAction: func(t *testing.T, savedSearchID string) { + t.Helper() + // State should be unchanged + state, err := spannerClient.GetSavedSearchState(ctx, savedSearchID, snapshotType) + if err != nil { + t.Fatalf("GetSavedSearchState() got unexpected error = %v", err) + } + if state == nil { + t.Fatal("GetSavedSearchState() state was nil") + } + if *state.WorkerLockID != worker1 { + t.Errorf("WorkerLockID mismatch: got %s, want %s", *state.WorkerLockID, worker1) + } + }, + }, + { + name: "attempt to release a lock that does not exist", + otherSavedSearchID: valuePtr("non-existent-search"), + snapshotType: snapshotType, + workerID: worker1, + expectedErr: nil, // Should be a no-op + assertAfterAction: noopSavedSearchStateHelper, + setup: noopSavedSearchStateHelper, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + restartDatabaseContainer(t) + savedSearchID := createSavedSearchForSavedSearchStateTests(ctx, t) + tc.setup(t, savedSearchID) + + if tc.otherSavedSearchID != nil { + savedSearchID = *tc.otherSavedSearchID + } + + err := spannerClient.ReleaseSavedSearchStateWorkerLock(ctx, savedSearchID, tc.snapshotType, tc.workerID) + + if !errors.Is(err, tc.expectedErr) { + t.Errorf("ReleaseSavedSearchStateWorkerLock() error = %v, want %v", err, tc.expectedErr) + } + + tc.assertAfterAction(t, savedSearchID) + }) + } +} + +func TestGetAndUpdateSavedSearchState(t *testing.T) { + ctx := context.Background() + restartDatabaseContainer(t) + + // Fixed time for deterministic tests + fixedTime := time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC) + spannerClient.setTimeNowForTesting(func() time.Time { return fixedTime }) + + savedSearchID := createSavedSearchForSavedSearchStateTests(ctx, t) + + workerID := "worker-1" + ttl := 10 * time.Minute + initialBlobPath := "path/to/blob/1" + updatedBlobPath := "path/to/blob/2" + + // Setup: Create an initial state + _, err := spannerClient.TryAcquireSavedSearchStateWorkerLock(ctx, savedSearchID, snapshotType, workerID, ttl) + if err != nil { + t.Fatalf("setup: TryAcquireSavedSearchStateWorkerLock failed: %v", err) + } + err = spannerClient.UpdateSavedSearchStateLastKnownStateBlobPath(ctx, savedSearchID, snapshotType, initialBlobPath) + if err != nil { + t.Fatalf("setup: UpdateSavedSearchStateLastKnownStateBlobPath failed: %v", err) + } + + t.Run("GetSavedSearchState - success", func(t *testing.T) { + state, err := spannerClient.GetSavedSearchState(ctx, savedSearchID, snapshotType) + if err != nil { + t.Fatalf("GetSavedSearchState() got unexpected error = %v", err) + } + if state == nil { + t.Fatal("GetSavedSearchState() state was nil") + } + if state.SavedSearchID != savedSearchID { + t.Errorf("SavedSearchID mismatch: got %s, want %s", state.SavedSearchID, savedSearchID) + } + if state.SnapshotType != snapshotType { + t.Errorf("SnapshotType mismatch: got %s, want %s", state.SnapshotType, snapshotType) + } + if *state.WorkerLockID != workerID { + t.Errorf("WorkerLockID mismatch: got %s, want %s", *state.WorkerLockID, workerID) + } + if *state.LastKnownStateBlobPath != initialBlobPath { + t.Errorf("LastKnownStateBlobPath mismatch: got %s, want %s", *state.LastKnownStateBlobPath, initialBlobPath) + } + }) + + t.Run("GetSavedSearchState - not found", func(t *testing.T) { + _, err := spannerClient.GetSavedSearchState(ctx, "non-existent", snapshotType) + if !errors.Is(err, ErrQueryReturnedNoResults) { + t.Errorf("GetSavedSearchState() with non-existent key returned error = %v, want %v", err, + ErrQueryReturnedNoResults) + } + }) + + t.Run("UpdateSavedSearchStateLastKnownStateBlobPath - success", func(t *testing.T) { + err := spannerClient.UpdateSavedSearchStateLastKnownStateBlobPath(ctx, savedSearchID, + snapshotType, updatedBlobPath) + if err != nil { + t.Fatalf("UpdateSavedSearchStateLastKnownStateBlobPath() returned an error: %v", err) + } + + // Verify update + state, err := spannerClient.GetSavedSearchState(ctx, savedSearchID, snapshotType) + if err != nil { + t.Fatalf("GetSavedSearchState() after update returned an error: %v", err) + } + if state == nil { + t.Fatal("GetSavedSearchState() after update state was nil") + } + expectedExpiration := fixedTime.Add(ttl) + expectedState := SavedSearchState{ + SavedSearchID: savedSearchID, + SnapshotType: snapshotType, + WorkerLockID: valuePtr(workerID), + LastKnownStateBlobPath: valuePtr(updatedBlobPath), + WorkerLockExpiresAt: &expectedExpiration, + } + if diff := cmp.Diff(expectedState, *state); diff != "" { + t.Errorf("GetSavedSearchState() after update mismatch (-want +got):\n%s", diff) + } + }) + + t.Run("UpdateSavedSearchStateLastKnownStateBlobPath - not found", func(t *testing.T) { + err := spannerClient.UpdateSavedSearchStateLastKnownStateBlobPath(ctx, "non-existent", + snapshotType, updatedBlobPath) + if !errors.Is(err, ErrQueryReturnedNoResults) { + t.Errorf( + "UpdateSavedSearchStateLastKnownStateBlobPath() with non-existent key returned error = %v, want %v", + err, ErrQueryReturnedNoResults) + } + }) +}