This repository has been archived by the owner on Mar 5, 2022. It is now read-only.

Fixed routing table update intervals missing changes
I'm not too thrilled with this change...it feels messier...but it's a
start at fetching Routing Tables on demand and caching them for some
period of time. This solves the problem of a newly created database
not being accessible immediately.

There are still some issues for newly created databases that need to
be ironed out, but existing databases should still behave the same
as before.
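
In outline, the change swaps the monitor-pushed table for a read-through cache keyed by database name: look up the table, reuse it if it hasn't expired, otherwise fetch a fresh one and cache it. A minimal, self-contained Go sketch of that pattern (the Cache type, fetch callback, and 30-second TTL are illustrative stand-ins, not the proxy's actual API):

package main

import (
	"fmt"
	"time"
)

// RoutingTable carries routing info for one database plus the time it
// was created, so callers can tell when it has gone stale.
type RoutingTable struct {
	Hosts     []string
	CreatedAt time.Time
	TTL       time.Duration
}

// Expired reports whether the table has outlived its TTL.
func (rt RoutingTable) Expired() bool {
	return time.Now().After(rt.CreatedAt.Add(rt.TTL))
}

// Cache fetches routing tables on demand, keyed by database name.
type Cache struct {
	tables map[string]RoutingTable
	fetch  func(db string) (RoutingTable, error) // stand-in for a routing query
}

// Get returns a cached table if it is still fresh, otherwise fetches
// a new one and caches it for the next caller.
func (c *Cache) Get(db string) (RoutingTable, error) {
	if rt, ok := c.tables[db]; ok && !rt.Expired() {
		return rt, nil
	}
	rt, err := c.fetch(db)
	if err != nil {
		return RoutingTable{}, err
	}
	c.tables[db] = rt
	return rt, nil
}

func main() {
	c := &Cache{
		tables: make(map[string]RoutingTable),
		fetch: func(db string) (RoutingTable, error) {
			// Stand-in for asking the cluster for db's routing table.
			return RoutingTable{
				Hosts:     []string{"core1:7687"},
				CreatedAt: time.Now(),
				TTL:       30 * time.Second,
			}, nil
		},
	}
	rt, _ := c.Get("neo4j") // first call fetches; later calls hit the cache
	fmt.Println(rt.Hosts)
}

Fetching on demand means a database created after startup gets a routing table the first time a client asks for it, instead of waiting for the next scheduled refresh.
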
voutilad committed Dec 23, 2020
1 parent 4fa2eba commit de163dd
Showing 4 changed files with 232 additions and 286 deletions.
backend/backend.go: 100 changes (60 additions, 40 deletions)
@@ -1,66 +1,87 @@
 package backend
 
 import (
 	"errors"
 	"log"
-	"strings"
+	"net/url"
 	"sync"
 	"time"
 
 	"github.com/voutilad/bolt-proxy/bolt"
 )
 
 type Backend struct {
-	monitor      *Monitor
-	routingTable *RoutingTable
-	tls          bool
-	log          *log.Logger
+	monitor *Monitor
+	tls     bool
+	log     *log.Logger
+	// map of principals -> hosts -> connections
+	connectionPool map[string]map[string]bolt.BoltConn
+	routingCache   map[string]RoutingTable
+	info           ClusterInfo
 }
 
 func NewBackend(logger *log.Logger, username, password string, uri string, hosts ...string) (*Backend, error) {
-	monitor, err := NewMonitor(username, password, uri, hosts...)
+	tls := false
+	u, err := url.Parse(uri)
 	if err != nil {
 		return nil, err
 	}
-	routingTable := <-monitor.C
-
-	tls := false
-	switch strings.Split(uri, ":")[0] {
+	switch u.Scheme {
 	case "bolt+s", "bolt+ssc", "neo4j+s", "neo4j+ssc":
 		tls = true
 	case "bolt", "neo4j":
 		// ok
 	default:
 		return nil, errors.New("invalid neo4j connection scheme")
 	}
 
+	monitor, err := NewMonitor(username, password, uri, hosts...)
+	if err != nil {
+		return nil, err
+	}
+
 	return &Backend{
-		monitor:      monitor,
-		routingTable: routingTable,
-		tls:          tls,
-		log:          logger,
+		monitor:        monitor,
+		tls:            tls,
+		log:            logger,
+		connectionPool: make(map[string]map[string]bolt.BoltConn),
+		routingCache:   make(map[string]RoutingTable),
+		info:           <-monitor.Info,
 	}, nil
 }
 
 func (b *Backend) Version() Version {
 	return b.monitor.Version
 }
 
-func (b *Backend) RoutingTable() *RoutingTable {
-	if b.routingTable == nil {
-		panic("attempting to use uninitialized BackendClient")
+func (b *Backend) RoutingTable(db string) (RoutingTable, error) {
+	table, found := b.routingCache[db]
+	if found && !table.Expired() {
+		return table, nil
 	}
 
-	b.log.Println("checking routing table...")
-	if b.routingTable.Expired() {
+	table, err := b.monitor.UpdateRoutingTable(db)
+	if err != nil {
+		return RoutingTable{}, err
+	}
+
+	b.log.Printf("got routing table for %s: %s", db, table)
+	return table, nil
+}
+
+func (b *Backend) ClusterInfo() (ClusterInfo, error) {
+	// XXX: this technically isn't thread safe as we mutate b.info
+
+	if b.info.CreatedAt.Add(30 * time.Second).Before(time.Now()) {
 		select {
-		case rt := <-b.monitor.C:
-			b.routingTable = rt
-		case <-time.After(60 * time.Second):
-			b.log.Fatal("timeout waiting for new routing table!")
+		case <-time.After(30 * time.Second):
+			return ClusterInfo{}, errors.New("timeout waiting for updated ClusterInfo")
+		case info := <-b.monitor.Info:
+			b.info = info
		}
 	}
 
-	b.log.Println("using routing table")
-	return b.routingTable
+	return b.info, nil
 }
 
 // For now, we'll authenticate to all known hosts up-front to simplify things.
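
The XXX comment in ClusterInfo above notes that b.info is mutated without synchronization, so concurrent callers can race on the cached value. One conventional way to close that gap is to guard the field with a mutex; a sketch under the assumption of adding a sync.Mutex field to Backend (hypothetical, not part of this commit), with minimal stand-in types so it compiles on its own:

package sketch

import (
	"sync"
	"time"
)

// Minimal stand-in for the proxy's ClusterInfo type.
type ClusterInfo struct {
	Hosts     []string
	CreatedAt time.Time
}

// safeBackend is a hypothetical variant of Backend that serializes
// access to the cached ClusterInfo with a mutex.
type safeBackend struct {
	mu      sync.Mutex
	info    ClusterInfo
	refresh func() (ClusterInfo, error) // stand-in for reading monitor.Info
}

// ClusterInfo returns the cached info, refreshing it under the lock
// when it is more than 30 seconds old.
func (b *safeBackend) ClusterInfo() (ClusterInfo, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.info.CreatedAt.Add(30 * time.Second).Before(time.Now()) {
		info, err := b.refresh()
		if err != nil {
			return ClusterInfo{}, err
		}
		b.info = info
	}
	return b.info, nil
}
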
@@ -74,7 +95,7 @@ func (b *Backend) Authenticate(hello *bolt.Message) (map[string]bolt.BoltConn, error)
 		panic("authenticate requires a Hello message")
 	}
 
-	// TODO: clean up this api...push the dirt into Bolt package
+	// TODO: clean up this api...push the dirt into Bolt package?
 	msg, pos, err := bolt.ParseMap(hello.Data[4:])
 	if err != nil {
 		b.log.Printf("XXX pos: %d, hello map: %#v\n", pos, msg)
@@ -86,37 +107,36 @@ func (b *Backend) Authenticate(hello *bolt.Message) (map[string]bolt.BoltConn, error)
 	}
 	b.log.Println("found principal:", principal)
 
-	// refresh routing table
-	// TODO: this api seems backwards...push down into table?
-	rt := b.RoutingTable()
-
-	// Try authing first with the default db writer before we try others
+	// Try authing first with a Core cluster member before we try others
 	// this way we can fail fast and not spam a bad set of credentials
-	writers, _ := rt.WritersFor(rt.DefaultDb)
-	defaultWriter := writers[0]
+	info, err := b.ClusterInfo()
+	if err != nil {
+		return nil, err
+	}
+	defaultHost := info.Hosts[0]
 
-	b.log.Printf("trying to auth %s to host %s\n", principal, defaultWriter)
+	b.log.Printf("trying to auth %s to host %s\n", principal, defaultHost)
 	conn, err := authClient(hello.Data, b.Version().Bytes(),
-		"tcp", defaultWriter, b.tls)
+		"tcp", defaultHost, b.tls)
 	if err != nil {
 		return nil, err
 	}
 
 	// Ok, now to get the rest
-	conns := make(map[string]bolt.BoltConn, len(rt.Hosts))
-	conns[defaultWriter] = bolt.NewDirectConn(conn)
+	conns := make(map[string]bolt.BoltConn, len(info.Hosts))
+	conns[defaultHost] = bolt.NewDirectConn(conn)
 
 	// We'll need a channel to collect results as we're going to auth
 	// to all hosts asynchronously
 	type pair struct {
 		conn bolt.BoltConn
 		host string
 	}
-	c := make(chan pair, len(rt.Hosts)+1)
+	c := make(chan pair, len(info.Hosts)+1)
 	var wg sync.WaitGroup
-	for host := range rt.Hosts {
+	for _, host := range info.Hosts {
 		// skip the host we already used to test auth
-		if host != defaultWriter {
+		if host != defaultHost {
 			wg.Add(1)
 			go func(h string) {
 				defer wg.Done()
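
The goroutine body is cut off in this view, but the setup follows a standard fan-out/collect shape: one goroutine per remaining host, each pushing its result onto the buffered pair channel while a WaitGroup tracks completion. A simplified sketch of that shape (it dials every host rather than skipping the one already authenticated, and the dial callback plus string connections are stand-ins for the proxy's authClient and bolt.BoltConn):

package sketch

import "sync"

// authAll dials every host concurrently and collects the successful
// connections into a map keyed by host.
func authAll(hosts []string, dial func(host string) (string, error)) map[string]string {
	type pair struct {
		conn string // stand-in for bolt.BoltConn
		host string
	}
	c := make(chan pair, len(hosts)) // buffered so no sender ever blocks
	var wg sync.WaitGroup
	for _, host := range hosts {
		wg.Add(1)
		go func(h string) {
			defer wg.Done()
			conn, err := dial(h)
			if err != nil {
				return // failed hosts simply send nothing
			}
			c <- pair{conn, h}
		}(host)
	}
	wg.Wait() // all senders done, so it is safe to close and drain
	close(c)

	conns := make(map[string]string, len(hosts))
	for p := range c {
		conns[p.host] = p.conn
	}
	return conns
}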
