Skip to content

Commit

Permalink
server: remove the conflict between DB updates and running queries
Browse files Browse the repository at this point in the history
Each database handle has a shared lock that queries hold while they run, and
that updating the state of the handle with a new database, label, and named
queries acquire exclusively. This meant that handing off a new database would
block for a long-running query.

I dealt with this by setting a timeout on user queries. That works, but can be
annoying when users have a complex query to run that may take a while.

To address this, change the way updates are done:

- If an update arrives and the handle is not locked, it is applied immediately
  (as before).

- But if the handle is locked (busy), record the intended update in an atomic
  field.

- Each time a query exits, it checks whether there is a pending update and
  tries to apply it.

- Each time the UI browses the handles, it checks whether there are pending
  updates and tries to apply them.

If an update cannot be applied immediately, then one or more running queries
hold the lock that prevents it. Either the last such query, or a subsequent UI
visit, will notice the pending query and apply it.
  • Loading branch information
creachadair committed Mar 14, 2024
1 parent fe491d5 commit 8400c81
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 20 deletions.
82 changes: 71 additions & 11 deletions server/tailsql/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"regexp"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/tailscale/hujson"
Expand Down Expand Up @@ -286,6 +287,11 @@ type dbHandle struct {
src string
driver string

// If not nil, the value of this field is a database update that arrived
// while the handle was busy running a query. The concrete type is *dbUpdate
// once initialized.
update atomic.Value

// mu protects the fields below.
// Hold shared to read the label and issue queries against db.
// Hold exclusive to replace or close db or to update label.
Expand All @@ -311,12 +317,45 @@ func (h *dbHandle) handleUpdates(name string, w setec.Watcher, logf logger.Logf)
}
logf("[tailsql] opened new connection for source %q", h.src)
h.mu.Lock()
// Close the existing active handle.
h.db.Close()
// If there's a pending update, close it too.
if up := h.checkUpdate(); up != nil {
up.newDB.Close()
}
h.db = db
h.mu.Unlock()
}
}

// checkUpdate returns nil if there is no pending update, otherwise it swaps
// out the pending database update and returns it.
func (h *dbHandle) checkUpdate() *dbUpdate {
if up := h.update.Swap((*dbUpdate)(nil)); up != nil {
return up.(*dbUpdate)
}
return nil
}

// tryUpdate checks whether h is busy with a query. If not, and there is a
// handle update pending, tryUpdate applies it.
func (h *dbHandle) tryUpdate() {
if h.mu.TryLock() { // if not, the handle is busy; try again later
defer h.mu.Unlock()
if up := h.checkUpdate(); up != nil {
h.applyUpdateLocked(up)
}
}
}

// applyUpdateLocked applies up to h, which must be locked exclusively.
func (h *dbHandle) applyUpdateLocked(up *dbUpdate) {
h.label = up.label
h.named = up.named
h.db.Close()
h.db = up.newDB
}

// Source returns the source name defined for h.
func (h *dbHandle) Source() string { return h.src }

Expand Down Expand Up @@ -383,23 +422,44 @@ func lookupNamedQuery(ctx context.Context, name string) (string, bool) {
}

// swap locks the handle, swaps the current contents of the handle with newDB
// and newLabel, and returns the original value. The caller is responsible for
// and newLabel, and closes the original value. The caller is responsible for
// closing a database handle when it is no longer in use. It will panic if
// newDB == nil, or if h is closed.
func (h *dbHandle) swap(newDB *sql.DB, newOpts *DBOptions) *sql.DB {
func (h *dbHandle) swap(newDB *sql.DB, newOpts *DBOptions) {
if newDB == nil {
panic("new database is nil")
}
h.mu.Lock()
defer h.mu.Unlock()
old := h.db
if old == nil {
panic("handle is closed")

up := &dbUpdate{
newDB: newDB,
label: newOpts.label(),
named: newOpts.namedQueries(),
}

// If the handle is not busy, do the swap now.
if h.mu.TryLock() {
defer h.mu.Unlock()
if h.db == nil {
panic("handle is closed")
}
h.applyUpdateLocked(up)
return
}
h.db = newDB
h.label = newOpts.label()
h.named = newOpts.namedQueries()
return old

// Reaching here, the handle is busy on a query. Record an update to be
// plumbed in later. It's possible we already had a pending update -- if
// that happens close out the old one.
if old := h.update.Swap(up); old != nil {
old.(*dbUpdate).newDB.Close()
}
}

// A dbUpdate is an open database handle, label, and set of named queries that
// are ready to be installed in a database handle.
type dbUpdate struct {
newDB *sql.DB
label string
named map[string]string
}

// close closes the handle, calling Close on the underlying database and
Expand Down
17 changes: 8 additions & 9 deletions server/tailsql/tailsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,17 +186,12 @@ func (s *Server) SetDB(source string, db *sql.DB, opts *DBOptions) bool {
panic("new database is nil")
}
s.mu.Lock()
defer s.mu.Unlock()

for _, src := range s.dbs {
if src.Source() == source {
s.mu.Unlock()

// Perform the swap outside the service lock, since it may wait if a
// query is in-flight and we don't need or want to block the rest of
// the UI while that's happening.
old := src.swap(db, opts)
old.Close()
return false
src.swap(db, opts)
return true
}
}
s.dbs = append(s.dbs, &dbHandle{
Expand All @@ -205,7 +200,6 @@ func (s *Server) SetDB(source string, db *sql.DB, opts *DBOptions) bool {
label: opts.label(),
named: opts.namedQueries(),
})
s.mu.Unlock()
return false
}

Expand Down Expand Up @@ -609,6 +603,11 @@ func (s *Server) getHandles() []*dbHandle {
s.mu.Lock()
defer s.mu.Unlock()

// Check for pending updates.
for _, h := range s.dbs {
h.tryUpdate()
}

// It is safe to return the slice because we never remove any elements, new
// data are only ever appended to the end.
return s.dbs
Expand Down
5 changes: 5 additions & 0 deletions server/tailsql/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ func runQueryInTx[T any](ctx context.Context, h *dbHandle, query func(context.Co
out, err = query(fctx, tx)
return err
})

// Check for updates. If this is the last query in flight and there is an
// update, this will succeed; otherwise some other query holds the lock and
// will succeed later.
h.tryUpdate()
return out, err
}

Expand Down

0 comments on commit 8400c81

Please sign in to comment.