Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions infra/storage/spanner/migrations/000026.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-- Copyright 2025 Google LLC
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.

-- SavedSearchState
-- This table tracks the "Last Known State" for the diff engine and handles worker locking.
CREATE TABLE IF NOT EXISTS SavedSearchState (
SavedSearchId STRING(36) NOT NULL,
SnapshotType STRING(MAX) NOT NULL, -- Enum: IMMEDIATE, WEEKLY, MONTHLY
LastKnownStateBlobPath STRING(MAX),
WorkerLockId STRING(MAX),
WorkerLockExpiresAt TIMESTAMP,

CONSTRAINT FK_SavedSearchState_SavedSearch
FOREIGN KEY (SavedSearchId) REFERENCES SavedSearches(ID) ON DELETE CASCADE

) PRIMARY KEY (SavedSearchId, SnapshotType);
84 changes: 84 additions & 0 deletions lib/gcpspanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ type Client struct {
batchWriter
batchSize int
batchWriters int
timeNow func() time.Time
}

func (c *Client) setTimeNowForTesting(timeNow func() time.Time) {
c.timeNow = timeNow
}

type batchWriter interface {
Expand Down Expand Up @@ -217,6 +222,7 @@ func NewSpannerClient(projectID string, instanceID string, name string) (*Client
bw,
defaultBatchSize,
defaultBatchWriters,
time.Now,
}, nil
}

Expand Down Expand Up @@ -990,6 +996,84 @@ func (c *entityRemover[M, ExternalStruct, SpannerStruct, Key]) removeWithTransac
return nil
}

// entityMutator is a client for performing transactional Read-Inspect-Write operations.
type entityMutator[
M readOneMapper[Key],
SpannerStruct any,
Key comparable,
] struct {
*Client
}

func newEntityMutator[
M readOneMapper[Key],
SpannerStruct any,
Key comparable,
](c *Client) *entityMutator[M, SpannerStruct, Key] {
return &entityMutator[M, SpannerStruct, Key]{c}
}

// readInspectMutateWithTransaction is a generic helper for "Read-Modify-Write" transactions.
func (c *entityMutator[M, SpannerStruct, Key]) readInspectMutateWithTransaction(
ctx context.Context,
key Key,
// The callback receives the pointer to the struct (or nil if not found).
// It returns a Mutation to apply, or an error to Abort the transaction.
logicFn func(ctx context.Context, existing *SpannerStruct) (*spanner.Mutation, error),
txn *spanner.ReadWriteTransaction,
) error {
var mapper M
stmt := mapper.SelectOne(key)

// 1. READ
it := txn.Query(ctx, stmt)
defer it.Stop()
row, err := it.Next()

var existing *SpannerStruct
if err == nil {
// Found the row
existing = new(SpannerStruct)
if err := row.ToStruct(existing); err != nil {
return errors.Join(ErrInternalQueryFailure, err)
}
} else if errors.Is(err, iterator.Done) {
// Row not found (valid for Insert scenarios)
existing = nil
} else {
return errors.Join(ErrInternalQueryFailure, err)
}

// 2. INSPECT & LOGIC
mutation, err := logicFn(ctx, existing)
if err != nil {
return err // This aborts the transaction
}

// 3. MUTATE (If logic didn't error, apply the change)
if mutation != nil {
return txn.BufferWrite([]*spanner.Mutation{mutation})
}

return nil
}

// readInspectMutate is a generic helper for "Read-Modify-Write" transactions.
// It reads the row (if it exists), passes it to your callback logic, and applies the returned mutation.
func (c *entityMutator[M, SpannerStruct, Key]) readInspectMutate(
ctx context.Context,
key Key,
// The callback receives the pointer to the struct (or nil if not found).
// It returns a Mutation to apply, or an error to Abort the transaction.
logicFn func(ctx context.Context, existing *SpannerStruct) (*spanner.Mutation, error),
) error {
_, err := c.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
return c.readInspectMutateWithTransaction(ctx, key, logicFn, txn)
})

return err
}

