Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.24.1
require (
firebase.google.com/go/v4 v4.12.1
github.com/android-sms-gateway/client-go v1.9.5
github.com/android-sms-gateway/core v1.0.1
github.com/ansrivas/fiberprometheus/v2 v2.6.1
github.com/capcom6/go-helpers v0.3.0
github.com/capcom6/go-infra-fx v0.4.0
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/android-sms-gateway/client-go v1.9.5 h1:fHrE1Pi3rKUdPVMmI9evKW0iyjB5bMIhFRxyq1wVQ+o=
github.com/android-sms-gateway/client-go v1.9.5/go.mod h1:DQsReciU1xcaVW3T5Z2bqslNdsAwCFCtghawmA6g6L4=
github.com/android-sms-gateway/core v1.0.1 h1:7QyqyW3UQSQmEXQuUgXjZwHSnOd65DTxHUyhXQi6gpc=
github.com/android-sms-gateway/core v1.0.1/go.mod h1:HXczGDCKxTeuiwadPElczCx/y3Y6Wamc5kl5nFp5rVM=
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
github.com/ansrivas/fiberprometheus/v2 v2.6.1 h1:wac3pXaE6BYYTF04AC6K0ktk6vCD+MnDOJZ3SK66kXM=
Expand Down
8 changes: 8 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Config struct {
Tasks Tasks `yaml:"tasks"` // tasks config
SSE SSE `yaml:"sse"` // server-sent events config
Cache Cache `yaml:"cache"` // cache (memory or redis) config
PubSub PubSub `yaml:"pubsub"` // pubsub (memory or redis) config
}

type Gateway struct {
Expand Down Expand Up @@ -75,6 +76,10 @@ type Cache struct {
URL string `yaml:"url" envconfig:"CACHE__URL"`
}

type PubSub struct {
URL string `yaml:"url" envconfig:"PUBSUB__URL"`
}

var defaultConfig = Config{
Gateway: Gateway{Mode: GatewayModePublic},
HTTP: HTTP{
Expand Down Expand Up @@ -103,4 +108,7 @@ var defaultConfig = Config{
Cache: Cache{
URL: "memory://",
},
PubSub: PubSub{
URL: "memory://",
},
}
7 changes: 7 additions & 0 deletions internal/config/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/messages"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/sse"
"github.com/android-sms-gateway/server/internal/sms-gateway/pubsub"
"github.com/capcom6/go-infra-fx/config"
"github.com/capcom6/go-infra-fx/db"
"github.com/capcom6/go-infra-fx/http"
Expand Down Expand Up @@ -121,4 +122,10 @@ var Module = fx.Module(
URL: cfg.Cache.URL,
}
}),
fx.Provide(func(cfg Config) pubsub.Config {
return pubsub.Config{
URL: cfg.PubSub.URL,
BufferSize: 128,
}
}),
)
2 changes: 2 additions & 0 deletions internal/sms-gateway/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/webhooks"
"github.com/android-sms-gateway/server/internal/sms-gateway/online"
"github.com/android-sms-gateway/server/internal/sms-gateway/openapi"
"github.com/android-sms-gateway/server/internal/sms-gateway/pubsub"
"github.com/capcom6/go-infra-fx/cli"
"github.com/capcom6/go-infra-fx/db"
"github.com/capcom6/go-infra-fx/http"
Expand All @@ -45,6 +46,7 @@ var Module = fx.Module(
push.Module,
db.Module,
cache.Module(),
pubsub.Module(),
events.Module,
messages.Module,
health.Module,
Expand Down
14 changes: 7 additions & 7 deletions internal/sms-gateway/cache/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"net/url"

"github.com/android-sms-gateway/core/redis"
"github.com/android-sms-gateway/server/pkg/cache"
)

Expand Down Expand Up @@ -40,13 +39,14 @@ func NewFactory(config Config) (Factory, error) {
},
}, nil
case "redis":
client, err := redis.New(redis.Config{URL: config.URL})
if err != nil {
return nil, fmt.Errorf("can't create redis client: %w", err)
}
return &factory{
new: func(name string) (Cache, error) {
return cache.NewRedis(client, name, 0), nil
return cache.NewRedis(cache.RedisConfig{
Client: nil,
URL: config.URL,
Prefix: keyPrefix + name,
TTL: 0,
})
},
}, nil
default:
Expand All @@ -56,5 +56,5 @@ func NewFactory(config Config) (Factory, error) {

// New implements Factory.
func (f *factory) New(name string) (Cache, error) {
return f.new(keyPrefix + name)
return f.new(name)
}
8 changes: 4 additions & 4 deletions internal/sms-gateway/modules/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ import (
"github.com/android-sms-gateway/client-go/smsgateway"
)

func NewMessageEnqueuedEvent() *Event {
func NewMessageEnqueuedEvent() Event {
return NewEvent(smsgateway.PushMessageEnqueued, nil)
}

func NewWebhooksUpdatedEvent() *Event {
func NewWebhooksUpdatedEvent() Event {
return NewEvent(smsgateway.PushWebhooksUpdated, nil)
}

func NewMessagesExportRequestedEvent(since, until time.Time) *Event {
func NewMessagesExportRequestedEvent(since, until time.Time) Event {
return NewEvent(
smsgateway.PushMessagesExportRequested,
map[string]string{
Expand All @@ -24,6 +24,6 @@ func NewMessagesExportRequestedEvent(since, until time.Time) *Event {
)
}

func NewSettingsUpdatedEvent() *Event {
func NewSettingsUpdatedEvent() Event {
return NewEvent(smsgateway.PushSettingsUpdated, nil)
}
7 changes: 5 additions & 2 deletions internal/sms-gateway/modules/events/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ const (
DeliveryTypeSSE = "sse"
DeliveryTypeUnknown = "unknown"

FailureReasonQueueFull = "queue_full"
FailureReasonProviderFailed = "provider_failed"
FailureReasonSerializationError = "serialization_error"
FailureReasonPublishError = "publish_error"
FailureReasonProviderFailed = "provider_failed"

EventTypeUnknown = "unknown"
)

// metrics contains all Prometheus metrics for the events module
Expand Down
11 changes: 9 additions & 2 deletions internal/sms-gateway/modules/events/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,18 @@ var Module = fx.Module(
}),
fx.Provide(newMetrics, fx.Private),
fx.Provide(NewService),
fx.Invoke(func(lc fx.Lifecycle, svc *Service) {
fx.Invoke(func(lc fx.Lifecycle, svc *Service, logger *zap.Logger, sh fx.Shutdowner) {
ctx, cancel := context.WithCancel(context.Background())
lc.Append(fx.Hook{
OnStart: func(_ context.Context) error {
go svc.Run(ctx)
go func() {
if err := svc.Run(ctx); err != nil {
logger.Error("Error running events service", zap.Error(err))
if err := sh.Shutdown(fx.ExitCode(1)); err != nil {
logger.Error("Failed to shutdown", zap.Error(err))
}
}
}()
return nil
},
OnStop: func(_ context.Context) error {
Expand Down
82 changes: 58 additions & 24 deletions internal/sms-gateway/modules/events/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,72 +3,106 @@ package events
import (
"context"
"fmt"
"time"

"github.com/android-sms-gateway/server/internal/sms-gateway/modules/devices"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/sse"
"github.com/android-sms-gateway/server/internal/sms-gateway/pubsub"
"go.uber.org/zap"
)

const (
pubsubTopic = "events"
)

type Service struct {
deviceSvc *devices.Service

sseSvc *sse.Service
pushSvc *push.Service

queue chan eventWrapper
pubsub pubsub.PubSub

metrics *metrics

logger *zap.Logger
}

func NewService(devicesSvc *devices.Service, sseSvc *sse.Service, pushSvc *push.Service, metrics *metrics, logger *zap.Logger) *Service {
func NewService(devicesSvc *devices.Service, sseSvc *sse.Service, pushSvc *push.Service, pubsub pubsub.PubSub, metrics *metrics, logger *zap.Logger) *Service {
return &Service{
deviceSvc: devicesSvc,
sseSvc: sseSvc,
pushSvc: pushSvc,

metrics: metrics,

queue: make(chan eventWrapper, 128),
pubsub: pubsub,

logger: logger,
}
}

func (s *Service) Notify(userID string, deviceID *string, event *Event) error {
func (s *Service) Notify(userID string, deviceID *string, event Event) error {
if event.EventType == "" {
return fmt.Errorf("event type is empty")
}

subCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

wrapper := eventWrapper{
UserID: userID,
DeviceID: deviceID,
Event: event,
}

select {
case s.queue <- wrapper:
// Successfully enqueued
s.metrics.IncrementEnqueued(string(event.eventType))
default:
s.metrics.IncrementFailed(string(event.eventType), DeliveryTypeUnknown, FailureReasonQueueFull)
return fmt.Errorf("event queue is full")
wrapperBytes, err := wrapper.serialize()
if err != nil {
s.metrics.IncrementFailed(string(event.EventType), DeliveryTypeUnknown, FailureReasonSerializationError)
return fmt.Errorf("can't serialize event wrapper: %w", err)
}

if err := s.pubsub.Publish(subCtx, pubsubTopic, wrapperBytes); err != nil {
s.metrics.IncrementFailed(string(event.EventType), DeliveryTypeUnknown, FailureReasonPublishError)
return fmt.Errorf("can't publish event: %w", err)
}

s.metrics.IncrementEnqueued(string(event.EventType))

return nil
}

func (s *Service) Run(ctx context.Context) {
func (s *Service) Run(ctx context.Context) error {
sub, err := s.pubsub.Subscribe(ctx, pubsubTopic)
if err != nil {
return fmt.Errorf("can't subscribe to pubsub: %w", err)
}
defer sub.Close()

ch := sub.Receive()
for {
select {
case wrapper := <-s.queue:
s.processEvent(wrapper)
case <-ctx.Done():
s.logger.Info("Event service stopped")
return
return nil
case msg, ok := <-ch:
if !ok {
s.logger.Info("Subscription closed")
return nil
}
wrapper := new(eventWrapper)
if err := wrapper.deserialize(msg.Data); err != nil {
s.metrics.IncrementFailed(EventTypeUnknown, DeliveryTypeUnknown, FailureReasonSerializationError)
s.logger.Error("Failed to deserialize event wrapper", zap.Error(err))
continue
}
s.processEvent(wrapper)
}
}
}

func (s *Service) processEvent(wrapper eventWrapper) {
func (s *Service) processEvent(wrapper *eventWrapper) {
// Load devices from database
filters := []devices.SelectFilter{}
if wrapper.DeviceID != nil {
Expand All @@ -91,26 +125,26 @@ func (s *Service) processEvent(wrapper eventWrapper) {
if device.PushToken != nil && *device.PushToken != "" {
// Device has push token, use push service
if err := s.pushSvc.Enqueue(*device.PushToken, push.Event{
Type: wrapper.Event.eventType,
Data: wrapper.Event.data,
Type: wrapper.Event.EventType,
Data: wrapper.Event.Data,
}); err != nil {
s.logger.Error("Failed to enqueue push notification", zap.String("user_id", wrapper.UserID), zap.String("device_id", device.ID), zap.Error(err))
s.metrics.IncrementFailed(string(wrapper.Event.eventType), DeliveryTypePush, FailureReasonProviderFailed)
s.metrics.IncrementFailed(string(wrapper.Event.EventType), DeliveryTypePush, FailureReasonProviderFailed)
} else {
s.metrics.IncrementSent(string(wrapper.Event.eventType), DeliveryTypePush)
s.metrics.IncrementSent(string(wrapper.Event.EventType), DeliveryTypePush)
}
continue
}

// No push token, use SSE service
if err := s.sseSvc.Send(device.ID, sse.Event{
Type: wrapper.Event.eventType,
Data: wrapper.Event.data,
Type: wrapper.Event.EventType,
Data: wrapper.Event.Data,
}); err != nil {
s.logger.Error("Failed to send SSE notification", zap.String("user_id", wrapper.UserID), zap.String("device_id", device.ID), zap.Error(err))
s.metrics.IncrementFailed(string(wrapper.Event.eventType), DeliveryTypeSSE, FailureReasonProviderFailed)
s.metrics.IncrementFailed(string(wrapper.Event.EventType), DeliveryTypeSSE, FailureReasonProviderFailed)
} else {
s.metrics.IncrementSent(string(wrapper.Event.eventType), DeliveryTypeSSE)
s.metrics.IncrementSent(string(wrapper.Event.EventType), DeliveryTypeSSE)
}
}
}
28 changes: 19 additions & 9 deletions internal/sms-gateway/modules/events/types.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,33 @@
package events

import (
"encoding/json"

"github.com/android-sms-gateway/client-go/smsgateway"
)

type Event struct {
eventType smsgateway.PushEventType
data map[string]string
EventType smsgateway.PushEventType `json:"event_type"`
Data map[string]string `json:"data"`
}

func NewEvent(eventType smsgateway.PushEventType, data map[string]string) *Event {
return &Event{
eventType: eventType,
data: data,
func NewEvent(eventType smsgateway.PushEventType, data map[string]string) Event {
return Event{
EventType: eventType,
Data: data,
}
}

type eventWrapper struct {
UserID string
DeviceID *string
Event *Event
UserID string `json:"user_id"`
DeviceID *string `json:"device_id,omitempty"`
Event Event `json:"event"`
}

func (w *eventWrapper) serialize() ([]byte, error) {
return json.Marshal(w)
}

func (w *eventWrapper) deserialize(data []byte) error {
return json.Unmarshal(data, w)
}
7 changes: 7 additions & 0 deletions internal/sms-gateway/pubsub/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package pubsub

// Config controls the PubSub backend via a URL (e.g., "memory://", "redis://...").
type Config struct {
URL string
BufferSize uint
}
Loading
Loading