Skip to content

Commit e07ed92

Browse files
authored
feat(spanner): add SavedSearchState table and locking logic (#2056)
This commit introduces the schema and client logic for managing the state of Saved Search notifications, specifically focusing on the ingestion worker locking mechanism. Changes: - Infrastructure: Added migration 000026.sql to create the `SavedSearchState` table with `SavedSearchId` and `SnapshotType` composite PK. - Client: Added `entityMutator` generic helper to `client.go` to support transactional "Read-Inspect-Write" patterns required for conditional locking. - Client: Added `SetTimeNowForTesting` to allow deterministic TTL expiration testing. - Feature: Implemented `saved_search_state.go` containing: - `TryAcquireSavedSearchStateWorkerLock` - `ReleaseSavedSearchStateWorkerLock` - `GetSavedSearchState` - `UpdateSavedSearchStateLastKnownStateBlobPath` - Tests: Added `saved_search_state_test.go` covering lock acquisition, expiration, re-acquisition, and release scenarios.
1 parent 508443c commit e07ed92

File tree

4 files changed

+699
-0
lines changed

4 files changed

+699
-0
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
-- Copyright 2025 Google LLC
2+
--
3+
-- Licensed under the Apache License, Version 2.0 (the "License");
4+
-- you may not use this file except in compliance with the License.
5+
-- You may obtain a copy of the License at
6+
--
7+
-- http://www.apache.org/licenses/LICENSE-2.0
8+
--
9+
-- Unless required by applicable law or agreed to in writing, software
10+
-- distributed under the License is distributed on an "AS IS" BASIS,
11+
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
-- See the License for the specific language governing permissions and
13+
-- limitations under the License.
14+
15+
-- SavedSearchState
16+
-- This table tracks the "Last Known State" for the diff engine and handles worker locking.
17+
CREATE TABLE IF NOT EXISTS SavedSearchState (
18+
SavedSearchId STRING(36) NOT NULL,
19+
SnapshotType STRING(MAX) NOT NULL, -- Enum: IMMEDIATE, WEEKLY, MONTHLY
20+
LastKnownStateBlobPath STRING(MAX),
21+
WorkerLockId STRING(MAX),
22+
WorkerLockExpiresAt TIMESTAMP,
23+
24+
CONSTRAINT FK_SavedSearchState_SavedSearch
25+
FOREIGN KEY (SavedSearchId) REFERENCES SavedSearches(ID) ON DELETE CASCADE
26+
27+
) PRIMARY KEY (SavedSearchId, SnapshotType);

lib/gcpspanner/client.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ type Client struct {
8484
batchWriter
8585
batchSize int
8686
batchWriters int
87+
timeNow func() time.Time
88+
}
89+
90+
func (c *Client) setTimeNowForTesting(timeNow func() time.Time) {
91+
c.timeNow = timeNow
8792
}
8893

8994
type batchWriter interface {
@@ -217,6 +222,7 @@ func NewSpannerClient(projectID string, instanceID string, name string) (*Client
217222
bw,
218223
defaultBatchSize,
219224
defaultBatchWriters,
225+
time.Now,
220226
}, nil
221227
}
222228

@@ -990,6 +996,84 @@ func (c *entityRemover[M, ExternalStruct, SpannerStruct, Key]) removeWithTransac
990996
return nil
991997
}
992998

