Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,11 @@ func (c *client) GetServiceDiscovery() ServiceDiscovery {
return c.pdSvcDiscovery
}

// GetTSOServiceDiscovery returns the TSO service discovery object. Only used for testing.
func (c *client) GetTSOServiceDiscovery() ServiceDiscovery {
return c.tsoSvcDiscovery
}

// UpdateOption updates the client option.
func (c *client) UpdateOption(option DynamicOption, value any) error {
switch option {
Expand Down
91 changes: 51 additions & 40 deletions client/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,21 +99,6 @@ type tsoServerDiscovery struct {
urls []string
// used for round-robin load balancing
selectIdx int
// failureCount counts the consecutive failures for communicating with the tso servers
failureCount int
}

func (t *tsoServerDiscovery) countFailure() bool {
t.Lock()
defer t.Unlock()
t.failureCount++
return t.failureCount >= len(t.urls)
}

func (t *tsoServerDiscovery) resetFailure() {
t.Lock()
defer t.Unlock()
t.failureCount = 0
}

// tsoServiceDiscovery is the service discovery client of the independent TSO service
Expand Down Expand Up @@ -429,15 +414,12 @@ func (c *tsoServiceDiscovery) updateMember() error {
if len(tsoServerURL) > 0 {
keyspaceGroup, err = c.findGroupByKeyspaceID(keyspaceID, tsoServerURL, updateMemberTimeout)
if err != nil {
if c.tsoServerDiscovery.countFailure() {
log.Error("[tso] failed to find the keyspace group",
zap.Uint32("keyspace-id-in-request", keyspaceID),
zap.String("tso-server-url", tsoServerURL),
errs.ZapError(err))
}
log.Error("[tso] failed to find the keyspace group",
zap.Uint32("keyspace-id-in-request", keyspaceID),
zap.String("tso-server-url", tsoServerURL),
errs.ZapError(err))
return err
}
c.tsoServerDiscovery.resetFailure()
} else {
// There is no error but no tso server URL found, which means
// the server side hasn't been upgraded to the version that
Expand Down Expand Up @@ -573,36 +555,37 @@ func (c *tsoServiceDiscovery) findGroupByKeyspaceID(
}

func (c *tsoServiceDiscovery) getTSOServer(sd ServiceDiscovery) (string, error) {
c.Lock()
defer c.Unlock()

var (
urls []string
err error
)
urls, err = sd.(*pdServiceDiscovery).discoverMicroservice(tsoService)
if err != nil {
return "", err
}

c.Lock()
defer c.Unlock()
t := c.tsoServerDiscovery
if len(t.urls) == 0 || t.failureCount == len(t.urls) {
urls, err = sd.(*pdServiceDiscovery).discoverMicroservice(tsoService)
if err != nil {
return "", err
}
failpoint.Inject("serverReturnsNoTSOAddrs", func() {
failpoint.Inject("serverReturnsNoTSOAddrs", func() {
if len(t.urls) == 0 {
log.Info("[failpoint] injected error: server returns no tso URLs")
urls = nil
})
if len(urls) == 0 {
// There is no error but no tso server url found, which means
// the server side hasn't been upgraded to the version that
// processes and returns GetClusterInfoResponse.TsoUrls. Return here
// and handle the fallback logic outside of this function.
return "", nil
}
})

log.Info("update tso server URLs", zap.Strings("urls", urls))
if len(urls) == 0 {
// There is no error but no tso server url found, which means
// the server side hasn't been upgraded to the version that
// processes and returns GetClusterInfoResponse.TsoUrls. Return here
// and handle the fallback logic outside of this function.
return "", nil
}

if len(t.urls) == 0 || !EqualWithoutOrder(t.urls, urls) {
log.Info("update tso server URLs", zap.Strings("urls", urls))
t.urls = urls
t.selectIdx = 0
t.failureCount = 0
}

// Pick a TSO server in a round-robin way.
Expand Down Expand Up @@ -641,3 +624,31 @@ func (c *tsoServiceDiscovery) discoverWithLegacyPath() ([]string, error) {
}
return listenUrls, nil
}

// GetURLs returns the URLs of the TSO servers. Only used for testing.
func (c *tsoServiceDiscovery) GetURLs() []string {
return c.urls
}

// EqualWithoutOrder checks if two slices are equal without considering the order.
func EqualWithoutOrder[T comparable](a, b []T) bool {
if len(a) != len(b) {
return false
}
for _, item := range a {
if !Contains(b, item) {
return false
}
}
return true
}

// Contains returns true if the given slice contains the value.
func Contains[T comparable](slice []T, value T) bool {
for _, v := range slice {
if v == value {
return true
}
}
return false
}
47 changes: 47 additions & 0 deletions tests/integrations/tso/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,3 +676,50 @@ func checkTSO(
}()
}
}

func TestTSOServiceDiscovery(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

pdCluster, err := tests.NewTestAPICluster(ctx, 1)
re.NoError(err)
defer pdCluster.Destroy()
err = pdCluster.RunInitialServers()
re.NoError(err)
leaderName := pdCluster.WaitLeader()
re.NotEmpty(leaderName)
pdLeader := pdCluster.GetServer(leaderName)
re.NoError(pdLeader.BootstrapCluster())

_, cleanup1 := tests.StartSingleTSOTestServer(ctx, re, pdLeader.GetAddr(), tempurl.Alloc())
defer cleanup1()

pdClient, err := pd.NewClientWithKeyspace(context.Background(),
constant.DefaultKeyspaceID,
[]string{pdLeader.GetAddr()}, pd.SecurityOption{})
re.NoError(err)
defer pdClient.Close()
physical, logical, err := pdClient.GetTS(ctx)
re.NoError(err)
ts := tsoutil.ComposeTS(physical, logical)
re.NotEmpty(ts)
checkServiceDiscovery(re, pdClient, 1)

_, cleanup2 := tests.StartSingleTSOTestServer(ctx, re, pdLeader.GetAddr(), tempurl.Alloc())
defer cleanup2()
checkServiceDiscovery(re, pdClient, 2)
}

func checkServiceDiscovery(re *require.Assertions, client pd.Client, urlsLen int) {
inner, ok := client.(interface{ GetTSOServiceDiscovery() pd.ServiceDiscovery })
if ok {
tsoDiscovery := inner.GetTSOServiceDiscovery()
err := tsoDiscovery.CheckMemberChanged()
re.NoError(err)
if tsoDiscovery != nil {
urls := tsoDiscovery.(interface{ GetURLs() []string }).GetURLs()
re.Len(urls, urlsLen)
}
}
}