// allByKeysEntityReader handles the reading of a Spanner table with a set of key(s).
type allByKeysEntityReader[
M readAllByKeysMapper[KeysContainer],
Expand Down
183 changes: 183 additions & 0 deletions lib/gcpspanner/saved_search_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package gcpspanner

import (
"context"
"errors"
"time"

"cloud.google.com/go/spanner"
)

const savedSearchStateTableName = "SavedSearchState"

type savedSearchStateMapper struct{}

type savedSearchStateKey struct {
SavedSearchID string
SnapshotType SavedSearchSnapshotType
}

type SavedSearchSnapshotType string

const (
SavedSearchSnapshotTypeImmediate SavedSearchSnapshotType = "IMMEDIATE"
SavedSearchSnapshotTypeWeekly SavedSearchSnapshotType = "WEEKLY"
SavedSearchSnapshotTypeMonthly SavedSearchSnapshotType = "MONTHLY"
)

type SavedSearchState struct {
SavedSearchID string `spanner:"SavedSearchId"`
SnapshotType SavedSearchSnapshotType `spanner:"SnapshotType"`
LastKnownStateBlobPath *string `spanner:"LastKnownStateBlobPath"`
WorkerLockID *string `spanner:"WorkerLockId"`
WorkerLockExpiresAt *time.Time `spanner:"WorkerLockExpiresAt"`
}

func (m savedSearchStateMapper) SelectOne(key savedSearchStateKey) spanner.Statement {
return spanner.Statement{
SQL: "SELECT * FROM SavedSearchState WHERE SavedSearchId = @SavedSearchId AND SnapshotType = @SnapshotType",
Params: map[string]any{"SavedSearchId": key.SavedSearchID, "SnapshotType": key.SnapshotType},
}
}

var (
ErrAlreadyLocked = errors.New("resource already locked by another worker")
ErrLockNotOwned = errors.New("cannot release lock not owned by worker")
)

// TryAcquireSavedSearchStateWorkerLock attempts to acquire a worker lock for the given saved search and snapshot type.
// If the lock is already held by another worker and is still active, ErrAlreadyLocked is returned.
// A caller can re-acquire a lock it already holds it (thereby extending the expiration).
func (c *Client) TryAcquireSavedSearchStateWorkerLock(
ctx context.Context,
savedSearchID string,
snapshotType SavedSearchSnapshotType,
workerID string,
ttl time.Duration) (bool, error) {
writer := newEntityMutator[savedSearchStateMapper, SavedSearchState](c)
key := savedSearchStateKey{SavedSearchID: savedSearchID, SnapshotType: snapshotType}

err := writer.readInspectMutate(ctx, key,
func(_ context.Context, existing *SavedSearchState) (*spanner.Mutation, error) {
now := c.timeNow()

// If row exists, is it locked by someone else not the caller?
if existing != nil {
isLocked := existing.WorkerLockID != nil && *existing.WorkerLockID != workerID
isActive := existing.WorkerLockExpiresAt != nil && existing.WorkerLockExpiresAt.After(now)

if isLocked && isActive {
return nil, ErrAlreadyLocked
}
}

expiration := now.Add(ttl)

// We can take the lock.
newState := SavedSearchState{
SavedSearchID: savedSearchID,
SnapshotType: snapshotType,
WorkerLockID: &workerID,
WorkerLockExpiresAt: &expiration,
LastKnownStateBlobPath: nil,
}
if existing != nil {
newState.LastKnownStateBlobPath = existing.LastKnownStateBlobPath
}

return spanner.InsertOrUpdateStruct(savedSearchStateTableName, newState)
})

if err != nil {
return false, err
}

return true, nil
}

// ReleaseSavedSearchStateWorkerLock releases the worker lock for the given saved search and snapshot type.
// The caller must own the lock. If not, ErrLockNotOwned is returned.
func (c *Client) ReleaseSavedSearchStateWorkerLock(
ctx context.Context,
savedSearchID string,
snapshotType SavedSearchSnapshotType,
workerID string) error {
mutator := newEntityMutator[savedSearchStateMapper, SavedSearchState](c)
key := savedSearchStateKey{SavedSearchID: savedSearchID, SnapshotType: snapshotType}

return mutator.readInspectMutate(ctx, key,
func(_ context.Context, existing *SavedSearchState) (*spanner.Mutation, error) {
// If row is gone, nothing to release
if existing == nil {
return nil, nil
}

// Verify the caller owns this lock
if existing.WorkerLockID == nil || *existing.WorkerLockID != workerID {
return nil, ErrLockNotOwned
}

newState := SavedSearchState{
SavedSearchID: savedSearchID,
SnapshotType: snapshotType,
// Release the lock
WorkerLockID: nil,
WorkerLockExpiresAt: nil,
LastKnownStateBlobPath: nil,
}

// Preserve the existing blob path
newState.LastKnownStateBlobPath = existing.LastKnownStateBlobPath

return spanner.InsertOrUpdateStruct(savedSearchStateTableName, newState)
})
}

// GetSavedSearchState retrieves the SavedSearchState for the given saved search and snapshot type.
// If no such row exists, ErrQueryReturnedNoResults is returned.
func (c *Client) GetSavedSearchState(
ctx context.Context,
savedSearchID string,
snapshotType SavedSearchSnapshotType) (*SavedSearchState, error) {
r := newEntityReader[savedSearchStateMapper, SavedSearchState, savedSearchStateKey](c)
key := savedSearchStateKey{SavedSearchID: savedSearchID, SnapshotType: snapshotType}

return r.readRowByKey(ctx, key)
}

// UpdateSavedSearchStateLastKnownStateBlobPath updates the LastKnownStateBlobPath
// for the given saved search and snapshot type.
// The row must already exist. Else, ErrQueryReturnedNoResults is returned.
func (c *Client) UpdateSavedSearchStateLastKnownStateBlobPath(
ctx context.Context,
savedSearchID string,
snapshotType SavedSearchSnapshotType,
blobPath string) error {
mutator := newEntityMutator[savedSearchStateMapper, SavedSearchState](c)
key := savedSearchStateKey{SavedSearchID: savedSearchID, SnapshotType: snapshotType}

return mutator.readInspectMutate(ctx, key,
func(_ context.Context, existing *SavedSearchState) (*spanner.Mutation, error) {
if existing == nil {
return nil, ErrQueryReturnedNoResults
}
// Update existing row
existing.LastKnownStateBlobPath = &blobPath

return spanner.UpdateStruct(savedSearchStateTableName, *existing)
})
}
Loading
Loading