999+
// entityMutator is a client for performing transactional Read-Inspect-Write operations.
1000+
type entityMutator[
1001+
M readOneMapper[Key],
1002+
SpannerStruct any,
1003+
Key comparable,
1004+
] struct {
1005+
*Client
1006+
}
1007+
1008+
func newEntityMutator[
1009+
M readOneMapper[Key],
1010+
SpannerStruct any,
1011+
Key comparable,
1012+
](c *Client) *entityMutator[M, SpannerStruct, Key] {
1013+
return &entityMutator[M, SpannerStruct, Key]{c}
1014+
}
1015+
1016+
// readInspectMutateWithTransaction is a generic helper for "Read-Modify-Write" transactions.
1017+
func (c *entityMutator[M, SpannerStruct, Key]) readInspectMutateWithTransaction(
1018+
ctx context.Context,
1019+
key Key,
1020+
// The callback receives the pointer to the struct (or nil if not found).
1021+
// It returns a Mutation to apply, or an error to Abort the transaction.
1022+
logicFn func(ctx context.Context, existing *SpannerStruct) (*spanner.Mutation, error),
1023+
txn *spanner.ReadWriteTransaction,
1024+
) error {
1025+
var mapper M
1026+
stmt := mapper.SelectOne(key)
1027+
1028+
// 1. READ
1029+
it := txn.Query(ctx, stmt)
1030+
defer it.Stop()
1031+
row, err := it.Next()
1032+
1033+
var existing *SpannerStruct
1034+
if err == nil {
1035+
// Found the row
1036+
existing = new(SpannerStruct)
1037+
if err := row.ToStruct(existing); err != nil {
1038+
return errors.Join(ErrInternalQueryFailure, err)
1039+
}
1040+
} else if errors.Is(err, iterator.Done) {
1041+
// Row not found (valid for Insert scenarios)
1042+
existing = nil
1043+
} else {
1044+
return errors.Join(ErrInternalQueryFailure, err)
1045+
}
1046+
1047+
// 2. INSPECT & LOGIC
1048+
mutation, err := logicFn(ctx, existing)
1049+
if err != nil {
1050+
return err // This aborts the transaction
1051+
}
1052+
1053+
// 3. MUTATE (If logic didn't error, apply the change)
1054+
if mutation != nil {
1055+
return txn.BufferWrite([]*spanner.Mutation{mutation})
1056+
}
1057+
1058+
return nil
1059+
}
1060+
1061+
// readInspectMutate is a generic helper for "Read-Modify-Write" transactions.
1062+
// It reads the row (if it exists), passes it to your callback logic, and applies the returned mutation.
1063+
func (c *entityMutator[M, SpannerStruct, Key]) readInspectMutate(
1064+
ctx context.Context,
1065+
key Key,
1066+
// The callback receives the pointer to the struct (or nil if not found).
1067+
// It returns a Mutation to apply, or an error to Abort the transaction.
1068+
logicFn func(ctx context.Context, existing *SpannerStruct) (*spanner.Mutation, error),
1069+
) error {
1070+
_, err := c.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
1071+
return c.readInspectMutateWithTransaction(ctx, key, logicFn, txn)
1072+
})
1073+
1074+
return err
1075+
}
1076+
9931077
// allByKeysEntityReader handles the reading of a Spanner table with a set of key(s).
9941078
type allByKeysEntityReader[
9951079
M readAllByKeysMapper[KeysContainer],
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package gcpspanner
16+
17+
import (
18+
"context"
19+
"errors"
20+
"time"
21+
22+
"cloud.google.com/go/spanner"
23+
)
24+
25+
const savedSearchStateTableName = "SavedSearchState"
26+
27+
type savedSearchStateMapper struct{}
28+
29+
type savedSearchStateKey struct {
30+
SavedSearchID string
31+
SnapshotType SavedSearchSnapshotType
32+
}
33+
34+
type SavedSearchSnapshotType string
35+
36+
const (
37+
SavedSearchSnapshotTypeImmediate SavedSearchSnapshotType = "IMMEDIATE"
38+
SavedSearchSnapshotTypeWeekly SavedSearchSnapshotType = "WEEKLY"
39+
SavedSearchSnapshotTypeMonthly SavedSearchSnapshotType = "MONTHLY"
40+
)
41+
42+
type SavedSearchState struct {
43+
SavedSearchID string `spanner:"SavedSearchId"`
44+
SnapshotType SavedSearchSnapshotType `spanner:"SnapshotType"`
45+
LastKnownStateBlobPath *string `spanner:"LastKnownStateBlobPath"`
46+
WorkerLockID *string `spanner:"WorkerLockId"`
47+
WorkerLockExpiresAt *time.Time `spanner:"WorkerLockExpiresAt"`
48+
}
49+
50+
func (m savedSearchStateMapper) SelectOne(key savedSearchStateKey) spanner.Statement {
51+
return spanner.Statement{
52+
SQL: "SELECT * FROM SavedSearchState WHERE SavedSearchId = @SavedSearchId AND SnapshotType = @SnapshotType",
53+
Params: map[string]any{"SavedSearchId": key.SavedSearchID, "SnapshotType": key.SnapshotType},
54+
}
55+
}
56+
57+
var (
58+
ErrAlreadyLocked = errors.New("resource already locked by another worker")
59+
ErrLockNotOwned = errors.New("cannot release lock not owned by worker")
60+
)
61+
62+
// TryAcquireSavedSearchStateWorkerLock attempts to acquire a worker lock for the given saved search and snapshot type.
63+
// If the lock is already held by another worker and is still active, ErrAlreadyLocked is returned.
64+
// A caller can re-acquire a lock it already holds it (thereby extending the expiration).
65+
func (c *Client) TryAcquireSavedSearchStateWorkerLock(
66+
ctx context.Context,
67+
savedSearchID string,
68+
snapshotType SavedSearchSnapshotType,
69+
workerID string,
70+
ttl time.Duration) (bool, error) {
71+
writer := newEntityMutator[savedSearchStateMapper, SavedSearchState](c)
72+
key := savedSearchStateKey{SavedSearchID: savedSearchID, SnapshotType: snapshotType}
73+
74+
err := writer.readInspectMutate(ctx, key,
75+
func(_ context.Context, existing *SavedSearchState) (*spanner.Mutation, error) {
76+
now := c.timeNow()
77+
78+
// If row exists, is it locked by someone else not the caller?
79+
if existing != nil {
80+
isLocked := existing.WorkerLockID != nil && *existing.WorkerLockID != workerID
81+
isActive := existing.WorkerLockExpiresAt != nil && existing.WorkerLockExpiresAt.After(now)
82+
83+
if isLocked && isActive {
84+
return nil, ErrAlreadyLocked
85+
}
86+
}
87+
88+
expiration := now.Add(ttl)
89+
90+
// We can take the lock.
91+
newState := SavedSearchState{
92+
SavedSearchID: savedSearchID,
93+
SnapshotType: snapshotType,
94+
WorkerLockID: &workerID,
95+
WorkerLockExpiresAt: &expiration,
96+
LastKnownStateBlobPath: nil,
97+
}
98+
if existing != nil {
99+
newState.LastKnownStateBlobPath = existing.LastKnownStateBlobPath
100+
}
101+
102+
return spanner.InsertOrUpdateStruct(savedSearchStateTableName, newState)
103+
})
104+
105+
if err != nil {
106+
return false, err
107+
}
108+
109+
return true, nil
110+
}
111+
112+
// ReleaseSavedSearchStateWorkerLock releases the worker lock for the given saved search and snapshot type.
113+
// The caller must own the lock. If not, ErrLockNotOwned is returned.
114+
func (c *Client) ReleaseSavedSearchStateWorkerLock(
115+
ctx context.Context,
116+
savedSearchID string,
117+
snapshotType SavedSearchSnapshotType,
118+
workerID string) error {
119+
mutator := newEntityMutator[savedSearchStateMapper, SavedSearchState](c)
120+
key := savedSearchStateKey{SavedSearchID: savedSearchID, SnapshotType: snapshotType}
121+
122+
return mutator.readInspectMutate(ctx, key,
123+
func(_ context.Context, existing *SavedSearchState) (*spanner.Mutation, error) {
124+
// If row is gone, nothing to release
125+
if existing == nil {
126+
return nil, nil
127+
}
128+
129+
// Verify the caller owns this lock
130+
if existing.WorkerLockID == nil || *existing.WorkerLockID != workerID {
131+
return nil, ErrLockNotOwned
132+
}
133+
134+
newState := SavedSearchState{
135+
SavedSearchID: savedSearchID,
136+
SnapshotType: snapshotType,
137+
// Release the lock
138+
WorkerLockID: nil,
139+
WorkerLockExpiresAt: nil,
140+
LastKnownStateBlobPath: nil,
141+
}
142+
143+
// Preserve the existing blob path
144+
newState.LastKnownStateBlobPath = existing.LastKnownStateBlobPath
145+
146+
return spanner.InsertOrUpdateStruct(savedSearchStateTableName, newState)
147+
})
148+
}
149+
150+
// GetSavedSearchState retrieves the SavedSearchState for the given saved search and snapshot type.
151+
// If no such row exists, ErrQueryReturnedNoResults is returned.
152+
func (c *Client) GetSavedSearchState(
153+
ctx context.Context,
154+
savedSearchID string,
155+
snapshotType SavedSearchSnapshotType) (*SavedSearchState, error) {
156+
r := newEntityReader[savedSearchStateMapper, SavedSearchState, savedSearchStateKey](c)
157+
key := savedSearchStateKey{SavedSearchID: savedSearchID, SnapshotType: snapshotType}
158+
159+
return r.readRowByKey(ctx, key)
160+
}
161+
162+
// UpdateSavedSearchStateLastKnownStateBlobPath updates the LastKnownStateBlobPath
163+
// for the given saved search and snapshot type.
164+
// The row must already exist. Else, ErrQueryReturnedNoResults is returned.
165+
func (c *Client) UpdateSavedSearchStateLastKnownStateBlobPath(
166+
ctx context.Context,
167+
savedSearchID string,
168+
snapshotType SavedSearchSnapshotType,
169+
blobPath string) error {
170+
mutator := newEntityMutator[savedSearchStateMapper, SavedSearchState](c)
171+
key := savedSearchStateKey{SavedSearchID: savedSearchID, SnapshotType: snapshotType}
172+
173+
return mutator.readInspectMutate(ctx, key,
174+
func(_ context.Context, existing *SavedSearchState) (*spanner.Mutation, error) {
175+
if existing == nil {
176+
return nil, ErrQueryReturnedNoResults
177+
}
178+
// Update existing row
179+
existing.LastKnownStateBlobPath = &blobPath
180+
181+
return spanner.UpdateStruct(savedSearchStateTableName, *existing)
182+
})
183+
}

0 commit comments

Comments
 (0)