Skip to content

Commit

Permalink
Add WebSocket sync proxy support
Browse files Browse the repository at this point in the history
  • Loading branch information
duo committed Mar 3, 2023
1 parent d8d0f12 commit d3c7c3f
Show file tree
Hide file tree
Showing 7 changed files with 365 additions and 8 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.19-alpine AS builder
FROM golang:1.20-alpine AS builder

RUN apk add --no-cache git ca-certificates build-base olm-dev

Expand Down
5 changes: 5 additions & 0 deletions example-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ homeserver:
# The domain of the homeserver (for MXIDs, etc).
domain: example.com

# Set to null to disable using the websocket. When not using the websocket, make sure hostname and port are set in the appservice section.
websocket_proxy:
# How often should the websocket be pinged? Pinging will be disabled if this is zero.
ping_interval_seconds: 0

# What software is the homeserver running?
# Standard Matrix homeservers like Synapse, Dendrite and Conduit should just use "standard" here.
software: standard
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ require (
github.com/lib/pq v1.10.7
github.com/mattn/go-sqlite3 v1.14.16
github.com/wdvxdr1123/go-silk v0.0.0-20220304095002-f67345df09ea
maunium.net/go/maulogger/v2 v2.4.0
maunium.net/go/maulogger/v2 v2.4.1
maunium.net/go/mautrix v0.14.0
)

require (
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/kr/pretty v0.3.1 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/gabriel-vasile/mimetype v1.4.1 h1:TRWk7se+TOjCYgRth7+1/OYLNiRNIotknkFtf/dnN7Q=
github.com/gabriel-vasile/mimetype v1.4.1/go.mod h1:05Vi0w3Y9c/lNvJOdmIwvrrAhX3rYhfQQCaf9VJcv7M=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
Expand Down Expand Up @@ -87,8 +87,8 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M=
maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA=
maunium.net/go/maulogger/v2 v2.4.0 h1:Hg9clkipyjq+EjpWLEKx1zuK7KksysR6I9cHqRgqH1k=
maunium.net/go/maulogger/v2 v2.4.0/go.mod h1:omPuYwYBILeVQobz8uO3XC8DIRuEb5rXYlQSuqrbCho=
maunium.net/go/maulogger/v2 v2.4.1 h1:N7zSdd0mZkB2m2JtFUsiGTQQAdP0YeFWT7YMc80yAL8=
maunium.net/go/maulogger/v2 v2.4.1/go.mod h1:omPuYwYBILeVQobz8uO3XC8DIRuEb5rXYlQSuqrbCho=
maunium.net/go/mautrix v0.14.0 h1:kdQ06HzmMaLGZqmSh/ykDhp5C2gIREQL9TS8hY+FqLs=
maunium.net/go/mautrix v0.14.0/go.mod h1:voJPvnTkA60rxBl6mvdPxcP7y7iY5w3d/K55IoX+2oY=
modernc.org/libc v1.8.1/go.mod h1:U1eq8YWr/Kc1RWCMFUWEdkTg8OTcfLw2kY8EDwl039w=
Expand Down
112 changes: 112 additions & 0 deletions internal/matrix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package internal

import (
"encoding/json"
"sync/atomic"
"time"

"maunium.net/go/maulogger/v2"
"maunium.net/go/mautrix"
"maunium.net/go/mautrix/appservice"
"maunium.net/go/mautrix/id"
)

const DefaultSyncProxyBackoff = 1 * time.Second
const MaxSyncProxyBackoff = 60 * time.Second

const BridgeStatusConnected = "CONNECTED"

type WebsocketCommandHandler struct {
bridge *WechatBridge
log maulogger.Logger
errorTxnIDC *appservice.TransactionIDCache

lastSyncProxyError time.Time
syncProxyBackoff time.Duration
syncProxyWaiting int64
}

type BridgeStatus struct {
StateEvent string `json:"state_event"`
Timestamp int64 `json:"timestamp"`
TTL int `json:"ttl"`
Source string `json:"source"`
Error string `json:"error,omitempty"`
Message string `json:"message,omitempty"`
UserID id.UserID `json:"user_id,omitempty"`
RemoteID string `json:"remote_id,omitempty"`
RemoteName string `json:"remote_name,omitempty"`

Info map[string]interface{} `json:"info,omitempty"`
}

func NewWebsocketCommandHandler(br *WechatBridge) *WebsocketCommandHandler {
handler := &WebsocketCommandHandler{
bridge: br,
log: br.Log.Sub("MatrixWebsocket"),
errorTxnIDC: appservice.NewTransactionIDCache(8),
syncProxyBackoff: DefaultSyncProxyBackoff,
}
br.AS.PrepareWebsocket()
br.AS.SetWebsocketCommandHandler("ping", handler.handleWSPing)
br.AS.SetWebsocketCommandHandler("syncproxy_error", handler.handleWSSyncProxyError)
return handler
}

func (mx *WebsocketCommandHandler) handleWSPing(cmd appservice.WebsocketCommand) (bool, interface{}) {
mx.log.Warnfln("Receive ws ping")
status := BridgeStatus{
StateEvent: BridgeStatusConnected,
Timestamp: time.Now().Unix(),
TTL: 600,
Source: "bridge",
}

return true, &status
}

func (mx *WebsocketCommandHandler) handleWSSyncProxyError(cmd appservice.WebsocketCommand) (bool, interface{}) {
var data mautrix.RespError
err := json.Unmarshal(cmd.Data, &data)

if err != nil {
mx.log.Warnln("Failed to unmarshal syncproxy_error data:", err)
} else if txnID, ok := data.ExtraData["txn_id"].(string); !ok {
mx.log.Warnln("Got syncproxy_error data with no transaction ID")
} else if mx.errorTxnIDC.IsProcessed(txnID) {
mx.log.Debugln("Ignoring syncproxy_error with duplicate transaction ID", txnID)
} else {
go mx.HandleSyncProxyError(&data, nil)
mx.errorTxnIDC.MarkProcessed(txnID)
}

return true, &data
}

func (mx *WebsocketCommandHandler) HandleSyncProxyError(syncErr *mautrix.RespError, startErr error) {
if !atomic.CompareAndSwapInt64(&mx.syncProxyWaiting, 0, 1) {
var err interface{} = startErr
if err == nil {
err = syncErr.Err
}
mx.log.Debugfln("Got sync proxy error (%v), but there's already another thread waiting to restart sync proxy", err)
return
}
if time.Since(mx.lastSyncProxyError) < MaxSyncProxyBackoff {
mx.syncProxyBackoff *= 2
if mx.syncProxyBackoff > MaxSyncProxyBackoff {
mx.syncProxyBackoff = MaxSyncProxyBackoff
}
} else {
mx.syncProxyBackoff = DefaultSyncProxyBackoff
}
mx.lastSyncProxyError = time.Now()
if syncErr != nil {
mx.log.Errorfln("Syncproxy told us that syncing failed: %s - Requesting a restart in %s", syncErr.Err, mx.syncProxyBackoff)
} else if startErr != nil {
mx.log.Errorfln("Failed to request sync proxy to start syncing: %v - Requesting a restart in %s", startErr, mx.syncProxyBackoff)
}
time.Sleep(mx.syncProxyBackoff)
atomic.StoreInt64(&mx.syncProxyWaiting, 0)
mx.bridge.RequestStartSync()
}
Loading

0 comments on commit d3c7c3f

Please sign in to comment.