-
Notifications
You must be signed in to change notification settings - Fork 27
[pubsub] introduce memory and redis implementations #181
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
WalkthroughAdds a pub/sub abstraction with memory and Redis backends, an options pattern, fx wiring and factory for sms-gateway, migrates events delivery to Pub/Sub with JSON-serializable wrappers and value-semantics for Event, and updates cache backends and interfaces to support configurable Redis creation and Close. Changes
Sequence Diagram(s)sequenceDiagram
participant Producer as Service.Notify
participant PubSub as PubSub (Memory/Redis)
participant Consumer as Service.Run
rect rgb(250,247,237)
Producer->>Producer: build eventWrapper & json.Marshal (5s timeout)
Producer->>PubSub: Publish(ctx, "events", data)
PubSub-->>Producer: return (err / nil)
end
rect rgb(237,250,243)
Consumer->>PubSub: Subscribe(ctx, "events")
PubSub-->>Consumer: *Subscription (chan Message)
loop for each message
Consumer->>Subscription: receive from channel
Subscription-->>Consumer: Message{Topic, Data}
Consumer->>Consumer: json.Unmarshal -> eventWrapper
Consumer->>Consumer: processEvent(&eventWrapper)
end
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
pkg/pubsub/memory.go
(1 hunks)pkg/pubsub/options.go
(1 hunks)pkg/pubsub/pubsub.go
(1 hunks)pkg/pubsub/redis.go
(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
pkg/pubsub/memory.go (2)
pkg/pubsub/pubsub.go (5)
Message
(13-16)ErrPubSubClosed
(9-9)ErrInvalidTopic
(10-10)Subscription
(18-23)PubSub
(45-50)pkg/pubsub/options.go (1)
Option
(3-3)
pkg/pubsub/redis.go (2)
pkg/pubsub/options.go (1)
Option
(3-3)pkg/pubsub/pubsub.go (5)
ErrPubSubClosed
(9-9)ErrInvalidTopic
(10-10)Subscription
(18-23)Message
(13-16)PubSub
(45-50)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: E2E
- GitHub Check: Test
- GitHub Check: Lint
- GitHub Check: Benchmark
- GitHub Check: Analyze (go)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
pkg/pubsub/memory.go
(1 hunks)pkg/pubsub/redis.go
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/pubsub/memory.go
🧰 Additional context used
🧬 Code graph analysis (1)
pkg/pubsub/redis.go (2)
pkg/pubsub/options.go (1)
Option
(3-3)pkg/pubsub/pubsub.go (5)
ErrPubSubClosed
(9-9)ErrInvalidTopic
(10-10)Subscription
(18-23)Message
(13-16)PubSub
(45-50)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Benchmark
- GitHub Check: Lint
- GitHub Check: E2E
- GitHub Check: Analyze (go)
🔇 Additional comments (5)
pkg/pubsub/redis.go (5)
1-22
: LGTM!The imports and struct definition are well-organized with appropriate synchronization primitives and fields.
24-39
: LGTM!The constructor properly applies the options pattern and initializes all struct fields correctly.
41-53
: LGTM!The Publish method correctly validates the closed state and topic, and properly delegates to the Redis client.
82-112
: LGTM!The goroutine implementation is correct with proper cleanup, comprehensive exit path handling, and safe message forwarding.
130-130
: LGTM!The compile-time interface assertion correctly verifies that
redisPubSub
implements thePubSub
interface.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
pkg/pubsub/memory.go
(1 hunks)pkg/pubsub/redis.go
(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
pkg/pubsub/redis.go (2)
pkg/pubsub/options.go (1)
Option
(3-3)pkg/pubsub/pubsub.go (5)
ErrPubSubClosed
(9-9)ErrInvalidTopic
(10-10)Subscription
(18-23)Message
(13-16)PubSub
(45-50)
pkg/pubsub/memory.go (2)
pkg/pubsub/pubsub.go (5)
Message
(13-16)ErrPubSubClosed
(9-9)ErrInvalidTopic
(10-10)Subscription
(18-23)PubSub
(45-50)pkg/pubsub/options.go (1)
Option
(3-3)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Build / Docker image (linux/amd64)
- GitHub Check: Build / Docker image (linux/arm64)
- GitHub Check: Benchmark
- GitHub Check: Analyze (go)
🔇 Additional comments (4)
pkg/pubsub/memory.go (2)
33-73
: LGTM! Publish synchronization is safe.The
RLock
held via defer (line 45) ensures the lock isn't released until afterwg.Wait()
completes (line 70), preventing the race whereunsubscribe
could remove and close a channel while publisher goroutines are sending. The past review concerns about concurrent map access have been properly addressed.
132-143
: Close is idempotent and thread-safe.Returning
nil
on repeated calls (line 135) matches the Redis implementation's behavior, resolving the API consistency concern from past reviews.pkg/pubsub/redis.go (2)
56-117
: LGTM! Subscription lifecycle is correct.The Subscribe implementation properly waits for Redis confirmation (line 68) before proceeding, and handles errors by closing the pubsub object (line 70). The goroutine's defer block (lines 85-94) cleans up resources in the correct order: close Redis subscription, close local channel, remove tracking, signal completion.
119-130
: Past review comment about Close behavior is now outdated.The past review suggested aligning Close behavior between memory and Redis implementations. Both now return
nil
on repeated calls (memory.go line 135, redis.go line 122), making them consistent. The concern has been resolved.
84dfae9
to
737cc55
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (2)
pkg/pubsub/memory.go (2)
121-130
: Consider using defer for mutex unlock.The explicit unlock pattern works correctly, but using
defer
would be more idiomatic and maintainable.Apply this diff to use defer:
func (m *memoryPubSub) unsubscribe(id, topic string) { m.mu.Lock() + defer m.mu.Unlock() + subscriptions, ok := m.topics[topic] if !ok { - m.mu.Unlock() return } delete(subscriptions, id) - m.mu.Unlock() }
44-72
: Document blocking Publish behavior.
Publish
blocks until all subscribers receive the message or the context/close signal is triggered. This differs from the Redis implementation (which returns immediately after sending to Redis) and could surprise callers expecting fire-and-forget semantics.Consider adding a comment documenting this behavior:
// Publish sends a message to all subscribers of the given topic. // This method blocks until all subscribers have received the message // or until ctx is cancelled or the pubsub instance is closed.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
pkg/pubsub/memory.go
(1 hunks)pkg/pubsub/options.go
(1 hunks)pkg/pubsub/pubsub.go
(1 hunks)pkg/pubsub/redis.go
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/pubsub/pubsub.go
🧰 Additional context used
🧬 Code graph analysis (2)
pkg/pubsub/memory.go (2)
pkg/pubsub/pubsub.go (5)
Message
(13-16)ErrPubSubClosed
(9-9)ErrInvalidTopic
(10-10)Subscription
(18-23)PubSub
(45-50)pkg/pubsub/options.go (1)
Option
(3-3)
pkg/pubsub/redis.go (2)
pkg/pubsub/options.go (1)
Option
(3-3)pkg/pubsub/pubsub.go (5)
ErrPubSubClosed
(9-9)ErrInvalidTopic
(10-10)Subscription
(18-23)Message
(13-16)PubSub
(45-50)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: E2E
- GitHub Check: Benchmark
- GitHub Check: Analyze (go)
🔇 Additional comments (1)
pkg/pubsub/redis.go (1)
119-130
: Close behavior is now consistent.Both memory and Redis implementations now return
nil
on repeatedClose()
calls, providing consistent idempotent semantics. This resolves the inconsistency mentioned in previous reviews.
66878bf
to
56f4ebd
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
internal/config/config.go
(3 hunks)internal/config/module.go
(2 hunks)internal/sms-gateway/app.go
(2 hunks)internal/sms-gateway/pubsub/config.go
(1 hunks)internal/sms-gateway/pubsub/module.go
(1 hunks)internal/sms-gateway/pubsub/pubsub.go
(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
internal/sms-gateway/pubsub/module.go (3)
internal/config/module.go (1)
Module
(22-131)internal/sms-gateway/app.go (1)
Module
(36-60)internal/sms-gateway/pubsub/pubsub.go (1)
New
(15-40)
internal/config/module.go (2)
internal/config/config.go (2)
Config
(10-19)Cache
(75-77)internal/sms-gateway/pubsub/config.go (1)
Config
(4-7)
internal/sms-gateway/app.go (2)
internal/config/module.go (1)
Module
(22-131)internal/sms-gateway/pubsub/module.go (1)
Module
(8-16)
internal/sms-gateway/pubsub/pubsub.go (3)
internal/config/config.go (2)
Config
(10-19)PubSub
(79-81)internal/sms-gateway/pubsub/config.go (1)
Config
(4-7)pkg/pubsub/options.go (1)
WithBufferSize
(17-21)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: E2E
- GitHub Check: Benchmark
- GitHub Check: Analyze (go)
🔇 Additional comments (12)
internal/config/config.go (3)
18-18
: LGTM!The PubSub configuration field is correctly added to the Config struct, following the same pattern as the existing Cache configuration.
79-81
: LGTM!The PubSub struct definition is consistent with the existing Cache struct pattern and correctly configured with appropriate YAML and environment variable tags.
111-113
: LGTM!The default PubSub configuration with "memory://" URL is appropriate for development and testing, consistent with the Cache default.
internal/sms-gateway/pubsub/config.go (1)
3-7
: LGTM!The Config struct is simple and clear, appropriately defining the parameters needed to control the PubSub backend via URL and configure the buffer size.
internal/sms-gateway/app.go (2)
24-24
: LGTM!The import for the pubsub package is correctly added.
49-49
: LGTM!The pubsub.Module() is correctly added to the fx.Module initialization, following the same pattern as other infrastructure modules.
internal/config/module.go (1)
14-14
: LGTM!The import for the pubsub package is correctly added.
internal/sms-gateway/pubsub/module.go (1)
8-16
: LGTM!The Module function correctly wires the pubsub subsystem into fx, with appropriate logger decoration and constructor provision.
internal/sms-gateway/pubsub/pubsub.go (4)
11-13
: LGTM!The topicPrefix constant is appropriately defined to namespace Redis topics.
16-23
: LGTM!URL defaulting and parsing logic is correct, with appropriate error handling and wrapping.
25-26
: LGTM!Options are correctly built with the configured BufferSize.
28-39
: LGTM!The scheme-based backend selection is clean and extensible. The Redis client lifecycle is properly managed by the NewRedis implementation (assuming it handles cleanup when the PubSub is closed).
bc2f812
to
c96b2b5
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (1)
internal/config/module.go (1)
125-130
: Previous issue resolved; consider documenting BufferSize choice.The past review concern about using
cfg.Cache.URL
instead ofcfg.PubSub.URL
has been fixed - line 127 now correctly referencescfg.PubSub.URL
.The hardcoded
BufferSize: 128
could benefit from a brief comment explaining the rationale (e.g., "reasonable default for in-memory queuing") or extraction to a named constant if this value is significant.Apply this diff to add a clarifying comment:
fx.Provide(func(cfg Config) pubsub.Config { return pubsub.Config{ URL: cfg.PubSub.URL, - BufferSize: 128, + BufferSize: 128, // Default channel buffer size for pub/sub message queuing } }),
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
internal/config/config.go
(3 hunks)internal/config/module.go
(2 hunks)internal/sms-gateway/app.go
(2 hunks)internal/sms-gateway/pubsub/config.go
(1 hunks)internal/sms-gateway/pubsub/module.go
(1 hunks)internal/sms-gateway/pubsub/pubsub.go
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- internal/sms-gateway/pubsub/config.go
- internal/sms-gateway/app.go
🧰 Additional context used
🧬 Code graph analysis (3)
internal/sms-gateway/pubsub/module.go (3)
internal/config/module.go (1)
Module
(22-131)internal/sms-gateway/app.go (1)
Module
(36-60)internal/sms-gateway/pubsub/pubsub.go (1)
New
(15-40)
internal/config/module.go (2)
internal/config/config.go (2)
Config
(10-19)PubSub
(79-81)internal/sms-gateway/pubsub/config.go (1)
Config
(4-7)
internal/sms-gateway/pubsub/pubsub.go (2)
internal/sms-gateway/pubsub/config.go (1)
Config
(4-7)pkg/pubsub/options.go (1)
WithBufferSize
(17-21)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Benchmark
- GitHub Check: E2E
- GitHub Check: Analyze (go)
🔇 Additional comments (2)
internal/config/config.go (1)
18-18
: LGTM! Configuration follows established patterns.The PubSub configuration is structured identically to the Cache configuration, with appropriate YAML and environment variable tags, and a sensible default of
memory://
for local development.Also applies to: 79-82, 111-113
internal/sms-gateway/pubsub/module.go (1)
8-16
: LGTM! FX module wiring is correct.The module follows the established pattern for FX modules in this codebase, with appropriate logger decoration and constructor provision.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
internal/sms-gateway/pubsub/module.go (1)
18-24
: PubSub.Close() is not context-aware; keep the call or change the API — rename the parameter to avoid shadowingVerified: pkg/pubsub/pubsub.go declares Close() error, so OnStop's context cannot be forwarded to Close() as-is. Keep the current call (returns error correctly) or change the PubSub API to Close(context.Context) error if you need context-aware shutdown. Minor nit: rename the parameter
pubsub
(e.g.,ps
orpubsubSvc
) to avoid shadowing the package.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
internal/sms-gateway/pubsub/module.go
(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
internal/sms-gateway/pubsub/module.go (4)
internal/sms-gateway/app.go (1)
Module
(36-60)internal/config/module.go (1)
Module
(22-131)internal/sms-gateway/pubsub/pubsub.go (1)
New
(15-40)internal/config/config.go (1)
PubSub
(79-81)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: E2E
- GitHub Check: Benchmark
- GitHub Check: Analyze (go)
🔇 Additional comments (2)
internal/sms-gateway/pubsub/module.go (2)
1-9
: LGTM!Package declaration and imports are clean and appropriate for an fx module with lifecycle management.
11-16
: LGTM!Module structure and logger decoration follow fx best practices. The named logger will provide clear context in log output.
c5c19f6
to
01f4557
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
internal/sms-gateway/pubsub/config.go (1)
5-6
: Consider adding field documentation and struct tags.The exported fields lack individual documentation. While the type-level comment is helpful, documenting each field improves maintainability and helps users understand the purpose and constraints of each field.
Additionally, if this Config is unmarshaled from YAML/JSON configuration files, consider adding appropriate struct tags.
Apply this diff to add field documentation:
// Config controls the PubSub backend via a URL (e.g., "memory://", "redis://..."). type Config struct { + // URL specifies the backend to use (e.g., "memory://", "redis://host:port"). + // Defaults to "memory://" if empty. URL string + // BufferSize sets the channel buffer size for the Pub/Sub implementation. BufferSize uint }If config unmarshaling is used, also add struct tags:
type Config struct { - URL string - BufferSize uint + URL string `yaml:"url" json:"url"` + BufferSize uint `yaml:"buffer_size" json:"buffer_size"` }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
internal/config/config.go
(3 hunks)internal/config/module.go
(2 hunks)internal/sms-gateway/app.go
(2 hunks)internal/sms-gateway/pubsub/config.go
(1 hunks)internal/sms-gateway/pubsub/module.go
(1 hunks)internal/sms-gateway/pubsub/pubsub.go
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
- internal/config/config.go
- internal/sms-gateway/pubsub/module.go
- internal/sms-gateway/pubsub/pubsub.go
- internal/config/module.go
- internal/sms-gateway/app.go
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: E2E
- GitHub Check: Benchmark
- GitHub Check: Test
- GitHub Check: Lint
- GitHub Check: Analyze (go)
🔇 Additional comments (1)
internal/sms-gateway/pubsub/config.go (1)
6-6
: No action needed: BufferSize is initialized to 128 ininternal/config/module.go
and always passed viaWithBufferSize
, so it never remains zero at runtime.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/sms-gateway/modules/events/service.go (1)
97-142
: Defensive check for nil event in consumed messages.eventWrapper.Event can be nil after unmarshal; prevent panic and record a failure.
func (s *Service) processEvent(wrapper *eventWrapper) { + if wrapper.Event == nil { + s.metrics.IncrementFailed("unknown", DeliveryTypeUnknown, FailureReasonSerializationError) + s.logger.Error("Received event wrapper with nil event", zap.String("user_id", wrapper.UserID)) + return + }
♻️ Duplicate comments (1)
internal/sms-gateway/pubsub/pubsub.go (1)
34-38
: Ensure Redis client is closed by PubSub.Close() (connection leak risk).Confirm pkgpubsub.NewRedis’s Close() calls client.Close().
#!/bin/bash # Verify redis PubSub Close closes the underlying client rg -nP 'type\s+\w+\s+struct\s*\{[^}]*client[^\n]*\n[^}]*\}' --type=go -C3 rg -nP 'func\s*\(\s*\w+\s*\*\w+\s*\)\s*Close\(\)\s*error' --type=go -C3 rg -n 'client\.Close\(\)' --type=go -C3
🧹 Nitpick comments (6)
internal/sms-gateway/modules/events/metrics.go (1)
22-25
: Consider adding a distinct deserialization failure reason.Consumer-side JSON parse failures are different from producer-side serialization. Add FailureReasonDeserializationError for clearer metrics and use it in the Run loop on unmarshal errors.
internal/sms-gateway/pubsub/pubsub.go (3)
7-9
: Avoid same-name import shadowing for clarity.Alias the external pkg to reduce confusion within package pubsub.
-import ( - "fmt" - "net/url" - - "github.com/android-sms-gateway/core/redis" - "github.com/android-sms-gateway/server/pkg/pubsub" -) +import ( + "fmt" + "net/url" + + "github.com/android-sms-gateway/core/redis" + pkgpubsub "github.com/android-sms-gateway/server/pkg/pubsub" +) - type PubSub = pubsub.PubSub + type PubSub = pkgpubsub.PubSubAlso applies to: 15-15
27-29
: Only set buffer size when > 0 (avoid forcing zero/unbuffered).Let backends keep their default when config.BufferSize == 0.
-opts := []pubsub.Option{} -opts = append(opts, pubsub.WithBufferSize(config.BufferSize)) +opts := []pkgpubsub.Option{} +if config.BufferSize > 0 { + opts = append(opts, pkgpubsub.WithBufferSize(config.BufferSize)) +}
30-41
: Support rediss (TLS) scheme.Common Redis URLs use rediss://. Treat it like redis.
-switch u.Scheme { +switch u.Scheme { case "memory": - return pubsub.NewMemory(opts...), nil + return pkgpubsub.NewMemory(opts...), nil -case "redis": +case "redis", "rediss": client, err := redis.New(redis.Config{URL: config.URL}) if err != nil { return nil, fmt.Errorf("can't create redis client: %w", err) } - return pubsub.NewRedis(client, topicPrefix, opts...), nil + return pkgpubsub.NewRedis(client, topicPrefix, opts...), nilinternal/sms-gateway/modules/events/types.go (1)
9-18
: Unify field name with push/SSE types for consistency.Consider renaming EventType -> Type (json:"event") to match push/sse.Event and reduce mental mapping across modules. If changed, adjust callers accordingly.
internal/sms-gateway/modules/events/service.go (1)
104-105
: Avoid shadowing the imported package name.Rename local variable from devices to devs to keep readability.
- devices, err := s.deviceSvc.Select(wrapper.UserID, filters...) + devs, err := s.deviceSvc.Select(wrapper.UserID, filters...) @@ - if len(devices) == 0 { + if len(devs) == 0 { @@ - for _, device := range devices { + for _, device := range devs {
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
internal/sms-gateway/modules/events/metrics.go
(1 hunks)internal/sms-gateway/modules/events/service.go
(2 hunks)internal/sms-gateway/modules/events/types.go
(1 hunks)internal/sms-gateway/pubsub/pubsub.go
(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
internal/sms-gateway/modules/events/types.go (3)
internal/sms-gateway/modules/sse/types.go (1)
Event
(7-10)internal/sms-gateway/modules/push/types/types.go (1)
Event
(7-10)internal/sms-gateway/modules/push/types.go (1)
Event
(16-16)
internal/sms-gateway/pubsub/pubsub.go (3)
internal/config/config.go (2)
PubSub
(79-81)Config
(10-19)internal/sms-gateway/pubsub/config.go (1)
Config
(4-7)pkg/pubsub/options.go (1)
WithBufferSize
(17-21)
internal/sms-gateway/modules/events/service.go (5)
internal/sms-gateway/pubsub/pubsub.go (1)
PubSub
(15-15)internal/sms-gateway/modules/events/types.go (1)
Event
(9-12)internal/sms-gateway/modules/sse/types.go (1)
Event
(7-10)internal/sms-gateway/modules/push/types/types.go (1)
Event
(7-10)internal/sms-gateway/modules/events/metrics.go (6)
DeliveryTypeUnknown
(20-20)FailureReasonSerializationError
(22-22)FailureReasonPublishError
(23-23)DeliveryTypePush
(18-18)FailureReasonProviderFailed
(24-24)DeliveryTypeSSE
(19-19)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Benchmark
- GitHub Check: Analyze (go)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
internal/sms-gateway/modules/events/module.go
(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
internal/sms-gateway/modules/events/module.go (1)
internal/sms-gateway/modules/events/service.go (3)
Service
(19-30)s
(59-69)deviceSvc
(13-24)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Benchmark
- GitHub Check: E2E
- GitHub Check: Test
- GitHub Check: Analyze (go)
a348cb4
to
4e59e2f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
internal/sms-gateway/modules/events/service.go (1)
101-146
: Validate wrapper.Event to prevent nil-pointer dereference.The
processEvent
function accesseswrapper.Event.EventType
andwrapper.Event.Data
(lines 124-125, 137-138) without checking ifwrapper.Event
is nil. If deserialization produces a wrapper with a nil Event field, this will cause a panic.Add validation at the start of processEvent:
func (s *Service) processEvent(wrapper *eventWrapper) { + if wrapper.Event == nil { + s.logger.Error("Event wrapper has nil event", zap.String("user_id", wrapper.UserID)) + s.metrics.IncrementFailed(EventTypeUnknown, DeliveryTypeUnknown, FailureReasonSerializationError) + return + } + // Load devices from database filters := []devices.SelectFilter{}pkg/cache/redis.go (1)
191-197
: Bug: HExpireAt missing field argument (compile/runtime error).HExpireAt requires the field; here it’s called without it. This will fail to compile or misbehave and leaves entries without TTL.
Apply this fix:
- if !options.validUntil.IsZero() { - if err := r.client.HExpireAt(ctx, r.key, options.validUntil).Err(); err != nil { - return fmt.Errorf("can't set cache item ttl: %w", err) - } - } + if !options.validUntil.IsZero() { + if err := r.client.HExpireAt(ctx, r.key, options.validUntil, key).Err(); err != nil { + return fmt.Errorf("can't set cache item ttl: %w", err) + } + }
🧹 Nitpick comments (2)
pkg/cache/redis.go (1)
175-197
: Consider atomic SetNX + TTL in one step.Today, HSetNX then HExpireAt leaves a small window where the field exists without TTL. If you need strict atomicity, wrap both in a Lua script.
I can provide a compact script that performs HSETNX and, only on insert, applies HEXPIREAT in one EVAL. Interested?
internal/sms-gateway/cache/factory.go (1)
34-54
: Supportrediss://
URLs.Switch only accepts "redis". Consider handling "rediss" (TLS) the same way.
Example:
- switch u.Scheme { - case "memory": + switch u.Scheme { + case "memory": ... - case "redis": + case "redis", "rediss": ...
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)
internal/sms-gateway/cache/factory.go
(1 hunks)internal/sms-gateway/modules/events/metrics.go
(1 hunks)internal/sms-gateway/modules/events/module.go
(1 hunks)internal/sms-gateway/modules/events/service.go
(2 hunks)internal/sms-gateway/modules/events/types.go
(1 hunks)internal/sms-gateway/modules/messages/service.go
(2 hunks)internal/sms-gateway/modules/settings/service.go
(1 hunks)internal/sms-gateway/modules/webhooks/service.go
(1 hunks)internal/sms-gateway/pubsub/pubsub.go
(1 hunks)pkg/cache/redis.go
(3 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- internal/sms-gateway/modules/messages/service.go
- internal/sms-gateway/modules/webhooks/service.go
- internal/sms-gateway/modules/settings/service.go
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-10T23:53:42.006Z
Learnt from: capcom6
PR: android-sms-gateway/server#178
File: pkg/cache/redis.go:63-75
Timestamp: 2025-09-10T23:53:42.006Z
Learning: In pkg/cache/redis.go, the Redis cache implementation uses Redis's native field-level TTL via HExpireAt, meaning expired fields are automatically removed by Redis itself. This differs from the memory implementation where the application tracks expiration times. As a result, operations like Drain that call HGetAll will only return non-expired items since expired fields are already removed by Redis.
Applied to files:
pkg/cache/redis.go
🧬 Code graph analysis (7)
internal/sms-gateway/modules/events/service.go (5)
internal/sms-gateway/pubsub/pubsub.go (1)
PubSub
(14-14)internal/sms-gateway/modules/events/types.go (1)
Event
(9-12)internal/sms-gateway/modules/sse/types.go (1)
Event
(7-10)internal/sms-gateway/modules/push/types/types.go (1)
Event
(7-10)internal/sms-gateway/modules/events/metrics.go (7)
DeliveryTypeUnknown
(20-20)FailureReasonSerializationError
(22-22)FailureReasonPublishError
(23-23)EventTypeUnknown
(26-26)DeliveryTypePush
(18-18)FailureReasonProviderFailed
(24-24)DeliveryTypeSSE
(19-19)
internal/sms-gateway/modules/events/metrics.go (1)
internal/sms-gateway/modules/sse/metrics.go (1)
m
(85-87)
internal/sms-gateway/modules/events/types.go (3)
internal/sms-gateway/modules/sse/types.go (1)
Event
(7-10)internal/sms-gateway/modules/push/types/types.go (1)
Event
(7-10)internal/sms-gateway/modules/push/types.go (1)
Event
(16-16)
internal/sms-gateway/cache/factory.go (1)
pkg/cache/redis.go (2)
NewRedis
(61-86)RedisConfig
(37-51)
pkg/cache/redis.go (2)
internal/config/config.go (1)
Cache
(75-77)pkg/cache/cache.go (1)
Cache
(5-36)
internal/sms-gateway/modules/events/module.go (2)
internal/sms-gateway/modules/events/service.go (1)
Service
(19-30)internal/sms-gateway/app.go (1)
Run
(62-73)
internal/sms-gateway/pubsub/pubsub.go (4)
internal/config/config.go (2)
PubSub
(79-81)Config
(10-19)internal/sms-gateway/pubsub/config.go (1)
Config
(4-7)pkg/pubsub/options.go (1)
WithBufferSize
(17-21)pkg/cache/redis.go (1)
RedisConfig
(37-51)
🪛 GitHub Check: Lint
internal/sms-gateway/pubsub/pubsub.go
[failure] 37-37:
not enough arguments in call to pubsub.NewRedis
[failure] 33-33:
undefined: pubsub.RedisConfig
[failure] 33-33:
not enough return values
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Test
- GitHub Check: Benchmark
- GitHub Check: E2E
- GitHub Check: Analyze (go)
🔇 Additional comments (7)
internal/sms-gateway/modules/events/module.go (1)
17-28
: LGTM! Well-structured error handling.The updated fx.Invoke properly injects logger and shutdowner dependencies, and the goroutine correctly handles service errors by logging and triggering graceful shutdown with an exit code.
internal/sms-gateway/modules/events/metrics.go (1)
22-26
: LGTM! Constants align with pub/sub integration.The new failure reason constants (serialization_error, publish_error) and EventTypeUnknown appropriately categorize failures in the pub/sub-based event flow, while removing the now-unused FailureReasonQueueFull.
internal/sms-gateway/modules/events/types.go (2)
10-11
: LGTM! Public fields enable JSON serialization.The Event struct now has public fields with appropriate JSON tags, supporting the pub/sub payload serialization requirements.
27-33
: LGTM! Standard serialization methods.The serialize/deserialize methods correctly use
json.Marshal
andjson.Unmarshal
for converting eventWrapper to/from JSON bytes.internal/sms-gateway/modules/events/service.go (1)
79-99
: LGTM! Subscription loop handles channel closure correctly.The receive loop properly checks the
ok
value from the channel receive operation (line 85) and gracefully returns when the subscription channel closes (lines 86-89). Deserialization errors are correctly logged and counted without exposing PII.pkg/cache/redis.go (2)
103-122
: Drain script LGTM; atomic and returns only non‑expired fields.EVAL with HGETALL + UNLINK/DEL is atomic and, with field TTLs, returns only live entries. Good alignment with Redis field‑TTL semantics.
Based on learnings
160-166
: Pipelined HSET + HExpireAt is correct.Efficient and reduces round trips; TTL set only when configured. Looks good.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (1)
pkg/pubsub/redis.go (1)
45-56
: Document Client/URL precedence behavior.When both
Client
andURL
are provided inRedisConfig
, theClient
takes precedence andURL
is silently ignored (lines 49-56 only execute whenClient == nil
). This precedence should be documented in theRedisConfig
struct orNewRedis
function comments to avoid confusion.Consider adding a comment to the
RedisConfig
struct:type RedisConfig struct { // Client is the Redis client to use. - // If nil, a client is created from the URL. + // If nil, a client is created from the URL. + // If both Client and URL are provided, Client takes precedence. Client *redis.Client
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
pkg/pubsub/redis.go
(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
pkg/pubsub/redis.go (2)
pkg/pubsub/options.go (1)
Option
(3-3)pkg/pubsub/pubsub.go (5)
ErrPubSubClosed
(9-9)ErrInvalidTopic
(10-10)Subscription
(18-23)Message
(13-16)PubSub
(45-50)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Benchmark
- GitHub Check: Test
- GitHub Check: E2E
- GitHub Check: Analyze (go)
🔇 Additional comments (2)
pkg/pubsub/redis.go (2)
99-104
: Resource leak fixed - LGTM!The subscription error path now properly closes the Redis subscription before returning, addressing the resource leak flagged in previous reviews. The use of
errors.Join
to combine both errors is appropriate.
117-126
: Cleanup logic is well-structured.The deferred cleanup properly closes the Redis subscription, closes the channel, removes the subscriber from the registry, and signals completion via
wg.Done()
. Ignoring the error fromps.Close()
is acceptable in the cleanup path.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
♻️ Duplicate comments (2)
pkg/pubsub/redis.go (2)
75-87
: Document non-blocking Publish behavior.This is a previously identified issue that remains unaddressed. Redis
Publish
is fire-and-forget (returns immediately after sending to Redis), while the memory implementation blocks until all subscribers receive. Add a comment documenting this behavioral difference.
160-171
: Fix resource leak and align Close behavior with memory implementation.Two issues:
Resource leak (new issue): When
NewRedis
creates a Redis client from URL (line 56), that client is never closed. This leaks connections and file descriptors.Inconsistent behavior (duplicate from past review): Returns
nil
on repeated calls, butmemory.go
returnsErrPubSubClosed
. This behavioral inconsistency across implementations could surprise users.To fix the resource leak, track client ownership:
type redisPubSub struct { prefix string bufferSize uint client *redis.Client + ownsClient bool wg sync.WaitGroup mu sync.Mutex subscribers map[string]context.CancelFunc closeCh chan struct{} }
In
NewRedis
, set ownership:if config.Client == nil { opt, err := redis.ParseURL(config.URL) if err != nil { return nil, fmt.Errorf("failed to parse redis url: %w", err) } config.Client = redis.NewClient(opt) + return &redisPubSub{ + // ... other fields ... + client: config.Client, + ownsClient: true, + }, nil } return &redisPubSub{ - prefix: config.Prefix, - bufferSize: o.bufferSize, - - client: config.Client, - - subscribers: make(map[string]context.CancelFunc), - closeCh: make(chan struct{}), + // ... other fields ... + client: config.Client, + ownsClient: false, }, nilIn
Close
, close the client if owned:func (r *redisPubSub) Close() error { select { case <-r.closeCh: - return nil + return ErrPubSubClosed default: close(r.closeCh) } r.wg.Wait() + if r.ownsClient { + return r.client.Close() + } + return nil }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
internal/sms-gateway/cache/factory.go
(1 hunks)internal/sms-gateway/modules/events/service.go
(2 hunks)pkg/pubsub/redis.go
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- internal/sms-gateway/cache/factory.go
🧰 Additional context used
🧬 Code graph analysis (2)
pkg/pubsub/redis.go (2)
pkg/pubsub/options.go (1)
Option
(3-3)pkg/pubsub/pubsub.go (5)
ErrPubSubClosed
(9-9)ErrInvalidTopic
(10-10)Subscription
(18-23)Message
(13-16)PubSub
(45-50)
internal/sms-gateway/modules/events/service.go (3)
internal/sms-gateway/pubsub/pubsub.go (1)
PubSub
(14-14)internal/sms-gateway/modules/events/types.go (1)
Event
(9-12)internal/sms-gateway/modules/events/metrics.go (7)
EventTypeUnknown
(26-26)DeliveryTypeUnknown
(20-20)FailureReasonSerializationError
(22-22)FailureReasonPublishError
(23-23)DeliveryTypePush
(18-18)FailureReasonProviderFailed
(24-24)DeliveryTypeSSE
(19-19)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Benchmark
- GitHub Check: Test
- GitHub Check: E2E
- GitHub Check: Analyze (go)
🔇 Additional comments (6)
internal/sms-gateway/modules/events/service.go (4)
15-17
: LGTM: Good practice to use a constant for the pub/sub topic.The constant makes the topic name maintainable and follows DRY principles.
32-44
: LGTM: Clean dependency injection.The pub/sub dependency is properly injected through the constructor and stored in the Service struct.
77-104
: LGTM: Proper subscription lifecycle management.The method correctly:
- Checks for subscription errors and returns early
- Defers subscription closure
- Handles the closed channel case with the
ok
check- Avoids PII leakage by not logging the full message payload
- Returns cleanly on context cancellation or channel close
This addresses the previous review comments about channel handling and PII logging.
106-151
: LGTM: Proper event processing with metrics.The method correctly:
- Handles device selection with appropriate filters
- Implements fallback from push to SSE for devices without push tokens
- Increments metrics for both success and failure cases on each delivery path
- Logs errors with relevant context (user_id, device_id) without exposing sensitive event data
pkg/pubsub/redis.go (2)
100-105
: LGTM! Subscription confirmation correctly implemented.The code now properly waits for the Redis subscription acknowledgment using
ps.Receive(ctx)
before returning the subscription. The error handling on lines 103-104 correctly closes the subscription and combines both errors usingerrors.Join
, which is an improvement over simply ignoring the close error.
117-155
: LGTM! Goroutine correctly handles cleanup and prevents deadlock.The message forwarding goroutine is well-structured:
- Proper cleanup in defer (closes subscription, channel, removes from map)
- Responds to close signals (
r.closeCh
andsubCtx.Done()
)- The nested select (lines 143-152) prevents deadlock by checking close signals while attempting to send messages
This addresses the previously identified deadlock concern.
af41637
to
2038311
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/cache/redis.go (1)
194-198
: Fix TTL handling inSetOrFail
.When a TTL is supplied, this path calls
HExpireAt
without the hash field argument, so the command targets the whole hash (or fails with a Redis error). That breaks per-entry TTL semantics forSetOrFail
, unlike theSet
path which passes the field. Add the field argument here as well.Apply this diff:
- if err := r.client.HExpireAt(ctx, r.key, options.validUntil).Err(); err != nil { + if err := r.client.HExpireAt(ctx, r.key, options.validUntil, key).Err(); err != nil {
♻️ Duplicate comments (2)
internal/sms-gateway/modules/events/service.go (2)
47-49
: Add metric increment for validation failure.The validation check for empty
EventType
is good, but it should increment a failure metric to maintain observability consistency with other error paths in this function.Apply this diff to add the metric increment:
if event.EventType == "" { + s.metrics.IncrementFailed(EventTypeUnknown, DeliveryTypeUnknown, FailureReasonSerializationError) return fmt.Errorf("event type is empty") }
Note: Past review comments suggest using
FailureReasonValidationError
instead ofFailureReasonSerializationError
for better semantic clarity, but that would require adding the new constant first.
51-52
: Accept parent context instead of using Background().Using
context.Background()
ignores caller cancellation or timeout signals, potentially causing operations to hang for 5 seconds even when the caller has been cancelled. This can lead to resource leaks and degraded responsiveness during shutdown.Add a
context.Context
parameter toNotify
and use it as the parent context:-func (s *Service) Notify(userID string, deviceID *string, event Event) error { +func (s *Service) Notify(ctx context.Context, userID string, deviceID *string, event Event) error { if event.EventType == "" { s.metrics.IncrementFailed(EventTypeUnknown, DeliveryTypeUnknown, FailureReasonSerializationError) return fmt.Errorf("event type is empty") } - subCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() wrapper := eventWrapper{ UserID: userID, DeviceID: deviceID, Event: &event, } wrapperBytes, err := wrapper.serialize() if err != nil { s.metrics.IncrementFailed(string(event.EventType), DeliveryTypeUnknown, FailureReasonSerializationError) return fmt.Errorf("can't serialize event wrapper: %w", err) } - if err := s.pubsub.Publish(subCtx, pubsubTopic, wrapperBytes); err != nil { + if err := s.pubsub.Publish(pubCtx, pubsubTopic, wrapperBytes); err != nil { s.metrics.IncrementFailed(string(event.EventType), DeliveryTypeUnknown, FailureReasonPublishError) return fmt.Errorf("can't publish event: %w", err) } s.metrics.IncrementEnqueued(string(event.EventType)) return nil }Then update all call sites (e.g., in
internal/sms-gateway/modules/messages/service.go
lines 233 and 244) to pass a context.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (18)
internal/config/config.go
(3 hunks)internal/config/module.go
(2 hunks)internal/sms-gateway/app.go
(2 hunks)internal/sms-gateway/cache/factory.go
(1 hunks)internal/sms-gateway/modules/events/metrics.go
(1 hunks)internal/sms-gateway/modules/events/module.go
(1 hunks)internal/sms-gateway/modules/events/service.go
(2 hunks)internal/sms-gateway/modules/events/types.go
(1 hunks)internal/sms-gateway/modules/messages/service.go
(2 hunks)internal/sms-gateway/modules/settings/service.go
(1 hunks)internal/sms-gateway/modules/webhooks/service.go
(1 hunks)internal/sms-gateway/pubsub/config.go
(1 hunks)internal/sms-gateway/pubsub/module.go
(1 hunks)internal/sms-gateway/pubsub/pubsub.go
(1 hunks)pkg/cache/cache.go
(1 hunks)pkg/cache/memory.go
(2 hunks)pkg/cache/redis.go
(2 hunks)pkg/pubsub/redis.go
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (9)
- internal/config/module.go
- internal/sms-gateway/modules/settings/service.go
- pkg/cache/memory.go
- internal/sms-gateway/app.go
- internal/sms-gateway/pubsub/module.go
- internal/config/config.go
- internal/sms-gateway/pubsub/pubsub.go
- internal/sms-gateway/modules/events/metrics.go
- pkg/cache/cache.go
🧰 Additional context used
🧬 Code graph analysis (8)
pkg/pubsub/redis.go (3)
pkg/pubsub/options.go (1)
Option
(3-3)pkg/pubsub/pubsub.go (5)
ErrPubSubClosed
(9-9)ErrInvalidTopic
(10-10)Subscription
(18-23)Message
(13-16)PubSub
(45-50)internal/config/config.go (1)
PubSub
(79-81)
internal/sms-gateway/modules/events/types.go (3)
internal/sms-gateway/modules/sse/types.go (1)
Event
(7-10)internal/sms-gateway/modules/push/types/types.go (1)
Event
(7-10)internal/sms-gateway/modules/push/types.go (1)
Event
(16-16)
internal/sms-gateway/modules/messages/service.go (1)
internal/sms-gateway/modules/events/events.go (1)
NewMessageEnqueuedEvent
(9-11)
internal/sms-gateway/modules/events/module.go (2)
internal/sms-gateway/modules/events/service.go (1)
Service
(19-30)internal/sms-gateway/app.go (1)
Run
(62-73)
internal/sms-gateway/modules/events/service.go (5)
internal/sms-gateway/pubsub/pubsub.go (1)
PubSub
(14-14)internal/sms-gateway/modules/events/types.go (1)
Event
(9-12)internal/sms-gateway/modules/sse/types.go (1)
Event
(7-10)internal/sms-gateway/modules/push/types/types.go (1)
Event
(7-10)internal/sms-gateway/modules/events/metrics.go (7)
DeliveryTypeUnknown
(20-20)FailureReasonSerializationError
(22-22)FailureReasonPublishError
(23-23)EventTypeUnknown
(26-26)DeliveryTypePush
(18-18)FailureReasonProviderFailed
(24-24)DeliveryTypeSSE
(19-19)
pkg/cache/redis.go (2)
pkg/cache/cache.go (1)
Cache
(5-40)internal/config/config.go (1)
Cache
(75-77)
internal/sms-gateway/modules/webhooks/service.go (1)
internal/sms-gateway/modules/events/events.go (1)
NewWebhooksUpdatedEvent
(13-15)
internal/sms-gateway/cache/factory.go (1)
pkg/cache/redis.go (2)
NewRedis
(62-89)RedisConfig
(37-51)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: E2E
- GitHub Check: Benchmark
- GitHub Check: Test
- GitHub Check: Analyze (go)
🔇 Additional comments (9)
internal/sms-gateway/modules/events/module.go (1)
17-36
: LGTM! Error handling and shutdown flow are now correct.The lifecycle hooks properly capture errors from
svc.Run
, log them, and trigger graceful shutdown viafx.Shutdowner
. The OnStop hook correctly cancels the context.internal/sms-gateway/modules/messages/service.go (2)
232-236
: LGTM! Correct adaptation to non-pointer Event parameter.The dereference of
events.NewMessageEnqueuedEvent()
aligns with the updatedNotify
signature that now acceptsEvent
by value. The goroutine safely capturesuserID
anddeviceID
by value and logs errors appropriately.
242-244
: LGTM! Correct adaptation to non-pointer Event parameter.The dereference of
events.NewMessagesExportRequestedEvent()
aligns with the updatedNotify
signature that now acceptsEvent
by value.internal/sms-gateway/modules/events/service.go (6)
15-17
: LGTM! Topic constant is well-defined.The
pubsubTopic
constant provides a centralized definition for the events topic used across Pub/Sub operations.
32-44
: LGTM! Constructor correctly wires Pub/Sub dependency.The
NewService
constructor now includes thepubsub.PubSub
parameter and properly initializes the service with all required dependencies.
54-64
: LGTM! Wrapper creation and serialization error handling are correct.The wrapper construction and serialization with proper error handling and metrics tracking look good.
66-74
: LGTM! Pub/Sub publishing with proper error handling.The publish operation correctly handles errors, tracks metrics, and returns descriptive error messages.
76-103
: LGTM! Run loop correctly handles subscription lifecycle and closed channels.The subscription creation, message processing loop, and closed-channel check (
msg, ok := <-ch
) are all correctly implemented. Deserialization failures properly increment metrics and continue processing.
105-150
: LGTM! Event processing logic is sound.The
processEvent
function correctly dereferenceswrapper.Event
fields, routes to push or SSE services based on device configuration, and tracks metrics for both success and failure paths.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
internal/sms-gateway/modules/events/types.go (2)
9-12
: Extract a shared Event struct
TheEvent
type is defined three times (inmodules/events/types.go
,modules/sse/types.go
, andmodules/push/types/types.go
with an alias inpush/types.go
), varying only by field names/tags. Consolidate into one common definition—or document why they must differ—to eliminate duplication.
9-12
: Unify JSON tag for Event.Type across modules.
internal/sms-gateway/modules/events/types.go
usesjson:"event_type"
, butinternal/sms-gateway/modules/sse/types.go
usesjson:"event"
. Align these tags (e.g. bothevent_type
or bothevent
) to avoid serialization mismatches.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
go.sum
is excluded by!**/*.sum
📒 Files selected for processing (3)
go.mod
(0 hunks)internal/sms-gateway/modules/events/events.go
(2 hunks)internal/sms-gateway/modules/events/types.go
(1 hunks)
💤 Files with no reviewable changes (1)
- go.mod
🧰 Additional context used
🧬 Code graph analysis (2)
internal/sms-gateway/modules/events/events.go (4)
internal/sms-gateway/modules/events/types.go (2)
Event
(9-12)NewEvent
(14-19)internal/sms-gateway/modules/sse/types.go (1)
Event
(7-10)internal/sms-gateway/modules/push/types.go (1)
Event
(16-16)internal/sms-gateway/modules/push/types/types.go (1)
Event
(7-10)
internal/sms-gateway/modules/events/types.go (3)
internal/sms-gateway/modules/sse/types.go (1)
Event
(7-10)internal/sms-gateway/modules/push/types.go (1)
Event
(16-16)internal/sms-gateway/modules/push/types/types.go (1)
Event
(7-10)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Benchmark
- GitHub Check: Test
- GitHub Check: Lint
- GitHub Check: Analyze (go)
🔇 Additional comments (5)
internal/sms-gateway/modules/events/events.go (1)
9-29
: LGTM! Return type changes align with the Event value type.The constructor functions now correctly return
Event
by value instead of*Event
, consistent with the updated Event type definition intypes.go
. The logic remains unchanged and correct.internal/sms-gateway/modules/events/types.go (4)
4-5
: LGTM! Import needed for serialization.The
encoding/json
import is correctly added to support the newserialize()
anddeserialize()
methods.
14-19
: LGTM! NewEvent correctly returns Event by value.The function now returns
Event
by value with exported fields properly initialized, aligning with the struct changes.
27-33
: LGTM! Serialization methods are correctly implemented.The
serialize()
anddeserialize()
methods provide straightforward JSON marshaling/unmarshaling functionality for pub/sub transmission.
21-25
: Event pointer usage is correct and consistent with JSON serialization requirements.The
Event
field pointer usage is appropriate here. The eventWrapper is exclusively used for JSON serialization/deserialization via pub/sub (lines 27-32 in types.go), where theomitempty
tag requires a pointer type. The code consistently creates the wrapper with&event
(line 54-58 in service.go), and deserialization properly uses pointer receivers (lines 94-95 in service.go). No inconsistency issues found.
7257958
to
5e5ee38
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
♻️ Duplicate comments (2)
internal/sms-gateway/modules/events/service.go (2)
47-49
: Add metrics increment for validation failure.The validation check correctly rejects empty event types, but it doesn't increment failure metrics as requested in the past review. This makes validation failures invisible in observability.
Add the metrics call (using the validation error constant from
metrics.go
):if event.EventType == "" { + s.metrics.IncrementFailed(EventTypeUnknown, DeliveryTypeUnknown, FailureReasonValidationError) return fmt.Errorf("event type is empty") }
51-52
: Accept parent context to respect caller cancellation.Using
context.Background()
ignores caller cancellation/timeout, potentially causing operations to hang for 5 seconds even when the caller has been cancelled. This can lead to resource leaks and degraded responsiveness during shutdown.Update the signature to accept a context parameter:
-func (s *Service) Notify(userID string, deviceID *string, event Event) error { +func (s *Service) Notify(ctx context.Context, userID string, deviceID *string, event Event) error { if event.EventType == "" { s.metrics.IncrementFailed(EventTypeUnknown, DeliveryTypeUnknown, FailureReasonValidationError) return fmt.Errorf("event type is empty") } - subCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel()Then update all callers to pass an appropriate context.
🧹 Nitpick comments (1)
pkg/cache/cache.go (1)
37-39
: Document possible Close() errors in Cache interface. Update theClose()
comment in pkg/cache/cache.go (lines 37–39) to describe when it can return errors (e.g. underlying Redis client.Close failures during connection shutdown).
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
go.sum
is excluded by!**/*.sum
📒 Files selected for processing (11)
go.mod
(0 hunks)internal/sms-gateway/cache/factory.go
(1 hunks)internal/sms-gateway/modules/events/events.go
(2 hunks)internal/sms-gateway/modules/events/metrics.go
(1 hunks)internal/sms-gateway/modules/events/module.go
(1 hunks)internal/sms-gateway/modules/events/service.go
(2 hunks)internal/sms-gateway/modules/events/types.go
(1 hunks)internal/sms-gateway/pubsub/pubsub.go
(1 hunks)pkg/cache/cache.go
(1 hunks)pkg/cache/memory.go
(2 hunks)pkg/cache/redis.go
(2 hunks)
💤 Files with no reviewable changes (1)
- go.mod
🚧 Files skipped from review as they are similar to previous changes (3)
- internal/sms-gateway/pubsub/pubsub.go
- internal/sms-gateway/cache/factory.go
- pkg/cache/redis.go
🧰 Additional context used
🧬 Code graph analysis (5)
internal/sms-gateway/modules/events/module.go (2)
internal/sms-gateway/modules/events/service.go (1)
Service
(19-30)internal/sms-gateway/app.go (1)
Run
(62-73)
internal/sms-gateway/modules/events/types.go (3)
internal/sms-gateway/modules/sse/types.go (1)
Event
(7-10)internal/sms-gateway/modules/push/types.go (1)
Event
(16-16)internal/sms-gateway/modules/push/types/types.go (1)
Event
(7-10)
internal/sms-gateway/modules/events/events.go (4)
internal/sms-gateway/modules/events/types.go (2)
Event
(9-12)NewEvent
(14-19)internal/sms-gateway/modules/sse/types.go (1)
Event
(7-10)internal/sms-gateway/modules/push/types.go (1)
Event
(16-16)internal/sms-gateway/modules/push/types/types.go (1)
Event
(7-10)
pkg/cache/memory.go (2)
internal/sms-gateway/cache/factory.go (1)
Cache
(14-14)pkg/cache/cache.go (1)
Cache
(5-40)
internal/sms-gateway/modules/events/service.go (5)
internal/sms-gateway/pubsub/pubsub.go (1)
PubSub
(14-14)internal/sms-gateway/modules/events/types.go (1)
Event
(9-12)internal/sms-gateway/modules/sse/types.go (1)
Event
(7-10)internal/sms-gateway/modules/push/types/types.go (1)
Event
(7-10)internal/sms-gateway/modules/events/metrics.go (7)
DeliveryTypeUnknown
(20-20)FailureReasonSerializationError
(22-22)FailureReasonPublishError
(23-23)EventTypeUnknown
(26-26)DeliveryTypePush
(18-18)FailureReasonProviderFailed
(24-24)DeliveryTypeSSE
(19-19)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Test
- GitHub Check: Benchmark
- GitHub Check: E2E
- GitHub Check: Analyze (go)
🔇 Additional comments (8)
pkg/cache/memory.go (2)
172-176
: LGTM: Close() implementation and compile-time assertion.The
Close()
implementation correctly returnsnil
since an in-memory cache has no external resources to clean up. The compile-time assertionvar _ Cache = (*memoryCache)(nil)
is good practice and ensuresmemoryCache
properly implements theCache
interface.
16-16
: Returning memoryCache remains a valid, unexported implementation of Cache
*memoryCache
still implements theCache
interface, callers can assign it toCache
without issue, and internal fields stay unexported.Likely an incorrect or invalid review comment.
internal/sms-gateway/modules/events/module.go (1)
17-28
: LGTM! Error handling properly implemented.The lifecycle hook correctly handles errors from
svc.Run
, logs them, and initiates graceful shutdown with a non-zero exit code. The nested error handling for shutdown failures is also appropriate.internal/sms-gateway/modules/events/events.go (1)
9-29
: LGTM! Consistent value semantics.The constructors now return
Event
by value, which is appropriate for small structs and aligns with the updatedNewEvent
signature intypes.go
. This eliminates unnecessary pointer allocations.internal/sms-gateway/modules/events/types.go (1)
4-19
: LGTM! JSON serialization properly configured.The
Event
struct now has exported fields with appropriate JSON tags, andNewEvent
correctly returns a value type. The serialization changes align well with the Pub/Sub integration requirements.internal/sms-gateway/modules/events/service.go (3)
32-44
: LGTM! Constructor properly updated.The
NewService
constructor now accepts thepubsub.PubSub
dependency, which is correctly stored in the service struct for use byNotify
andRun
methods.
60-71
: LGTM! Serialization and publish error handling is robust.The code properly:
- Serializes the event wrapper with error handling
- Increments appropriate metrics for serialization failures
- Publishes to Pub/Sub with error handling
- Increments appropriate metrics for publish failures
- Increments enqueued metrics on success
- Wraps errors with context using
fmt.Errorf
with%w
76-103
: LGTM! Subscription lifecycle properly managed.The
Run
method correctly:
- Subscribes to the Pub/Sub topic
- Defers subscription cleanup
- Handles context cancellation
- Checks for closed channels to avoid tight loops
- Increments metrics on deserialization failures
- Logs errors without PII
Note: Validation for nil
wrapper.Event
should be added as mentioned in the review oftypes.go
(lines 21-33) to prevent panics inprocessEvent
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
internal/sms-gateway/modules/events/service.go (1)
51-52
: Accept parent context instead of using Background().Using
context.Background()
ignores any caller's cancellation or timeout. This issue was previously flagged but remains unaddressed.-func (s *Service) Notify(userID string, deviceID *string, event Event) error { +func (s *Service) Notify(ctx context.Context, userID string, deviceID *string, event Event) error { if event.EventType == "" { return fmt.Errorf("event type is empty") } - subCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel()
🧹 Nitpick comments (1)
internal/sms-gateway/modules/events/service.go (1)
47-49
: Consider incrementing metrics for validation failures.The validation error here doesn't increment failed metrics, while serialization (line 62) and publish errors (line 67) do. For consistency and observability, consider adding a metrics call similar to other error paths.
if event.EventType == "" { + s.metrics.IncrementFailed(EventTypeUnknown, DeliveryTypeUnknown, FailureReasonValidationError) return fmt.Errorf("event type is empty") }
Note: You would need to add the
FailureReasonValidationError
constant tometrics.go
if not already present.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
internal/sms-gateway/modules/events/service.go
(2 hunks)internal/sms-gateway/modules/events/types.go
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- internal/sms-gateway/modules/events/types.go
🧰 Additional context used
🧬 Code graph analysis (1)
internal/sms-gateway/modules/events/service.go (6)
internal/sms-gateway/pubsub/pubsub.go (1)
PubSub
(14-14)internal/sms-gateway/modules/events/types.go (1)
Event
(9-12)internal/sms-gateway/modules/sse/types.go (1)
Event
(7-10)internal/sms-gateway/modules/push/types.go (1)
Event
(16-16)internal/sms-gateway/modules/push/types/types.go (1)
Event
(7-10)internal/sms-gateway/modules/events/metrics.go (7)
DeliveryTypeUnknown
(20-20)FailureReasonSerializationError
(22-22)FailureReasonPublishError
(23-23)EventTypeUnknown
(26-26)DeliveryTypePush
(18-18)FailureReasonProviderFailed
(24-24)DeliveryTypeSSE
(19-19)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Benchmark
- GitHub Check: E2E
- GitHub Check: Test
- GitHub Check: Analyze (go)
🔇 Additional comments (2)
internal/sms-gateway/modules/events/service.go (2)
76-103
: LGTM! Pub/Sub integration properly implemented.The Run function correctly:
- Subscribes to the pubsub topic with proper error handling
- Handles channel close with the
ok
flag (line 89-93)- Avoids logging PII by only logging the error (line 97)
- Processes events through the wrapper pattern
The subscription lifecycle and error handling are well-implemented.
54-58
: No issues found. TheEvent
is stored by value ineventWrapper
(line 24 in types.go), which is intentional for immutability guarantees. The struct is small (~24 bytes: enum + map reference), so copying cost is negligible. JSON serialization handles value semantics correctly. This design choice is consistent with the serialize/deserialize implementation and prevents unintended modifications to events after wrapping.
2473277
to
adcbf48
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/cache/redis.go (1)
156-201
: <!-- [scratchpad]
[task final_analysis]
[observations]
- go-redis v9.9.0 is deployed; HExpireAt fully supported since v9.6.0
- HExpireAt calls in lines 166 and 195 are VALID Redis 7.4+ commands
- Lua scripts don't check expiry; rely on Redis auto-removal of expired fields
- Get() returns ErrKeyNotFound for both missing and expired keys (indistinguishable)
- GetAndDelete() Lua script returns false (mapped to ErrKeyNotFound) when field absent or expired
- Interface contract (cache.go:15) requires ErrKeyExpired to be returned for expired keys
[analysis]- Review comment's PRIMARY claim (HExpireAt doesn't exist) is INCORRECT
- Review comment's SECONDARY claim (ErrKeyExpired not returned) is VALID but for different reason
- With Redis 7.4 auto-expiry of hash fields, expired fields are auto-deleted by Redis
- This makes HGet return Nil for both expired and missing keys (semantically indistinguishable at API level)
- Current implementation violates interface contract which requires distinguishing expiry from non-existence
- The implementation relies on transparent auto-expiry but the interface doesn't support this model
[pending]- None; sufficient data to rewrite comment
[done]- Verified HExpireAt exists and is properly used
- Identified true semantic issue: interface contract violation for ErrKeyExpired distinction
Fix missing ErrKeyExpired distinction in Get() and GetAndDelete()—interface contract requires distinguishing expired from non-existent keys.
With Redis 7.4 auto-expiring hash fields, expired entries are transparently removed by Redis before retrieval. This makes HGet return Nil for both expired and missing keys, but the Cache interface contract (cache.go:15) requires returning ErrKeyExpired when a key has expired. Currently both Get() (line 128) and GetAndDelete() (line 142) return ErrKeyNotFound in both cases, violating the interface.
Solutions:
- A) Abandon per-field TTL; store each entry as a separate key with SET/GET/DEL+EX/EXAT for whole-key expiry, making expiration visible to the client.
- B) Keep the hash + add a sidecar sorted set (e.g., key + ":exp") tracking expiry times. In Get/GetAndDelete, use Lua to check if ZSCORE shows expiry < now; return ErrKeyExpired and clean up if so. Adjust Cleanup() and Drain() to also manage the sidecar.
- C) Clarify the interface: if auto-expiry is transparent, document that Get/GetAndDelete return ErrKeyNotFound for both missing and expired keys, and adjust memory.go to match (currently it returns ErrKeyExpired).
Note: HExpireAt (lines 166, 195) is correctly implemented—go-redis v9.9.0 fully supports this command.
♻️ Duplicate comments (2)
internal/sms-gateway/modules/events/metrics.go (1)
22-26
: Add validation-specific failure reason constant.The validation check in
service.go
(lines 47-49) should increment metrics with a distinct validation failure reason instead of lumping validation failures with serialization errors in metrics analysis.Apply this diff to add the constant:
FailureReasonSerializationError = "serialization_error" FailureReasonPublishError = "publish_error" + FailureReasonValidationError = "validation_error" FailureReasonProviderFailed = "provider_failed"
Then update
service.go
line 48 to increment metrics:if event.EventType == "" { s.metrics.IncrementFailed(EventTypeUnknown, DeliveryTypeUnknown, FailureReasonValidationError) return fmt.Errorf("event type is empty") }internal/sms-gateway/modules/events/service.go (1)
51-52
: Accept parent context to respect caller cancellation.Using
context.Background()
ignores any caller's cancellation or timeout, potentially causing operations to hang for 5 seconds even if the caller has been cancelled. This can lead to resource leaks and degraded responsiveness during shutdown or request cancellation.Add a
context.Context
parameter toNotify
:-func (s *Service) Notify(userID string, deviceID *string, event Event) error { +func (s *Service) Notify(ctx context.Context, userID string, deviceID *string, event Event) error { if event.EventType == "" { return fmt.Errorf("event type is empty") } - subCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() wrapper := eventWrapper{ UserID: userID, DeviceID: deviceID, Event: event, } wrapperBytes, err := wrapper.serialize() if err != nil { s.metrics.IncrementFailed(string(event.EventType), DeliveryTypeUnknown, FailureReasonSerializationError) return fmt.Errorf("can't serialize event wrapper: %w", err) } - if err := s.pubsub.Publish(subCtx, pubsubTopic, wrapperBytes); err != nil { + if err := s.pubsub.Publish(pubCtx, pubsubTopic, wrapperBytes); err != nil { s.metrics.IncrementFailed(string(event.EventType), DeliveryTypeUnknown, FailureReasonPublishError) return fmt.Errorf("can't publish event: %w", err) } s.metrics.IncrementEnqueued(string(event.EventType)) return nil }
🧹 Nitpick comments (4)
pkg/cache/redis.go (3)
62-89
: Constructor refactor is solid; consider returning the interface and fail-fast ping.Validation, URL parsing, prefix normalization, and ownership look good. Prefer returning Cache to avoid leaking the concrete type, and optionally Ping to fail fast on bad URLs.
Apply:
-func NewRedis(config RedisConfig) (*redisCache, error) { +func NewRedis(config RedisConfig) (Cache, error) { ... - return &redisCache{ + return &redisCache{ client: client, ownedClient: config.Client == nil, key: config.Prefix + redisCacheKey, ttl: config.TTL, }, nil }Optionally, after creating the client:
// _ := client.Ping(context.Background()).Err() // return wrapped error if non-nil
36-51
: Clarify TTL semantics in the config.Document that TTL=0 disables per-entry expiry, and how overrides via options interact with TTL. This helps set expectations across memory and Redis backends.
203-209
: Make Close idempotent and concurrency-safe.Multiple/concurrent Close calls can return errors from the underlying client. Guard with sync.Once or atomically nil out the client after the first close.
Example:
+import "sync" type redisCache struct { client *redis.Client ownedClient bool + closeOnce sync.Once key string ttl time.Duration } func (r *redisCache) Close() error { - if r.ownedClient { - return r.client.Close() - } - return nil + var err error + if r.ownedClient && r.client != nil { + r.closeOnce.Do(func() { err = r.client.Close(); r.client = nil }) + } + return err }pkg/pubsub/options.go (1)
9-15
: *Simplify: apply doesn’t need to return options.Return value is unused; remove it to reduce noise.
-func (o *options) apply(opts ...Option) *options { +func (o *options) apply(opts ...Option) { for _, opt := range opts { opt(o) } - - return o }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
go.sum
is excluded by!**/*.sum
📒 Files selected for processing (20)
go.mod
(0 hunks)internal/config/config.go
(3 hunks)internal/config/module.go
(2 hunks)internal/sms-gateway/app.go
(2 hunks)internal/sms-gateway/cache/factory.go
(1 hunks)internal/sms-gateway/modules/events/events.go
(2 hunks)internal/sms-gateway/modules/events/metrics.go
(1 hunks)internal/sms-gateway/modules/events/module.go
(1 hunks)internal/sms-gateway/modules/events/service.go
(2 hunks)internal/sms-gateway/modules/events/types.go
(1 hunks)internal/sms-gateway/pubsub/config.go
(1 hunks)internal/sms-gateway/pubsub/module.go
(1 hunks)internal/sms-gateway/pubsub/pubsub.go
(1 hunks)pkg/cache/cache.go
(1 hunks)pkg/cache/memory.go
(2 hunks)pkg/cache/redis.go
(2 hunks)pkg/pubsub/memory.go
(1 hunks)pkg/pubsub/options.go
(1 hunks)pkg/pubsub/pubsub.go
(1 hunks)pkg/pubsub/redis.go
(1 hunks)
💤 Files with no reviewable changes (1)
- go.mod
🚧 Files skipped from review as they are similar to previous changes (12)
- internal/sms-gateway/pubsub/config.go
- internal/config/module.go
- internal/sms-gateway/pubsub/module.go
- pkg/cache/cache.go
- internal/config/config.go
- pkg/pubsub/redis.go
- pkg/pubsub/pubsub.go
- internal/sms-gateway/modules/events/module.go
- internal/sms-gateway/modules/events/types.go
- internal/sms-gateway/pubsub/pubsub.go
- internal/sms-gateway/cache/factory.go
- pkg/cache/memory.go
🧰 Additional context used
🧬 Code graph analysis (5)
internal/sms-gateway/modules/events/service.go (3)
internal/sms-gateway/pubsub/pubsub.go (1)
PubSub
(14-14)internal/sms-gateway/modules/events/types.go (1)
Event
(9-12)internal/sms-gateway/modules/events/metrics.go (7)
DeliveryTypeUnknown
(20-20)FailureReasonSerializationError
(22-22)FailureReasonPublishError
(23-23)EventTypeUnknown
(26-26)DeliveryTypePush
(18-18)FailureReasonProviderFailed
(24-24)DeliveryTypeSSE
(19-19)
pkg/pubsub/memory.go (2)
pkg/pubsub/pubsub.go (5)
Message
(13-16)ErrPubSubClosed
(9-9)ErrInvalidTopic
(10-10)Subscription
(18-23)PubSub
(45-50)pkg/pubsub/options.go (1)
Option
(3-3)
internal/sms-gateway/modules/events/events.go (3)
internal/sms-gateway/modules/events/types.go (2)
Event
(9-12)NewEvent
(14-19)internal/sms-gateway/modules/push/types.go (1)
Event
(16-16)internal/sms-gateway/modules/push/types/types.go (1)
Event
(7-10)
internal/sms-gateway/app.go (2)
internal/config/module.go (1)
Module
(22-131)internal/sms-gateway/pubsub/module.go (1)
Module
(11-26)
pkg/cache/redis.go (2)
pkg/cache/cache.go (1)
Cache
(5-40)internal/config/config.go (1)
Cache
(75-77)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Benchmark
- GitHub Check: Test
- GitHub Check: E2E
- GitHub Check: Analyze (go)
🔇 Additional comments (8)
internal/sms-gateway/modules/events/service.go (2)
76-103
: LGTM! Pub/Sub integration is well-implemented.The
Run
function correctly:
- Subscribes to the Pub/Sub topic with proper error handling
- Handles channel closure (lines 89-93)
- Increments deserialization failure metrics with safe labels (line 96)
- Avoids logging sensitive payload data (line 97)
The past review concerns about channel handling and PII logging have been addressed.
105-150
: LGTM! Event processing is correctly instrumented.The
processEvent
function properly:
- Handles both push and SSE delivery paths
- Increments success and failure metrics with appropriate labels
- Uses the wrapper's event type and data consistently
The implementation is clean and well-structured.
internal/sms-gateway/modules/events/events.go (1)
9-29
: LGTM! Value semantics for Event is appropriate.The change from pointer to value return type aligns with the
Notify()
method signature ininternal/sms-gateway/modules/events/service.go:46
, which expectsevent Event
(value type). All call sites acrosssettings/service.go
,messages/service.go
, andwebhooks/service.go
correctly pass Event values directly toNotify()
with no incompatibilities.pkg/cache/redis.go (1)
211-211
: Nice interface assertion.Compile-time check is helpful.
internal/sms-gateway/app.go (1)
24-24
: LGTM: pubsub module wired into app.Import and Module() registration order looks fine relative to config and events.
Also applies to: 49-49
pkg/pubsub/options.go (1)
5-7
: ****The claim that "make(chan T, n) requires n to be int" is incorrect. Go's
make()
built-in accepts any integer type for the buffer size parameter, includinguint
. The code as written will compile without errors. Bothpkg/pubsub/memory.go:91
andpkg/pubsub/redis.go:112
successfully passuint
values tomake(chan Message, ...)
.If there is a real type mismatch issue (e.g.,
config.BufferSize
isint
butWithBufferSize
expectsuint
), that is a separate concern and should be addressed at the call site, not by changing the buffer size type throughout the codebase.Likely an incorrect or invalid review comment.
pkg/pubsub/memory.go (2)
112-122
: Update subscribe to new subscriber type.-func (m *memoryPubSub) subscribe(id, topic string, ch chan Message) { +func (m *memoryPubSub) subscribe(id, topic string, sub subscriber) { m.mu.Lock() defer m.mu.Unlock() subscriptions, ok := m.topics[topic] if !ok { - subscriptions = make(map[string]chan Message) + subscriptions = make(map[string]subscriber) m.topics[topic] = subscriptions } - subscriptions[id] = ch + subscriptions[id] = sub }Likely an incorrect or invalid review comment.
58-71
: Skip canceled subscribers and guard mid-flight cancel in Publish.Pre-check sub.ctx.Done to avoid blocking, and include it in the send select.
- for _, ch := range subscribers { - wg.Add(1) - go func(ch chan Message) { + for _, sub := range subscribers { + // Skip already-canceled subscribers. + select { + case <-sub.ctx.Done(): + continue + default: + } + wg.Add(1) + go func(sub subscriber) { defer wg.Done() select { - case ch <- msg: + case sub.ch <- msg: case <-ctx.Done(): return case <-m.closeCh: return + case <-sub.ctx.Done(): + // Subscriber canceled mid-flight; drop. + return } - }(ch) + }(sub) }Likely an incorrect or invalid review comment.
adcbf48
to
fb29abc
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (4)
pkg/pubsub/redis.go (1)
78-90
: Document Publish semantics (fire-and-forget vs memory).Add a note that Redis Publish returns when Redis accepts the message; it does not wait for subscribers to receive it (unlike memory, which blocks until delivery). Helps callers set expectations.
func (r *redisPubSub) Publish(ctx context.Context, topic string, data []byte) error { + // Note: Redis publish is fire-and-forget. It returns once Redis accepts the + // message and does not wait for subscribers to receive it. This differs from + // the in-memory implementation which blocks until delivery (or context/close). select {internal/sms-gateway/modules/events/metrics.go (1)
22-26
: Add a validation-specific failure reason.Introduce a dedicated constant to distinguish validation failures from serialization errors in metrics.
FailureReasonSerializationError = "serialization_error" FailureReasonPublishError = "publish_error" FailureReasonProviderFailed = "provider_failed" + FailureReasonValidationError = "validation_error"
internal/sms-gateway/modules/events/service.go (2)
47-49
: Increment failure metrics on validation error.The validation check on line 47 returns an error but does not increment failure metrics, making it difficult to track validation failures in monitoring. Past review comments flagged this gap.
Apply this diff to increment metrics before returning:
if event.EventType == "" { + s.metrics.IncrementFailed(EventTypeUnknown, DeliveryTypeUnknown, FailureReasonSerializationError) return fmt.Errorf("event type is empty") }
Note: If a
FailureReasonValidationError
constant is available inmetrics.go
, prefer that overFailureReasonSerializationError
for clearer failure classification.
51-52
: Accept parent context to honor caller cancellation.Using
context.Background()
creates a detached context that ignores any cancellation or timeout from the caller. During shutdown or request cancellation, this can cause the publish operation to hang for up to 5 seconds, leading to resource leaks and degraded responsiveness.Update the
Notify
signature to accept acontext.Context
parameter and use it as the parent for the timeout:-func (s *Service) Notify(userID string, deviceID *string, event Event) error { +func (s *Service) Notify(ctx context.Context, userID string, deviceID *string, event Event) error { if event.EventType == "" { + s.metrics.IncrementFailed(EventTypeUnknown, DeliveryTypeUnknown, FailureReasonSerializationError) return fmt.Errorf("event type is empty") } - subCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel()Then update all callers of
Notify
to pass an appropriate context.
🧹 Nitpick comments (4)
pkg/cache/cache.go (2)
39-39
: Consider adding context parameter for timeout control.While
Close() error
follows the standardio.Closer
pattern, operations that close network resources (like Redis connections) could benefit from a context parameter for timeout and cancellation control. Consider whetherClose(ctx context.Context) error
would better align with the other methods and provide better control over cleanup operations.
37-38
: Enhance documentation with idempotency and error behavior.Consider documenting whether
Close()
is idempotent (safe to call multiple times) and what specific error conditions callers should expect. This helps consumers understand proper usage patterns.internal/sms-gateway/pubsub/config.go (1)
3-7
: Clarify BufferSize semantics (and consider int).Document that 0 means unbuffered; consider int for BufferSize to match common make(chan, n) usage and avoid ad-hoc casts elsewhere. Safe to keep uint if you prefer—just be consistent across options/backends.
pkg/pubsub/memory.go (1)
33-36
: High fan-out: goroutine-per-subscriber delivery could be costly.If topics have many subscribers and events are frequent, consider bounded worker pool or direct synchronous sends with timeouts/backpressure to reduce goroutine churn.
If helpful, I can sketch benchmarks to gauge throughput/latency under fan-out.
Also applies to: 55-76
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
go.sum
is excluded by!**/*.sum
📒 Files selected for processing (20)
go.mod
(0 hunks)internal/config/config.go
(3 hunks)internal/config/module.go
(2 hunks)internal/sms-gateway/app.go
(2 hunks)internal/sms-gateway/cache/factory.go
(1 hunks)internal/sms-gateway/modules/events/events.go
(2 hunks)internal/sms-gateway/modules/events/metrics.go
(1 hunks)internal/sms-gateway/modules/events/module.go
(1 hunks)internal/sms-gateway/modules/events/service.go
(2 hunks)internal/sms-gateway/modules/events/types.go
(1 hunks)internal/sms-gateway/pubsub/config.go
(1 hunks)internal/sms-gateway/pubsub/module.go
(1 hunks)internal/sms-gateway/pubsub/pubsub.go
(1 hunks)pkg/cache/cache.go
(1 hunks)pkg/cache/memory.go
(2 hunks)pkg/cache/redis.go
(2 hunks)pkg/pubsub/memory.go
(1 hunks)pkg/pubsub/options.go
(1 hunks)pkg/pubsub/pubsub.go
(1 hunks)pkg/pubsub/redis.go
(1 hunks)
💤 Files with no reviewable changes (1)
- go.mod
🚧 Files skipped from review as they are similar to previous changes (12)
- pkg/pubsub/options.go
- internal/sms-gateway/app.go
- pkg/cache/memory.go
- internal/sms-gateway/pubsub/pubsub.go
- internal/config/module.go
- pkg/cache/redis.go
- internal/config/config.go
- pkg/pubsub/pubsub.go
- internal/sms-gateway/modules/events/module.go
- internal/sms-gateway/modules/events/types.go
- internal/sms-gateway/pubsub/module.go
- internal/sms-gateway/cache/factory.go
🧰 Additional context used
🧬 Code graph analysis (4)
pkg/pubsub/redis.go (3)
pkg/pubsub/options.go (1)
Option
(3-3)pkg/pubsub/pubsub.go (5)
ErrPubSubClosed
(9-9)ErrInvalidTopic
(10-10)Subscription
(18-23)Message
(13-16)PubSub
(45-50)internal/config/config.go (1)
PubSub
(79-81)
pkg/pubsub/memory.go (2)
pkg/pubsub/pubsub.go (5)
Message
(13-16)ErrPubSubClosed
(9-9)ErrInvalidTopic
(10-10)Subscription
(18-23)PubSub
(45-50)pkg/pubsub/options.go (1)
Option
(3-3)
internal/sms-gateway/modules/events/events.go (2)
internal/sms-gateway/modules/events/types.go (2)
Event
(9-12)NewEvent
(14-19)internal/sms-gateway/modules/push/types.go (1)
Event
(16-16)
internal/sms-gateway/modules/events/service.go (5)
internal/sms-gateway/pubsub/pubsub.go (1)
PubSub
(14-14)internal/sms-gateway/modules/events/types.go (1)
Event
(9-12)internal/sms-gateway/modules/sse/types.go (1)
Event
(7-10)internal/sms-gateway/modules/push/types/types.go (1)
Event
(7-10)internal/sms-gateway/modules/events/metrics.go (7)
DeliveryTypeUnknown
(20-20)FailureReasonSerializationError
(22-22)FailureReasonPublishError
(23-23)EventTypeUnknown
(26-26)DeliveryTypePush
(18-18)FailureReasonProviderFailed
(24-24)DeliveryTypeSSE
(19-19)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Benchmark
- GitHub Check: Test
- GitHub Check: E2E
- GitHub Check: Analyze (go)
🔇 Additional comments (3)
pkg/cache/cache.go (1)
37-39
: All implementations of theCache
interface within this codebase have already been updated with theClose()
method.Both
memoryCache
(pkg/cache/memory.go:172) andredisCache
(pkg/cache/redis.go:203) implementClose()
, confirmed by compile-time assertions. No additional implementations exist that require updates.Note: This remains a breaking change for external consumers of the public
pkg/cache
package, but the PR itself is complete for this codebase.internal/sms-gateway/modules/events/events.go (1)
9-11
: LGTM – value returns align with NewEvent.Constructors now return Event by value consistently. Please ensure downstream call sites expect values (not pointers).
Also applies to: 13-15, 17-25, 27-29
internal/sms-gateway/modules/events/service.go (1)
76-103
: LGTM—past review concerns addressed.The subscription lifecycle (lines 77-81), closed-channel handling (lines 89-93), and error logging without PII (line 97) correctly address previous review comments. The deserialization error flow with metrics and safe logging is well implemented.
a102d73
to
2ffc5a2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/sms-gateway/cache/factory.go (1)
44-59
: Critical: Double prefix bug causes cache key collision.Line 59 prepends
keyPrefix
toname
before invokingf.new()
, then line 47 prependskeyPrefix
again. This results inPrefix: keyPrefix + keyPrefix + originalName
, breaking cache isolation.Apply this fix:
case "redis": return &factory{ new: func(name string) (Cache, error) { return cache.NewRedis(cache.RedisConfig{ Client: nil, URL: config.URL, - Prefix: keyPrefix + name, + Prefix: name, TTL: 0, }) }, }, nil
♻️ Duplicate comments (3)
internal/sms-gateway/modules/events/metrics.go (1)
22-27
: Add a validation-specific failure reason (follow-up to prior review).Differentiate malformed payloads from serialization errors for cleaner metrics.
FailureReasonSerializationError = "serialization_error" FailureReasonPublishError = "publish_error" FailureReasonProviderFailed = "provider_failed" + FailureReasonValidationError = "validation_error"
Then use FailureReasonValidationError where inputs fail validation (e.g., empty event_type).
internal/sms-gateway/modules/events/service.go (2)
46-49
: Increment metrics on validation failure.The validation check correctly guards against empty EventType, but doesn't record the failure in metrics. This creates a blind spot in observability.
Apply this fix:
func (s *Service) Notify(userID string, deviceID *string, event Event) error { if event.EventType == "" { + s.metrics.IncrementFailed(EventTypeUnknown, DeliveryTypeUnknown, FailureReasonSerializationError) return fmt.Errorf("event type is empty") }
51-52
: Accept parent context to respect caller cancellation.Using
context.Background()
ignores the caller's cancellation and timeout, potentially causing the publish to hang for 5 seconds even when the caller has been cancelled. This degrades system responsiveness during shutdown.Apply this fix to accept and propagate caller context:
-func (s *Service) Notify(userID string, deviceID *string, event Event) error { +func (s *Service) Notify(ctx context.Context, userID string, deviceID *string, event Event) error { if event.EventType == "" { s.metrics.IncrementFailed(EventTypeUnknown, DeliveryTypeUnknown, FailureReasonSerializationError) return fmt.Errorf("event type is empty") } - subCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel()
🧹 Nitpick comments (5)
internal/sms-gateway/modules/events/types.go (2)
31-33
: Return validation error from deserialize (empty event_type, missing user_id).Currently nil-pointer risk is gone (Event is a value), but invalid payloads slip through (e.g., event_type empty). Prefer failing fast in deserialize.
func (w *eventWrapper) deserialize(data []byte) error { - return json.Unmarshal(data, w) + dec := json.NewDecoder(bytes.NewReader(data)) + dec.DisallowUnknownFields() + if err := dec.Decode(w); err != nil { + return err + } + if w.UserID == "" { + return fmt.Errorf("user_id is required") + } + if w.Event.EventType == "" { + return fmt.Errorf("event.event_type is required") + } + return nil }Also update imports:
-import "encoding/json" +import ( + "bytes" + "encoding/json" + "fmt" +)
10-11
: Align event JSON shape across modules (event vs event_type).This file serializes wrapper as {"event":{"event_type":...,"data":...}}, while SSE uses {"event": "...", "data": ...}. Consider a single canonical Event representation or explicit translation helpers to avoid confusion and drift.
Please confirm intended cross-module JSON contracts (Pub/Sub vs SSE) and whether a shared type or mapper should be introduced.
Also applies to: 21-25
internal/config/config.go (1)
79-81
: Expose Pub/Sub buffer size in config (avoid magic 128).internal/config/module.go hardcodes BufferSize: 128. Make it configurable alongside URL.
type PubSub struct { - URL string `yaml:"url" envconfig:"PUBSUB__URL"` + URL string `yaml:"url" envconfig:"PUBSUB__URL"` + BufferSize int `yaml:"buffer_size" envconfig:"PUBSUB__BUFFER_SIZE"` }Default:
PubSub: PubSub{ - URL: "memory://", + URL: "memory://", + BufferSize: 128, },And in internal/config/module.go (for completeness):
return pubsub.Config{ - URL: cfg.PubSub.URL, - BufferSize: 128, + URL: cfg.PubSub.URL, + BufferSize: cfg.PubSub.BufferSize, }Also applies to: 111-113
internal/sms-gateway/pubsub/module.go (1)
18-24
: Avoid shadowing package name; log Close errors in OnStop.Variable name equals imported package; also errors from Close are swallowed by Fx logs without module context.
- fx.Invoke(func(pubsub pubsub.PubSub, lc fx.Lifecycle) { + fx.Invoke(func(ps pubsub.PubSub, lc fx.Lifecycle, logger *zap.Logger) { lc.Append(fx.Hook{ OnStop: func(_ context.Context) error { - return pubsub.Close() + if err := ps.Close(); err != nil { + logger.Error("pubsub close failed", zap.Error(err)) + return err + } + return nil }, }) }),internal/sms-gateway/modules/events/module.go (1)
21-28
: Add panic guard in Run goroutine to trigger graceful shutdown.Prevents silent process crashes in the events service goroutine; keeps error handling behavior consistent across both error and panic paths.
go func() { + defer func() { + if r := recover(); r != nil { + logger.Error("Panic in events service", zap.Any("recover", r)) + if err := sh.Shutdown(fx.ExitCode(1)); err != nil { + logger.Error("Failed to shutdown", zap.Error(err)) + } + } + }() if err := svc.Run(ctx); err != nil { logger.Error("Error running events service", zap.Error(err)) if err := sh.Shutdown(fx.ExitCode(1)); err != nil { logger.Error("Failed to shutdown", zap.Error(err)) } } }()Verified:
Service.Run(ctx context.Context)
returnserror
atinternal/sms-gateway/modules/events/service.go:76
, so the block compiles correctly.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
go.sum
is excluded by!**/*.sum
📒 Files selected for processing (18)
go.mod
(0 hunks)internal/config/config.go
(3 hunks)internal/config/module.go
(2 hunks)internal/sms-gateway/app.go
(2 hunks)internal/sms-gateway/cache/factory.go
(1 hunks)internal/sms-gateway/modules/events/events.go
(2 hunks)internal/sms-gateway/modules/events/metrics.go
(1 hunks)internal/sms-gateway/modules/events/module.go
(1 hunks)internal/sms-gateway/modules/events/service.go
(2 hunks)internal/sms-gateway/modules/events/types.go
(1 hunks)internal/sms-gateway/pubsub/config.go
(1 hunks)internal/sms-gateway/pubsub/module.go
(1 hunks)internal/sms-gateway/pubsub/pubsub.go
(1 hunks)pkg/cache/cache.go
(1 hunks)pkg/cache/memory.go
(2 hunks)pkg/cache/redis.go
(2 hunks)pkg/pubsub/memory.go
(1 hunks)pkg/pubsub/redis.go
(1 hunks)
💤 Files with no reviewable changes (1)
- go.mod
🚧 Files skipped from review as they are similar to previous changes (6)
- internal/config/module.go
- internal/sms-gateway/pubsub/config.go
- pkg/cache/memory.go
- pkg/cache/cache.go
- pkg/pubsub/redis.go
- internal/sms-gateway/app.go
🧰 Additional context used
🧬 Code graph analysis (10)
internal/config/config.go (1)
internal/sms-gateway/pubsub/pubsub.go (1)
PubSub
(14-14)
internal/sms-gateway/modules/events/module.go (2)
internal/sms-gateway/modules/events/service.go (1)
Service
(19-30)internal/sms-gateway/app.go (1)
Run
(62-73)
pkg/pubsub/memory.go (2)
pkg/pubsub/pubsub.go (5)
Message
(13-16)ErrPubSubClosed
(9-9)ErrInvalidTopic
(10-10)Subscription
(18-23)PubSub
(45-50)pkg/pubsub/options.go (1)
Option
(3-3)
internal/sms-gateway/modules/events/events.go (2)
internal/sms-gateway/modules/events/types.go (2)
Event
(9-12)NewEvent
(14-19)internal/sms-gateway/modules/push/types.go (1)
Event
(16-16)
internal/sms-gateway/pubsub/pubsub.go (5)
internal/config/config.go (2)
PubSub
(79-81)Config
(10-19)internal/sms-gateway/pubsub/config.go (1)
Config
(4-7)pkg/pubsub/options.go (1)
WithBufferSize
(17-21)pkg/pubsub/memory.go (1)
NewMemory
(24-36)pkg/pubsub/redis.go (2)
NewRedis
(42-76)RedisConfig
(15-27)
internal/sms-gateway/modules/events/service.go (10)
internal/sms-gateway/modules/push/service.go (1)
Service
(36-46)internal/sms-gateway/modules/messages/service.go (2)
Service
(48-61)NewService
(63-85)internal/sms-gateway/modules/sse/service.go (2)
Service
(17-25)NewService
(38-47)internal/sms-gateway/modules/settings/service.go (2)
Service
(19-25)NewService
(27-35)internal/sms-gateway/modules/webhooks/service.go (2)
Service
(28-37)NewService
(39-50)internal/sms-gateway/pubsub/pubsub.go (1)
PubSub
(14-14)internal/sms-gateway/modules/events/types.go (1)
Event
(9-12)internal/sms-gateway/modules/sse/types.go (1)
Event
(7-10)internal/sms-gateway/modules/push/types/types.go (1)
Event
(7-10)internal/sms-gateway/modules/events/metrics.go (6)
DeliveryTypeUnknown
(20-20)FailureReasonSerializationError
(22-22)FailureReasonPublishError
(23-23)EventTypeUnknown
(26-26)DeliveryTypePush
(18-18)FailureReasonProviderFailed
(24-24)
internal/sms-gateway/pubsub/module.go (4)
internal/config/module.go (1)
Module
(22-131)internal/sms-gateway/app.go (1)
Module
(36-60)internal/sms-gateway/pubsub/pubsub.go (2)
New
(16-41)PubSub
(14-14)internal/config/config.go (1)
PubSub
(79-81)
internal/sms-gateway/modules/events/types.go (3)
internal/sms-gateway/modules/sse/types.go (1)
Event
(7-10)internal/sms-gateway/modules/push/types/types.go (1)
Event
(7-10)internal/sms-gateway/modules/push/types.go (1)
Event
(16-16)
internal/sms-gateway/cache/factory.go (1)
pkg/cache/redis.go (2)
NewRedis
(62-89)RedisConfig
(37-51)
pkg/cache/redis.go (2)
pkg/cache/cache.go (1)
Cache
(5-40)internal/config/config.go (1)
Cache
(75-77)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: E2E
- GitHub Check: Benchmark
- GitHub Check: Test
- GitHub Check: Analyze (go)
🔇 Additional comments (7)
internal/sms-gateway/pubsub/pubsub.go (1)
16-40
: LGTM! Factory pattern correctly implemented.The function properly defaults to memory://, parses the URL, and switches on scheme to instantiate the appropriate backend with options.
pkg/pubsub/memory.go (1)
10-153
: LGTM! All previous concurrency issues resolved.The implementation correctly:
- Stores subscriber context alongside channels (lines 19-22, 100)
- Holds RLock during broadcast to prevent map mutation panics (lines 52-53)
- Checks subscriber cancellation to avoid deadlocks (line 74)
- Unsubscribes before closing channels to prevent send-on-closed panics (lines 109-111)
- Includes all fields in returned Subscription (line 116)
pkg/cache/redis.go (2)
36-89
: LGTM! Config-based Redis cache properly manages client lifecycle.The implementation correctly:
- Validates that either Client or URL is provided (lines 67-69)
- Creates a client from URL when needed (lines 72-79)
- Tracks ownership with
ownedClient
flag (line 83)- Normalizes prefix with trailing colon (lines 63-65)
203-211
: LGTM! Close method properly releases owned resources.The implementation correctly closes only owned clients, avoiding double-close bugs.
internal/sms-gateway/modules/events/events.go (1)
9-29
: LGTM! Event constructors correctly return by value.The migration from pointer to value semantics is appropriate for the Event struct and consistently applied across all constructors.
internal/sms-gateway/modules/events/service.go (2)
76-103
: LGTM! Run method correctly handles subscription lifecycle.The implementation properly:
- Checks for closed channel (lines 89-93)
- Increments metrics on deserialization failure (line 96)
- Logs errors without exposing PII (line 97)
- Returns on context cancellation (lines 86-88)
105-150
: LGTM! Event processing correctly accesses Event fields.The method properly uses
wrapper.Event.EventType
andwrapper.Event.Data
throughout, consistent with the updated Event struct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
internal/sms-gateway/pubsub/module.go (1)
18-18
: Consider using the localPubSub
alias for consistency.The parameter uses the fully qualified
pubsub.PubSub
type from the imported package, while the local package exports aPubSub
alias. Using the local alias would be more consistent with the package's public API and the return type ofNew()
.Apply this diff for consistency:
- fx.Invoke(func(ps pubsub.PubSub, logger *zap.Logger, lc fx.Lifecycle) { + fx.Invoke(func(ps PubSub, logger *zap.Logger, lc fx.Lifecycle) {
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
internal/sms-gateway/cache/factory.go
(2 hunks)internal/sms-gateway/pubsub/module.go
(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
internal/sms-gateway/pubsub/module.go (4)
internal/sms-gateway/app.go (1)
Module
(36-60)internal/config/module.go (1)
Module
(22-131)internal/sms-gateway/pubsub/pubsub.go (2)
New
(16-41)PubSub
(14-14)internal/config/config.go (1)
PubSub
(79-81)
internal/sms-gateway/cache/factory.go (1)
pkg/cache/redis.go (2)
NewRedis
(62-89)RedisConfig
(37-51)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: E2E
- GitHub Check: Benchmark
- GitHub Check: Test
- GitHub Check: Analyze (go)
🔇 Additional comments (4)
internal/sms-gateway/cache/factory.go (2)
44-49
: Namespace bug fixed! Verify TTL configuration.The Redis prefix now correctly includes the cache name (
keyPrefix + name
), resolving the previous namespace collision issue. Each cache will have a unique namespace.However, the TTL is hardcoded to 0. Verify whether this is intentional (meaning no expiration) or if TTL should be configurable via the factory config.
59-59
: LGTM!The delegation is clean and correct. The raw name is passed to the internal factory function, which handles prefixing appropriately in the Redis case.
internal/sms-gateway/pubsub/module.go (2)
1-9
: LGTM: Clean imports and package structure.The package declaration and imports are correct. All dependencies are necessary for the fx module setup.
11-29
: LGTM: Well-structured fx module integration.The module follows Uber Fx best practices with proper logger scoping, constructor wiring, and lifecycle management. The OnStop hook ensures graceful shutdown of the PubSub instance.
3d03662
to
7e48f90
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (2)
internal/sms-gateway/modules/events/service.go (2)
51-52
: Accept parent context to respect caller cancellation.Using
context.Background()
ignores any caller's cancellation or timeout, potentially causing the operation to block for up to 5 seconds even when the caller has been cancelled. This can lead to resource leaks and degraded responsiveness during shutdown.Consider updating the
Notify
signature to accept a context parameter:-func (s *Service) Notify(userID string, deviceID *string, event Event) error { +func (s *Service) Notify(ctx context.Context, userID string, deviceID *string, event Event) error { if event.EventType == "" { s.metrics.IncrementFailed(EventTypeUnknown, DeliveryTypeUnknown, FailureReasonSerializationError) return fmt.Errorf("event type is empty") } - subCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel()Then update all callers to pass an appropriate context.
46-49
: Add failure metric for validation errors.The validation check on line 47-49 returns an error but doesn't increment a failure metric. This makes it difficult to monitor validation failures in production.
Apply this diff to add metric tracking:
func (s *Service) Notify(userID string, deviceID *string, event Event) error { if event.EventType == "" { + s.metrics.IncrementFailed(EventTypeUnknown, DeliveryTypeUnknown, FailureReasonSerializationError) return fmt.Errorf("event type is empty") }
Note: If you add
FailureReasonValidationError
as suggested inmetrics.go
, use that constant instead ofFailureReasonSerializationError
for clearer metrics distinction.
🧹 Nitpick comments (1)
internal/sms-gateway/modules/events/metrics.go (1)
22-26
: Consider adding a validation-specific failure reason constant.The past review comments suggest adding
FailureReasonValidationError = "validation_error"
to distinguish validation failures from serialization errors. Currently, validation failures inservice.go
(line 48) don't increment any failure metric, and if they did, they would need a distinct constant to avoid conflating validation issues with actual serialization failures.Apply this diff to add the validation error constant:
FailureReasonSerializationError = "serialization_error" FailureReasonPublishError = "publish_error" + FailureReasonValidationError = "validation_error" FailureReasonProviderFailed = "provider_failed"
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
go.sum
is excluded by!**/*.sum
📒 Files selected for processing (20)
go.mod
(0 hunks)internal/config/config.go
(3 hunks)internal/config/module.go
(2 hunks)internal/sms-gateway/app.go
(2 hunks)internal/sms-gateway/cache/factory.go
(2 hunks)internal/sms-gateway/modules/events/events.go
(2 hunks)internal/sms-gateway/modules/events/metrics.go
(1 hunks)internal/sms-gateway/modules/events/module.go
(1 hunks)internal/sms-gateway/modules/events/service.go
(2 hunks)internal/sms-gateway/modules/events/types.go
(1 hunks)internal/sms-gateway/pubsub/config.go
(1 hunks)internal/sms-gateway/pubsub/module.go
(1 hunks)internal/sms-gateway/pubsub/pubsub.go
(1 hunks)pkg/cache/cache.go
(1 hunks)pkg/cache/memory.go
(2 hunks)pkg/cache/redis.go
(2 hunks)pkg/pubsub/memory.go
(1 hunks)pkg/pubsub/options.go
(1 hunks)pkg/pubsub/pubsub.go
(1 hunks)pkg/pubsub/redis.go
(1 hunks)
💤 Files with no reviewable changes (1)
- go.mod
🚧 Files skipped from review as they are similar to previous changes (6)
- internal/config/config.go
- internal/sms-gateway/pubsub/module.go
- pkg/pubsub/memory.go
- pkg/cache/cache.go
- pkg/pubsub/options.go
- pkg/pubsub/redis.go
🧰 Additional context used
🧬 Code graph analysis (10)
internal/sms-gateway/modules/events/module.go (2)
internal/sms-gateway/modules/events/service.go (1)
Service
(19-30)internal/sms-gateway/app.go (1)
Run
(62-73)
internal/sms-gateway/app.go (2)
internal/config/module.go (1)
Module
(22-131)internal/sms-gateway/pubsub/module.go (1)
Module
(10-29)
internal/sms-gateway/cache/factory.go (1)
pkg/cache/redis.go (2)
NewRedis
(62-89)RedisConfig
(37-51)
pkg/cache/memory.go (2)
internal/sms-gateway/cache/factory.go (1)
Cache
(14-14)pkg/cache/cache.go (1)
Cache
(5-40)
internal/sms-gateway/modules/events/service.go (5)
internal/sms-gateway/pubsub/pubsub.go (1)
PubSub
(14-14)internal/sms-gateway/modules/events/types.go (1)
Event
(9-12)internal/sms-gateway/modules/sse/types.go (1)
Event
(7-10)internal/sms-gateway/modules/push/types/types.go (1)
Event
(7-10)internal/sms-gateway/modules/events/metrics.go (7)
DeliveryTypeUnknown
(20-20)FailureReasonSerializationError
(22-22)FailureReasonPublishError
(23-23)EventTypeUnknown
(26-26)DeliveryTypePush
(18-18)FailureReasonProviderFailed
(24-24)DeliveryTypeSSE
(19-19)
internal/config/module.go (3)
internal/config/config.go (2)
Config
(10-19)PubSub
(79-81)internal/sms-gateway/pubsub/config.go (1)
Config
(4-7)internal/sms-gateway/pubsub/pubsub.go (1)
PubSub
(14-14)
internal/sms-gateway/modules/events/events.go (1)
internal/sms-gateway/modules/events/types.go (2)
Event
(9-12)NewEvent
(14-19)
internal/sms-gateway/modules/events/types.go (3)
internal/sms-gateway/modules/sse/types.go (1)
Event
(7-10)internal/sms-gateway/modules/push/types/types.go (1)
Event
(7-10)internal/sms-gateway/modules/push/types.go (1)
Event
(16-16)
internal/sms-gateway/pubsub/pubsub.go (5)
internal/config/config.go (2)
PubSub
(79-81)Config
(10-19)internal/sms-gateway/pubsub/config.go (1)
Config
(4-7)pkg/pubsub/options.go (1)
WithBufferSize
(17-21)pkg/pubsub/memory.go (1)
NewMemory
(24-36)pkg/pubsub/redis.go (2)
NewRedis
(42-76)RedisConfig
(15-27)
pkg/cache/redis.go (2)
pkg/cache/cache.go (1)
Cache
(5-40)internal/config/config.go (1)
Cache
(75-77)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Benchmark
- GitHub Check: Test
- GitHub Check: E2E
- GitHub Check: Analyze (go)
🔇 Additional comments (16)
internal/sms-gateway/pubsub/config.go (1)
1-7
: LGTM!The Config struct is clean and well-documented. The URL-based backend selection pattern is consistent with the cache factory approach elsewhere in the codebase.
internal/sms-gateway/cache/factory.go (1)
44-49
: Good fix for the namespace collision issue.The Redis cache now correctly uses
keyPrefix + name
as the Prefix, which addresses the past review concern about all caches colliding on the same namespace.pkg/cache/redis.go (2)
36-89
: LGTM! Clean config-based initialization with proper ownership tracking.The RedisConfig pattern and client ownership management are well-implemented:
- Clear validation that either Client or URL must be provided
- Proper prefix normalization
- Ownership tracking enables safe resource cleanup
203-211
: LGTM! Proper resource cleanup with ownership check.The Close method correctly closes only owned clients, preventing double-close issues when an external client is provided. The compile-time assertion is good practice.
internal/sms-gateway/modules/events/events.go (1)
9-29
: LGTM! Consistent value-type returns.The event constructors now return Event by value, which aligns with the updated Event type definition and eliminates pointer management overhead for these immutable data structures.
internal/sms-gateway/modules/events/service.go (2)
89-100
: LGTM! Proper closed-channel handling.The select statement correctly checks the
ok
value from the channel receive, preventing tight loops when the subscription is closed. The error handling and metrics for deserialization failures are appropriate.
105-150
: LGTM! Clean device-based event routing.The processEvent method properly handles device selection, routing to push or SSE based on token availability, and tracks success/failure metrics for each delivery type.
internal/config/module.go (1)
125-130
: LGTM! Correct URL source for PubSub configuration.The provider now correctly reads from
cfg.PubSub.URL
instead ofcfg.Cache.URL
, addressing the past review concern. The BufferSize of 128 provides reasonable capacity for the Pub/Sub channels.pkg/cache/memory.go (2)
16-23
: LGTM! Constructor change aligns with lifecycle management.Returning the concrete
*memoryCache
type instead of theCache
interface is appropriate here, especially with the compile-time check at Line 176 ensuring interface compliance.
172-176
: LGTM! Close implementation and interface check are correct.The no-op
Close()
is appropriate for the in-memory cache (no external resources to clean up), and the compile-time assertion ensures interface compliance.internal/sms-gateway/modules/events/module.go (1)
17-28
: LGTM! Error handling and shutdown logic are well-structured.The addition of error handling for
svc.Run(ctx)
with proper logging and graceful shutdown viafx.Shutdowner
follows fx lifecycle best practices.internal/sms-gateway/app.go (1)
24-24
: LGTM! Clean integration of the pubsub module.The import and module registration are straightforward and follow the existing pattern.
Also applies to: 49-49
pkg/pubsub/pubsub.go (3)
8-16
: LGTM! Clean error definitions and message structure.The sentinel errors and
Message
struct provide a clear, simple foundation for the pub/sub abstraction.
18-31
: LGTM! Well-designed subscription lifecycle management.The
Subscription
type properly encapsulates its state with unexported fields, exposes a read-only channel viaReceive()
, and uses context cancellation for cleanup—following Go best practices.
33-50
: LGTM! Well-structured interface design.The interface hierarchy follows good design principles:
- Segregates
Publisher
andSubscriber
concerns- Combines them in the
PubSub
interface with lifecycle management- Clear documentation of behavior (fan-out, context cancellation)
internal/sms-gateway/pubsub/pubsub.go (1)
16-41
: No issues found—Redis client closure is properly implemented.The verification confirms the factory implementation correctly manages the Redis client lifecycle:
- Factory passes
Client: nil
to RedisConfig (line 20)- NewRedis() detects nil client and creates a new one via
redis.NewClient(opt)
(line 59)ownedClient
flag is correctly set totrue
when the client is created internally (line 71)- Close() properly guards the client closure with the
ownedClient
check and callsr.client.Close()
when owned (lines 173–174)The past concern about Redis client closure is not validated—the code already implements proper client lifecycle management.
Summary by CodeRabbit
New Features
Improvements
API