Skip to content

Commit

Permalink
Merge pull request #657 from ably/feature/realtime-fallbacks
Browse files Browse the repository at this point in the history
[ECO-4327] Feature - realtime fallbacks
  • Loading branch information
sacOO7 committed Aug 27, 2024
2 parents f749c38 + 63cb110 commit 673864c
Show file tree
Hide file tree
Showing 11 changed files with 389 additions and 76 deletions.
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -450,8 +450,6 @@ See [jwt auth issue](https://github.com/ably/ably-go/issues/569) for more detail
- Inband reauthentication is not supported; expiring tokens will trigger a disconnection and resume of a realtime
connection. See [server initiated auth](https://github.com/ably/ably-go/issues/228) for more details.

- Realtime connection failure handling is partially implemented. See [host fallback](https://github.com/ably/ably-go/issues/225) for more details.

- Channel suspended state is partially implemented. See [suspended channel state](https://github.com/ably/ably-go/issues/568).

- Realtime Ping function is not implemented.
Expand Down
11 changes: 11 additions & 0 deletions ably/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,17 @@ func errFromUnprocessableBody(resp *http.Response) error {
return &ErrorInfo{Code: ErrBadRequest, StatusCode: resp.StatusCode, err: err}
}

func isTimeoutOrDnsErr(err error) bool {
var netErr net.Error
if errors.As(err, &netErr) {
if netErr.Timeout() { // RSC15l2
return true
}
}
var dnsErr *net.DNSError
return errors.As(err, &dnsErr) // RSC15l1
}

func checkValidHTTPResponse(resp *http.Response) error {
type errorBody struct {
Error errorInfo `json:"error,omitempty" codec:"error,omitempty"`
Expand Down
21 changes: 17 additions & 4 deletions ably/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@ func (opts *clientOptions) RestURL() string {
return opts.restURL()
}

func (opts *clientOptions) RealtimeURL() string {
return opts.realtimeURL()
}

func (c *REST) Post(ctx context.Context, path string, in, out interface{}) (*http.Response, error) {
return c.post(ctx, path, in, out)
}
Expand Down Expand Up @@ -93,6 +89,10 @@ func (c *REST) GetCachedFallbackHost() string {
return c.hostCache.get()
}

func (c *REST) ActiveRealtimeHost() string {
return c.activeRealtimeHost
}

func (c *RealtimeChannel) GetChannelSerial() string {
c.mtx.Lock()
defer c.mtx.Unlock()
Expand Down Expand Up @@ -121,6 +121,10 @@ func (opts *clientOptions) GetFallbackRetryTimeout() time.Duration {
return opts.fallbackRetryTimeout()
}

func (opts *clientOptions) HasActiveInternetConnection() bool {
return opts.hasActiveInternetConnection()
}

func NewErrorInfo(code ErrorCode, err error) *ErrorInfo {
return newError(code, err)
}
Expand Down Expand Up @@ -222,6 +226,10 @@ func (c *Connection) SetKey(key string) {
c.key = key
}

func (r *Realtime) Rest() *REST {
return r.rest
}

func (c *RealtimePresence) Members() map[string]*PresenceMessage {
c.mtx.Lock()
defer c.mtx.Unlock()
Expand Down Expand Up @@ -272,6 +280,11 @@ type DurationFromMsecs = durationFromMsecs
type ProtoErrorInfo = errorInfo
type ProtoFlag = protoFlag
type ProtocolMessage = protocolMessage
type WebsocketErr = websocketErr

func (w *WebsocketErr) HttpResp() *http.Response {
return w.resp
}

const (
DefaultCipherKeyLength = defaultCipherKeyLength
Expand Down
26 changes: 23 additions & 3 deletions ably/options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package ably

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"log"
"net"
"net/http"
Expand All @@ -28,6 +30,10 @@ const (
Port = 80
TLSPort = 443
maxMessageSize = 65536 // 64kb, default value TO3l8

// RTN17c
internetCheckUrl = "https://internet-up.ably-realtime.com/is-the-internet-up.txt"
internetCheckOk = "yes"
)

var defaultOptions = clientOptions{
Expand Down Expand Up @@ -482,10 +488,10 @@ func (opts *clientOptions) restURL() (restUrl string) {
return "https://" + baseUrl
}

func (opts *clientOptions) realtimeURL() (realtimeUrl string) {
baseUrl := opts.getRealtimeHost()
func (opts *clientOptions) realtimeURL(realtimeHost string) (realtimeUrl string) {
baseUrl := realtimeHost
_, _, err := net.SplitHostPort(baseUrl)
if err != nil { // set port if not set in baseUrl
if err != nil { // set port if not set in provided realtimeHost
port, _ := opts.activePort()
baseUrl = net.JoinHostPort(baseUrl, strconv.Itoa(port))
}
Expand Down Expand Up @@ -595,6 +601,20 @@ func (opts *clientOptions) idempotentRESTPublishing() bool {
return opts.IdempotentRESTPublishing
}

// RTN17c
func (opts *clientOptions) hasActiveInternetConnection() bool {
res, err := opts.httpclient().Get(internetCheckUrl)
if err != nil || res.StatusCode != 200 {
return false
}
defer res.Body.Close()
data, err := io.ReadAll(res.Body)
if err != nil {
return false
}
return bytes.Contains(data, []byte(internetCheckOk))
}

type ScopeParams struct {
Start time.Time
End time.Time
Expand Down
45 changes: 23 additions & 22 deletions ably/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,32 @@ import (
)

func TestDefaultFallbacks_RSC15h(t *testing.T) {
t.Run("with env should return environment fallback hosts", func(t *testing.T) {
expectedFallBackHosts := []string{
"a.ably-realtime.com",
"b.ably-realtime.com",
"c.ably-realtime.com",
"d.ably-realtime.com",
"e.ably-realtime.com",
}
hosts := ably.DefaultFallbackHosts()
assert.Equal(t, expectedFallBackHosts, hosts)
})
expectedFallBackHosts := []string{
"a.ably-realtime.com",
"b.ably-realtime.com",
"c.ably-realtime.com",
"d.ably-realtime.com",
"e.ably-realtime.com",
}
hosts := ably.DefaultFallbackHosts()
assert.Equal(t, expectedFallBackHosts, hosts)
}

func TestEnvFallbackHosts_RSC15i(t *testing.T) {
t.Run("with env should return environment fallback hosts", func(t *testing.T) {
expectedFallBackHosts := []string{
"sandbox-a-fallback.ably-realtime.com",
"sandbox-b-fallback.ably-realtime.com",
"sandbox-c-fallback.ably-realtime.com",
"sandbox-d-fallback.ably-realtime.com",
"sandbox-e-fallback.ably-realtime.com",
}
hosts := ably.GetEnvFallbackHosts("sandbox")
assert.Equal(t, expectedFallBackHosts, hosts)
})
expectedFallBackHosts := []string{
"sandbox-a-fallback.ably-realtime.com",
"sandbox-b-fallback.ably-realtime.com",
"sandbox-c-fallback.ably-realtime.com",
"sandbox-d-fallback.ably-realtime.com",
"sandbox-e-fallback.ably-realtime.com",
}
hosts := ably.GetEnvFallbackHosts("sandbox")
assert.Equal(t, expectedFallBackHosts, hosts)
}

func TestInternetConnectionCheck_RTN17c(t *testing.T) {
clientOptions := ably.NewClientOptions()
assert.True(t, clientOptions.HasActiveInternetConnection())
}

func TestFallbackHosts_RSC15b(t *testing.T) {
Expand Down
Loading

0 comments on commit 673864c

Please sign in to comment.