Skip to content

Conversation

capcom6
Copy link
Member

@capcom6 capcom6 commented Oct 9, 2025

Summary by CodeRabbit

  • New Features

    • Configurable Pub/Sub with in-memory and Redis backends, buffer-size option, and runtime module to wire Pub/Sub with lifecycle hooks.
  • Improvements

    • Events now routed over Pub/Sub with JSON serialization, publish metrics, and improved startup/shutdown handling.
    • Cache and Pub/Sub implementations support explicit Close for clean shutdown; default Pub/Sub URL set to memory://.
  • API

    • Events are JSON-serializable; several event constructors now return values (non-pointer) and event fields are exported.

Copy link

coderabbitai bot commented Oct 9, 2025

Walkthrough

Adds a pub/sub abstraction with memory and Redis backends, an options pattern, fx wiring and factory for sms-gateway, migrates events delivery to Pub/Sub with JSON-serializable wrappers and value-semantics for Event, and updates cache backends and interfaces to support configurable Redis creation and Close.

Changes

Cohort / File(s) Summary
Pub/Sub Core
pkg/pubsub/pubsub.go, pkg/pubsub/options.go
Introduces pub/sub API: Message, Subscription (Receive/Close), errors (ErrPubSubClosed, ErrInvalidTopic), interfaces (Subscriber,Publisher,PubSub), options pattern (Option, WithBufferSize).
In-memory Pub/Sub
pkg/pubsub/memory.go
Adds memory-backed memoryPubSub with NewMemory(opts...), topic subscriber maps, RWMutex/WaitGroup, Publish, Subscribe, subscribe/unsubscribe helpers, Close, and compile-time interface assertion.
Redis Pub/Sub
pkg/pubsub/redis.go
Adds redisPubSub and RedisConfig, NewRedis(config, opts...), Pub/Sub publish/subscribe using Redis channels, per-subscription forwarder goroutines, Publish, Subscribe, Close, and interface assertion.
Internal sms-gateway pubsub module
internal/sms-gateway/pubsub/config.go, internal/sms-gateway/pubsub/module.go, internal/sms-gateway/pubsub/pubsub.go
Adds internal Config (URL, BufferSize), Module() fx option (scoped logger, provider, shutdown hook), and New(config) factory selecting memory or redis backend; exposes type PubSub = pubsub.PubSub.
App wiring & config
internal/config/config.go, internal/config/module.go, internal/sms-gateway/app.go
Adds PubSub field to global Config (default memory://), fx provider mapping to pubsub.Config, and registers pubsub.Module() in application modules.
Events — types & serialization
internal/sms-gateway/modules/events/types.go, internal/sms-gateway/modules/events/events.go
Exports Event fields with JSON tags; NewEvent and several event constructors now return value Event; adds eventWrapper JSON tags and serialize/deserialize methods.
Events — service integration
internal/sms-gateway/modules/events/service.go, internal/sms-gateway/modules/events/metrics.go, internal/sms-gateway/modules/events/module.go
Replaces in-process queue with Pub/Sub: Service depends on pubsub.PubSub; Notify serializes and publishes wrappers (5s timeout) with metrics/error handling; Run subscribes, deserializes and processes messages; lifecycle invoke now logs and triggers shutdown on Run error; updates failure-reason constants.
Cache changes & lifecycle
pkg/cache/cache.go, pkg/cache/memory.go, pkg/cache/redis.go, internal/sms-gateway/cache/factory.go
Adds Close() error to Cache interface; memory cache implements Close; Redis cache accepts RedisConfig (Client or URL), may create client from URL, normalizes prefix, owns/closes client when created; factory defers Redis client construction (passes nil Client + URL).
Dependency tidy
go.mod
Removes github.com/android-sms-gateway/core v1.0.1 from requirements.

Sequence Diagram(s)

sequenceDiagram
    participant Producer as Service.Notify
    participant PubSub as PubSub (Memory/Redis)
    participant Consumer as Service.Run

    rect rgb(250,247,237)
    Producer->>Producer: build eventWrapper & json.Marshal (5s timeout)
    Producer->>PubSub: Publish(ctx, "events", data)
    PubSub-->>Producer: return (err / nil)
    end

    rect rgb(237,250,243)
    Consumer->>PubSub: Subscribe(ctx, "events")
    PubSub-->>Consumer: *Subscription (chan Message)
    loop for each message
        Consumer->>Subscription: receive from channel
        Subscription-->>Consumer: Message{Topic, Data}
        Consumer->>Consumer: json.Unmarshal -> eventWrapper
        Consumer->>Consumer: processEvent(&eventWrapper)
    end
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • Add SSE support #154 — Modifies the events subsystem and fx wiring; strongly related to migrating event delivery to Pub/Sub and changes to Service/Notify/Run signatures.

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The title "[pubsub] introduce memory and redis implementations" is specific and directly related to the primary changes in this pull request. The changeset introduces a comprehensive pub/sub system with core interfaces and two concrete implementations (memory-based and Redis-backed), which is accurately captured by the title. While the PR also includes important consequential changes such as events module refactoring, cache interface updates, and configuration changes, these are enablers or consumers of the new pub/sub system rather than the main focus. The title is clear and specific enough that a teammate scanning commit history would understand that a new pub/sub abstraction with multiple backends was introduced.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch events/multi-instance-support

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6fad3c1 and c2d4671.

📒 Files selected for processing (4)
  • pkg/pubsub/memory.go (1 hunks)
  • pkg/pubsub/options.go (1 hunks)
  • pkg/pubsub/pubsub.go (1 hunks)
  • pkg/pubsub/redis.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
pkg/pubsub/memory.go (2)
pkg/pubsub/pubsub.go (5)
  • Message (13-16)
  • ErrPubSubClosed (9-9)
  • ErrInvalidTopic (10-10)
  • Subscription (18-23)
  • PubSub (45-50)
pkg/pubsub/options.go (1)
  • Option (3-3)
pkg/pubsub/redis.go (2)
pkg/pubsub/options.go (1)
  • Option (3-3)
pkg/pubsub/pubsub.go (5)
  • ErrPubSubClosed (9-9)
  • ErrInvalidTopic (10-10)
  • Subscription (18-23)
  • Message (13-16)
  • PubSub (45-50)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: E2E
  • GitHub Check: Test
  • GitHub Check: Lint
  • GitHub Check: Benchmark
  • GitHub Check: Analyze (go)

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c2d4671 and 6052f3d.

📒 Files selected for processing (2)
  • pkg/pubsub/memory.go (1 hunks)
  • pkg/pubsub/redis.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • pkg/pubsub/memory.go
🧰 Additional context used
🧬 Code graph analysis (1)
pkg/pubsub/redis.go (2)
pkg/pubsub/options.go (1)
  • Option (3-3)
pkg/pubsub/pubsub.go (5)
  • ErrPubSubClosed (9-9)
  • ErrInvalidTopic (10-10)
  • Subscription (18-23)
  • Message (13-16)
  • PubSub (45-50)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Benchmark
  • GitHub Check: Lint
  • GitHub Check: E2E
  • GitHub Check: Analyze (go)
🔇 Additional comments (5)
pkg/pubsub/redis.go (5)

1-22: LGTM!

The imports and struct definition are well-organized with appropriate synchronization primitives and fields.


24-39: LGTM!

The constructor properly applies the options pattern and initializes all struct fields correctly.


41-53: LGTM!

The Publish method correctly validates the closed state and topic, and properly delegates to the Redis client.


82-112: LGTM!

The goroutine implementation is correct with proper cleanup, comprehensive exit path handling, and safe message forwarding.


130-130: LGTM!

The compile-time interface assertion correctly verifies that redisPubSub implements the PubSub interface.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6052f3d and 5ddd58e.

📒 Files selected for processing (2)
  • pkg/pubsub/memory.go (1 hunks)
  • pkg/pubsub/redis.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
pkg/pubsub/redis.go (2)
pkg/pubsub/options.go (1)
  • Option (3-3)
pkg/pubsub/pubsub.go (5)
  • ErrPubSubClosed (9-9)
  • ErrInvalidTopic (10-10)
  • Subscription (18-23)
  • Message (13-16)
  • PubSub (45-50)
pkg/pubsub/memory.go (2)
pkg/pubsub/pubsub.go (5)
  • Message (13-16)
  • ErrPubSubClosed (9-9)
  • ErrInvalidTopic (10-10)
  • Subscription (18-23)
  • PubSub (45-50)
pkg/pubsub/options.go (1)
  • Option (3-3)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Build / Docker image (linux/amd64)
  • GitHub Check: Build / Docker image (linux/arm64)
  • GitHub Check: Benchmark
  • GitHub Check: Analyze (go)
🔇 Additional comments (4)
pkg/pubsub/memory.go (2)

33-73: LGTM! Publish synchronization is safe.

The RLock held via defer (line 45) ensures the lock isn't released until after wg.Wait() completes (line 70), preventing the race where unsubscribe could remove and close a channel while publisher goroutines are sending. The past review concerns about concurrent map access have been properly addressed.


132-143: Close is idempotent and thread-safe.

Returning nil on repeated calls (line 135) matches the Redis implementation's behavior, resolving the API consistency concern from past reviews.

pkg/pubsub/redis.go (2)

56-117: LGTM! Subscription lifecycle is correct.

The Subscribe implementation properly waits for Redis confirmation (line 68) before proceeding, and handles errors by closing the pubsub object (line 70). The goroutine's defer block (lines 85-94) cleans up resources in the correct order: close Redis subscription, close local channel, remove tracking, signal completion.


119-130: Past review comment about Close behavior is now outdated.

The past review suggested aligning Close behavior between memory and Redis implementations. Both now return nil on repeated calls (memory.go line 135, redis.go line 122), making them consistent. The concern has been resolved.

@capcom6 capcom6 force-pushed the events/multi-instance-support branch from 84dfae9 to 737cc55 Compare October 10, 2025 23:24
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (2)
pkg/pubsub/memory.go (2)

121-130: Consider using defer for mutex unlock.

The explicit unlock pattern works correctly, but using defer would be more idiomatic and maintainable.

Apply this diff to use defer:

 func (m *memoryPubSub) unsubscribe(id, topic string) {
 	m.mu.Lock()
+	defer m.mu.Unlock()
+
 	subscriptions, ok := m.topics[topic]
 	if !ok {
-		m.mu.Unlock()
 		return
 	}
 	delete(subscriptions, id)
-	m.mu.Unlock()
 }

44-72: Document blocking Publish behavior.

Publish blocks until all subscribers receive the message or the context/close signal is triggered. This differs from the Redis implementation (which returns immediately after sending to Redis) and could surprise callers expecting fire-and-forget semantics.

Consider adding a comment documenting this behavior:

// Publish sends a message to all subscribers of the given topic.
// This method blocks until all subscribers have received the message
// or until ctx is cancelled or the pubsub instance is closed.
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 84dfae9 and 737cc55.

📒 Files selected for processing (4)
  • pkg/pubsub/memory.go (1 hunks)
  • pkg/pubsub/options.go (1 hunks)
  • pkg/pubsub/pubsub.go (1 hunks)
  • pkg/pubsub/redis.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • pkg/pubsub/pubsub.go
🧰 Additional context used
🧬 Code graph analysis (2)
pkg/pubsub/memory.go (2)
pkg/pubsub/pubsub.go (5)
  • Message (13-16)
  • ErrPubSubClosed (9-9)
  • ErrInvalidTopic (10-10)
  • Subscription (18-23)
  • PubSub (45-50)
pkg/pubsub/options.go (1)
  • Option (3-3)
pkg/pubsub/redis.go (2)
pkg/pubsub/options.go (1)
  • Option (3-3)
pkg/pubsub/pubsub.go (5)
  • ErrPubSubClosed (9-9)
  • ErrInvalidTopic (10-10)
  • Subscription (18-23)
  • Message (13-16)
  • PubSub (45-50)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: E2E
  • GitHub Check: Benchmark
  • GitHub Check: Analyze (go)
🔇 Additional comments (1)
pkg/pubsub/redis.go (1)

119-130: Close behavior is now consistent.

Both memory and Redis implementations now return nil on repeated Close() calls, providing consistent idempotent semantics. This resolves the inconsistency mentioned in previous reviews.

@capcom6 capcom6 force-pushed the events/multi-instance-support branch from 66878bf to 56f4ebd Compare October 12, 2025 05:07
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 56f4ebd and 51f7dca.

📒 Files selected for processing (6)
  • internal/config/config.go (3 hunks)
  • internal/config/module.go (2 hunks)
  • internal/sms-gateway/app.go (2 hunks)
  • internal/sms-gateway/pubsub/config.go (1 hunks)
  • internal/sms-gateway/pubsub/module.go (1 hunks)
  • internal/sms-gateway/pubsub/pubsub.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
internal/sms-gateway/pubsub/module.go (3)
internal/config/module.go (1)
  • Module (22-131)
internal/sms-gateway/app.go (1)
  • Module (36-60)
internal/sms-gateway/pubsub/pubsub.go (1)
  • New (15-40)
internal/config/module.go (2)
internal/config/config.go (2)
  • Config (10-19)
  • Cache (75-77)
internal/sms-gateway/pubsub/config.go (1)
  • Config (4-7)
internal/sms-gateway/app.go (2)
internal/config/module.go (1)
  • Module (22-131)
internal/sms-gateway/pubsub/module.go (1)
  • Module (8-16)
internal/sms-gateway/pubsub/pubsub.go (3)
internal/config/config.go (2)
  • Config (10-19)
  • PubSub (79-81)
internal/sms-gateway/pubsub/config.go (1)
  • Config (4-7)
pkg/pubsub/options.go (1)
  • WithBufferSize (17-21)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: E2E
  • GitHub Check: Benchmark
  • GitHub Check: Analyze (go)
🔇 Additional comments (12)
internal/config/config.go (3)

18-18: LGTM!

The PubSub configuration field is correctly added to the Config struct, following the same pattern as the existing Cache configuration.


79-81: LGTM!

The PubSub struct definition is consistent with the existing Cache struct pattern and correctly configured with appropriate YAML and environment variable tags.


111-113: LGTM!

The default PubSub configuration with "memory://" URL is appropriate for development and testing, consistent with the Cache default.

internal/sms-gateway/pubsub/config.go (1)

3-7: LGTM!

The Config struct is simple and clear, appropriately defining the parameters needed to control the PubSub backend via URL and configure the buffer size.

internal/sms-gateway/app.go (2)

24-24: LGTM!

The import for the pubsub package is correctly added.


49-49: LGTM!

The pubsub.Module() is correctly added to the fx.Module initialization, following the same pattern as other infrastructure modules.

internal/config/module.go (1)

14-14: LGTM!

The import for the pubsub package is correctly added.

internal/sms-gateway/pubsub/module.go (1)

8-16: LGTM!

The Module function correctly wires the pubsub subsystem into fx, with appropriate logger decoration and constructor provision.

internal/sms-gateway/pubsub/pubsub.go (4)

11-13: LGTM!

The topicPrefix constant is appropriately defined to namespace Redis topics.


16-23: LGTM!

URL defaulting and parsing logic is correct, with appropriate error handling and wrapping.


25-26: LGTM!

Options are correctly built with the configured BufferSize.


28-39: LGTM!

The scheme-based backend selection is clean and extensible. The Redis client lifecycle is properly managed by the NewRedis implementation (assuming it handles cleanup when the PubSub is closed).

@capcom6 capcom6 force-pushed the events/multi-instance-support branch from bc2f812 to c96b2b5 Compare October 12, 2025 23:56
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
internal/config/module.go (1)

125-130: Previous issue resolved; consider documenting BufferSize choice.

The past review concern about using cfg.Cache.URL instead of cfg.PubSub.URL has been fixed - line 127 now correctly references cfg.PubSub.URL.

The hardcoded BufferSize: 128 could benefit from a brief comment explaining the rationale (e.g., "reasonable default for in-memory queuing") or extraction to a named constant if this value is significant.

Apply this diff to add a clarifying comment:

 	fx.Provide(func(cfg Config) pubsub.Config {
 		return pubsub.Config{
 			URL:        cfg.PubSub.URL,
-			BufferSize: 128,
+			BufferSize: 128, // Default channel buffer size for pub/sub message queuing
 		}
 	}),
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bc2f812 and c96b2b5.

📒 Files selected for processing (6)
  • internal/config/config.go (3 hunks)
  • internal/config/module.go (2 hunks)
  • internal/sms-gateway/app.go (2 hunks)
  • internal/sms-gateway/pubsub/config.go (1 hunks)
  • internal/sms-gateway/pubsub/module.go (1 hunks)
  • internal/sms-gateway/pubsub/pubsub.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • internal/sms-gateway/pubsub/config.go
  • internal/sms-gateway/app.go
🧰 Additional context used
🧬 Code graph analysis (3)
internal/sms-gateway/pubsub/module.go (3)
internal/config/module.go (1)
  • Module (22-131)
internal/sms-gateway/app.go (1)
  • Module (36-60)
internal/sms-gateway/pubsub/pubsub.go (1)
  • New (15-40)
internal/config/module.go (2)
internal/config/config.go (2)
  • Config (10-19)
  • PubSub (79-81)
internal/sms-gateway/pubsub/config.go (1)
  • Config (4-7)
internal/sms-gateway/pubsub/pubsub.go (2)
internal/sms-gateway/pubsub/config.go (1)
  • Config (4-7)
pkg/pubsub/options.go (1)
  • WithBufferSize (17-21)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: Benchmark
  • GitHub Check: E2E
  • GitHub Check: Analyze (go)
🔇 Additional comments (2)
internal/config/config.go (1)

18-18: LGTM! Configuration follows established patterns.

The PubSub configuration is structured identically to the Cache configuration, with appropriate YAML and environment variable tags, and a sensible default of memory:// for local development.

Also applies to: 79-82, 111-113

internal/sms-gateway/pubsub/module.go (1)

8-16: LGTM! FX module wiring is correct.

The module follows the established pattern for FX modules in this codebase, with appropriate logger decoration and constructor provision.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
internal/sms-gateway/pubsub/module.go (1)

18-24: PubSub.Close() is not context-aware; keep the call or change the API — rename the parameter to avoid shadowing

Verified: pkg/pubsub/pubsub.go declares Close() error, so OnStop's context cannot be forwarded to Close() as-is. Keep the current call (returns error correctly) or change the PubSub API to Close(context.Context) error if you need context-aware shutdown. Minor nit: rename the parameter pubsub (e.g., ps or pubsubSvc) to avoid shadowing the package.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c96b2b5 and c5c19f6.

📒 Files selected for processing (1)
  • internal/sms-gateway/pubsub/module.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
internal/sms-gateway/pubsub/module.go (4)
internal/sms-gateway/app.go (1)
  • Module (36-60)
internal/config/module.go (1)
  • Module (22-131)
internal/sms-gateway/pubsub/pubsub.go (1)
  • New (15-40)
internal/config/config.go (1)
  • PubSub (79-81)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: E2E
  • GitHub Check: Benchmark
  • GitHub Check: Analyze (go)
🔇 Additional comments (2)
internal/sms-gateway/pubsub/module.go (2)

1-9: LGTM!

Package declaration and imports are clean and appropriate for an fx module with lifecycle management.


11-16: LGTM!

Module structure and logger decoration follow fx best practices. The named logger will provide clear context in log output.

@capcom6 capcom6 force-pushed the events/multi-instance-support branch from c5c19f6 to 01f4557 Compare October 14, 2025 13:00
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
internal/sms-gateway/pubsub/config.go (1)

5-6: Consider adding field documentation and struct tags.

The exported fields lack individual documentation. While the type-level comment is helpful, documenting each field improves maintainability and helps users understand the purpose and constraints of each field.

Additionally, if this Config is unmarshaled from YAML/JSON configuration files, consider adding appropriate struct tags.

Apply this diff to add field documentation:

 // Config controls the PubSub backend via a URL (e.g., "memory://", "redis://...").
 type Config struct {
+	// URL specifies the backend to use (e.g., "memory://", "redis://host:port").
+	// Defaults to "memory://" if empty.
 	URL        string
+	// BufferSize sets the channel buffer size for the Pub/Sub implementation.
 	BufferSize uint
 }

If config unmarshaling is used, also add struct tags:

 type Config struct {
-	URL        string
-	BufferSize uint
+	URL        string `yaml:"url" json:"url"`
+	BufferSize uint   `yaml:"buffer_size" json:"buffer_size"`
 }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c5c19f6 and 01f4557.

📒 Files selected for processing (6)
  • internal/config/config.go (3 hunks)
  • internal/config/module.go (2 hunks)
  • internal/sms-gateway/app.go (2 hunks)
  • internal/sms-gateway/pubsub/config.go (1 hunks)
  • internal/sms-gateway/pubsub/module.go (1 hunks)
  • internal/sms-gateway/pubsub/pubsub.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
  • internal/config/config.go
  • internal/sms-gateway/pubsub/module.go
  • internal/sms-gateway/pubsub/pubsub.go
  • internal/config/module.go
  • internal/sms-gateway/app.go
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: E2E
  • GitHub Check: Benchmark
  • GitHub Check: Test
  • GitHub Check: Lint
  • GitHub Check: Analyze (go)
🔇 Additional comments (1)
internal/sms-gateway/pubsub/config.go (1)

6-6: No action needed: BufferSize is initialized to 128 in internal/config/module.go and always passed via WithBufferSize, so it never remains zero at runtime.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/sms-gateway/modules/events/service.go (1)

97-142: Defensive check for nil event in consumed messages.

eventWrapper.Event can be nil after unmarshal; prevent panic and record a failure.

 func (s *Service) processEvent(wrapper *eventWrapper) {
+	if wrapper.Event == nil {
+		s.metrics.IncrementFailed("unknown", DeliveryTypeUnknown, FailureReasonSerializationError)
+		s.logger.Error("Received event wrapper with nil event", zap.String("user_id", wrapper.UserID))
+		return
+	}
♻️ Duplicate comments (1)
internal/sms-gateway/pubsub/pubsub.go (1)

34-38: Ensure Redis client is closed by PubSub.Close() (connection leak risk).

Confirm pkgpubsub.NewRedis’s Close() calls client.Close().

#!/bin/bash
# Verify redis PubSub Close closes the underlying client
rg -nP 'type\s+\w+\s+struct\s*\{[^}]*client[^\n]*\n[^}]*\}' --type=go -C3
rg -nP 'func\s*\(\s*\w+\s*\*\w+\s*\)\s*Close\(\)\s*error' --type=go -C3
rg -n 'client\.Close\(\)' --type=go -C3
🧹 Nitpick comments (6)
internal/sms-gateway/modules/events/metrics.go (1)

22-25: Consider adding a distinct deserialization failure reason.

Consumer-side JSON parse failures are different from producer-side serialization. Add FailureReasonDeserializationError for clearer metrics and use it in the Run loop on unmarshal errors.

internal/sms-gateway/pubsub/pubsub.go (3)

7-9: Avoid same-name import shadowing for clarity.

Alias the external pkg to reduce confusion within package pubsub.

-import (
-	"fmt"
-	"net/url"
-
-	"github.com/android-sms-gateway/core/redis"
-	"github.com/android-sms-gateway/server/pkg/pubsub"
-)
+import (
+	"fmt"
+	"net/url"
+
+	"github.com/android-sms-gateway/core/redis"
+	pkgpubsub "github.com/android-sms-gateway/server/pkg/pubsub"
+)
 
- type PubSub = pubsub.PubSub
+ type PubSub = pkgpubsub.PubSub

Also applies to: 15-15


27-29: Only set buffer size when > 0 (avoid forcing zero/unbuffered).

Let backends keep their default when config.BufferSize == 0.

-opts := []pubsub.Option{}
-opts = append(opts, pubsub.WithBufferSize(config.BufferSize))
+opts := []pkgpubsub.Option{}
+if config.BufferSize > 0 {
+	opts = append(opts, pkgpubsub.WithBufferSize(config.BufferSize))
+}

30-41: Support rediss (TLS) scheme.

Common Redis URLs use rediss://. Treat it like redis.

-switch u.Scheme {
+switch u.Scheme {
 case "memory":
-	return pubsub.NewMemory(opts...), nil
+	return pkgpubsub.NewMemory(opts...), nil
-case "redis":
+case "redis", "rediss":
 	client, err := redis.New(redis.Config{URL: config.URL})
 	if err != nil {
 		return nil, fmt.Errorf("can't create redis client: %w", err)
 	}
-	return pubsub.NewRedis(client, topicPrefix, opts...), nil
+	return pkgpubsub.NewRedis(client, topicPrefix, opts...), nil
internal/sms-gateway/modules/events/types.go (1)

9-18: Unify field name with push/SSE types for consistency.

Consider renaming EventType -> Type (json:"event") to match push/sse.Event and reduce mental mapping across modules. If changed, adjust callers accordingly.

internal/sms-gateway/modules/events/service.go (1)

104-105: Avoid shadowing the imported package name.

Rename local variable from devices to devs to keep readability.

-	devices, err := s.deviceSvc.Select(wrapper.UserID, filters...)
+	devs, err := s.deviceSvc.Select(wrapper.UserID, filters...)
@@
-	if len(devices) == 0 {
+	if len(devs) == 0 {
@@
-	for _, device := range devices {
+	for _, device := range devs {
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 01f4557 and ea7dce5.

📒 Files selected for processing (4)
  • internal/sms-gateway/modules/events/metrics.go (1 hunks)
  • internal/sms-gateway/modules/events/service.go (2 hunks)
  • internal/sms-gateway/modules/events/types.go (1 hunks)
  • internal/sms-gateway/pubsub/pubsub.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
internal/sms-gateway/modules/events/types.go (3)
internal/sms-gateway/modules/sse/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/push/types/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/push/types.go (1)
  • Event (16-16)
internal/sms-gateway/pubsub/pubsub.go (3)
internal/config/config.go (2)
  • PubSub (79-81)
  • Config (10-19)
internal/sms-gateway/pubsub/config.go (1)
  • Config (4-7)
pkg/pubsub/options.go (1)
  • WithBufferSize (17-21)
internal/sms-gateway/modules/events/service.go (5)
internal/sms-gateway/pubsub/pubsub.go (1)
  • PubSub (15-15)
internal/sms-gateway/modules/events/types.go (1)
  • Event (9-12)
internal/sms-gateway/modules/sse/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/push/types/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/events/metrics.go (6)
  • DeliveryTypeUnknown (20-20)
  • FailureReasonSerializationError (22-22)
  • FailureReasonPublishError (23-23)
  • DeliveryTypePush (18-18)
  • FailureReasonProviderFailed (24-24)
  • DeliveryTypeSSE (19-19)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Benchmark
  • GitHub Check: Analyze (go)

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ea7dce5 and e85bff2.

📒 Files selected for processing (1)
  • internal/sms-gateway/modules/events/module.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
internal/sms-gateway/modules/events/module.go (1)
internal/sms-gateway/modules/events/service.go (3)
  • Service (19-30)
  • s (59-69)
  • deviceSvc (13-24)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Benchmark
  • GitHub Check: E2E
  • GitHub Check: Test
  • GitHub Check: Analyze (go)

@capcom6 capcom6 force-pushed the events/multi-instance-support branch from a348cb4 to 4e59e2f Compare October 16, 2025 01:42
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
internal/sms-gateway/modules/events/service.go (1)

101-146: Validate wrapper.Event to prevent nil-pointer dereference.

The processEvent function accesses wrapper.Event.EventType and wrapper.Event.Data (lines 124-125, 137-138) without checking if wrapper.Event is nil. If deserialization produces a wrapper with a nil Event field, this will cause a panic.

Add validation at the start of processEvent:

 func (s *Service) processEvent(wrapper *eventWrapper) {
+	if wrapper.Event == nil {
+		s.logger.Error("Event wrapper has nil event", zap.String("user_id", wrapper.UserID))
+		s.metrics.IncrementFailed(EventTypeUnknown, DeliveryTypeUnknown, FailureReasonSerializationError)
+		return
+	}
+
 	// Load devices from database
 	filters := []devices.SelectFilter{}
pkg/cache/redis.go (1)

191-197: Bug: HExpireAt missing field argument (compile/runtime error).

HExpireAt requires the field; here it’s called without it. This will fail to compile or misbehave and leaves entries without TTL.

Apply this fix:

-  if !options.validUntil.IsZero() {
-    if err := r.client.HExpireAt(ctx, r.key, options.validUntil).Err(); err != nil {
-      return fmt.Errorf("can't set cache item ttl: %w", err)
-    }
-  }
+  if !options.validUntil.IsZero() {
+    if err := r.client.HExpireAt(ctx, r.key, options.validUntil, key).Err(); err != nil {
+      return fmt.Errorf("can't set cache item ttl: %w", err)
+    }
+  }
🧹 Nitpick comments (2)
pkg/cache/redis.go (1)

175-197: Consider atomic SetNX + TTL in one step.

Today, HSetNX then HExpireAt leaves a small window where the field exists without TTL. If you need strict atomicity, wrap both in a Lua script.

I can provide a compact script that performs HSETNX and, only on insert, applies HEXPIREAT in one EVAL. Interested?

internal/sms-gateway/cache/factory.go (1)

34-54: Support rediss:// URLs.

Switch only accepts "redis". Consider handling "rediss" (TLS) the same way.

Example:

-  switch u.Scheme {
-  case "memory":
+  switch u.Scheme {
+  case "memory":
     ...
-  case "redis":
+  case "redis", "rediss":
     ...
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a348cb4 and 4e59e2f.

📒 Files selected for processing (10)
  • internal/sms-gateway/cache/factory.go (1 hunks)
  • internal/sms-gateway/modules/events/metrics.go (1 hunks)
  • internal/sms-gateway/modules/events/module.go (1 hunks)
  • internal/sms-gateway/modules/events/service.go (2 hunks)
  • internal/sms-gateway/modules/events/types.go (1 hunks)
  • internal/sms-gateway/modules/messages/service.go (2 hunks)
  • internal/sms-gateway/modules/settings/service.go (1 hunks)
  • internal/sms-gateway/modules/webhooks/service.go (1 hunks)
  • internal/sms-gateway/pubsub/pubsub.go (1 hunks)
  • pkg/cache/redis.go (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • internal/sms-gateway/modules/messages/service.go
  • internal/sms-gateway/modules/webhooks/service.go
  • internal/sms-gateway/modules/settings/service.go
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-10T23:53:42.006Z
Learnt from: capcom6
PR: android-sms-gateway/server#178
File: pkg/cache/redis.go:63-75
Timestamp: 2025-09-10T23:53:42.006Z
Learning: In pkg/cache/redis.go, the Redis cache implementation uses Redis's native field-level TTL via HExpireAt, meaning expired fields are automatically removed by Redis itself. This differs from the memory implementation where the application tracks expiration times. As a result, operations like Drain that call HGetAll will only return non-expired items since expired fields are already removed by Redis.

Applied to files:

  • pkg/cache/redis.go
🧬 Code graph analysis (7)
internal/sms-gateway/modules/events/service.go (5)
internal/sms-gateway/pubsub/pubsub.go (1)
  • PubSub (14-14)
internal/sms-gateway/modules/events/types.go (1)
  • Event (9-12)
internal/sms-gateway/modules/sse/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/push/types/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/events/metrics.go (7)
  • DeliveryTypeUnknown (20-20)
  • FailureReasonSerializationError (22-22)
  • FailureReasonPublishError (23-23)
  • EventTypeUnknown (26-26)
  • DeliveryTypePush (18-18)
  • FailureReasonProviderFailed (24-24)
  • DeliveryTypeSSE (19-19)
internal/sms-gateway/modules/events/metrics.go (1)
internal/sms-gateway/modules/sse/metrics.go (1)
  • m (85-87)
internal/sms-gateway/modules/events/types.go (3)
internal/sms-gateway/modules/sse/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/push/types/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/push/types.go (1)
  • Event (16-16)
internal/sms-gateway/cache/factory.go (1)
pkg/cache/redis.go (2)
  • NewRedis (61-86)
  • RedisConfig (37-51)
pkg/cache/redis.go (2)
internal/config/config.go (1)
  • Cache (75-77)
pkg/cache/cache.go (1)
  • Cache (5-36)
internal/sms-gateway/modules/events/module.go (2)
internal/sms-gateway/modules/events/service.go (1)
  • Service (19-30)
internal/sms-gateway/app.go (1)
  • Run (62-73)
internal/sms-gateway/pubsub/pubsub.go (4)
internal/config/config.go (2)
  • PubSub (79-81)
  • Config (10-19)
internal/sms-gateway/pubsub/config.go (1)
  • Config (4-7)
pkg/pubsub/options.go (1)
  • WithBufferSize (17-21)
pkg/cache/redis.go (1)
  • RedisConfig (37-51)
🪛 GitHub Check: Lint
internal/sms-gateway/pubsub/pubsub.go

[failure] 37-37:
not enough arguments in call to pubsub.NewRedis


[failure] 33-33:
undefined: pubsub.RedisConfig


[failure] 33-33:
not enough return values

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Test
  • GitHub Check: Benchmark
  • GitHub Check: E2E
  • GitHub Check: Analyze (go)
🔇 Additional comments (7)
internal/sms-gateway/modules/events/module.go (1)

17-28: LGTM! Well-structured error handling.

The updated fx.Invoke properly injects logger and shutdowner dependencies, and the goroutine correctly handles service errors by logging and triggering graceful shutdown with an exit code.

internal/sms-gateway/modules/events/metrics.go (1)

22-26: LGTM! Constants align with pub/sub integration.

The new failure reason constants (serialization_error, publish_error) and EventTypeUnknown appropriately categorize failures in the pub/sub-based event flow, while removing the now-unused FailureReasonQueueFull.

internal/sms-gateway/modules/events/types.go (2)

10-11: LGTM! Public fields enable JSON serialization.

The Event struct now has public fields with appropriate JSON tags, supporting the pub/sub payload serialization requirements.


27-33: LGTM! Standard serialization methods.

The serialize/deserialize methods correctly use json.Marshal and json.Unmarshal for converting eventWrapper to/from JSON bytes.

internal/sms-gateway/modules/events/service.go (1)

79-99: LGTM! Subscription loop handles channel closure correctly.

The receive loop properly checks the ok value from the channel receive operation (line 85) and gracefully returns when the subscription channel closes (lines 86-89). Deserialization errors are correctly logged and counted without exposing PII.

pkg/cache/redis.go (2)

103-122: Drain script LGTM; atomic and returns only non‑expired fields.

EVAL with HGETALL + UNLINK/DEL is atomic and, with field TTLs, returns only live entries. Good alignment with Redis field‑TTL semantics.

Based on learnings


160-166: Pipelined HSET + HExpireAt is correct.

Efficient and reduces round trips; TTL set only when configured. Looks good.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
pkg/pubsub/redis.go (1)

45-56: Document Client/URL precedence behavior.

When both Client and URL are provided in RedisConfig, the Client takes precedence and URL is silently ignored (lines 49-56 only execute when Client == nil). This precedence should be documented in the RedisConfig struct or NewRedis function comments to avoid confusion.

Consider adding a comment to the RedisConfig struct:

 type RedisConfig struct {
 	// Client is the Redis client to use.
-	// If nil, a client is created from the URL.
+	// If nil, a client is created from the URL.
+	// If both Client and URL are provided, Client takes precedence.
 	Client *redis.Client
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4e59e2f and dcb7002.

📒 Files selected for processing (1)
  • pkg/pubsub/redis.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
pkg/pubsub/redis.go (2)
pkg/pubsub/options.go (1)
  • Option (3-3)
pkg/pubsub/pubsub.go (5)
  • ErrPubSubClosed (9-9)
  • ErrInvalidTopic (10-10)
  • Subscription (18-23)
  • Message (13-16)
  • PubSub (45-50)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Benchmark
  • GitHub Check: Test
  • GitHub Check: E2E
  • GitHub Check: Analyze (go)
🔇 Additional comments (2)
pkg/pubsub/redis.go (2)

99-104: Resource leak fixed - LGTM!

The subscription error path now properly closes the Redis subscription before returning, addressing the resource leak flagged in previous reviews. The use of errors.Join to combine both errors is appropriate.


117-126: Cleanup logic is well-structured.

The deferred cleanup properly closes the Redis subscription, closes the channel, removes the subscriber from the registry, and signals completion via wg.Done(). Ignoring the error from ps.Close() is acceptable in the cleanup path.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (2)
pkg/pubsub/redis.go (2)

75-87: Document non-blocking Publish behavior.

This is a previously identified issue that remains unaddressed. Redis Publish is fire-and-forget (returns immediately after sending to Redis), while the memory implementation blocks until all subscribers receive. Add a comment documenting this behavioral difference.


160-171: Fix resource leak and align Close behavior with memory implementation.

Two issues:

  1. Resource leak (new issue): When NewRedis creates a Redis client from URL (line 56), that client is never closed. This leaks connections and file descriptors.

  2. Inconsistent behavior (duplicate from past review): Returns nil on repeated calls, but memory.go returns ErrPubSubClosed. This behavioral inconsistency across implementations could surprise users.

To fix the resource leak, track client ownership:

 type redisPubSub struct {
 	prefix     string
 	bufferSize uint
 
 	client *redis.Client
+	ownsClient bool
 
 	wg          sync.WaitGroup
 	mu          sync.Mutex
 	subscribers map[string]context.CancelFunc
 	closeCh     chan struct{}
 }

In NewRedis, set ownership:

 	if config.Client == nil {
 		opt, err := redis.ParseURL(config.URL)
 		if err != nil {
 			return nil, fmt.Errorf("failed to parse redis url: %w", err)
 		}
 
 		config.Client = redis.NewClient(opt)
+		return &redisPubSub{
+			// ... other fields ...
+			client: config.Client,
+			ownsClient: true,
+		}, nil
 	}
 
 	return &redisPubSub{
-		prefix:     config.Prefix,
-		bufferSize: o.bufferSize,
-
-		client: config.Client,
-
-		subscribers: make(map[string]context.CancelFunc),
-		closeCh:     make(chan struct{}),
+		// ... other fields ...
+		client: config.Client,
+		ownsClient: false,
 	}, nil

In Close, close the client if owned:

 func (r *redisPubSub) Close() error {
 	select {
 	case <-r.closeCh:
-		return nil
+		return ErrPubSubClosed
 	default:
 		close(r.closeCh)
 	}
 
 	r.wg.Wait()
 
+	if r.ownsClient {
+		return r.client.Close()
+	}
+
 	return nil
 }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between dcb7002 and 0f03caa.

📒 Files selected for processing (3)
  • internal/sms-gateway/cache/factory.go (1 hunks)
  • internal/sms-gateway/modules/events/service.go (2 hunks)
  • pkg/pubsub/redis.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal/sms-gateway/cache/factory.go
🧰 Additional context used
🧬 Code graph analysis (2)
pkg/pubsub/redis.go (2)
pkg/pubsub/options.go (1)
  • Option (3-3)
pkg/pubsub/pubsub.go (5)
  • ErrPubSubClosed (9-9)
  • ErrInvalidTopic (10-10)
  • Subscription (18-23)
  • Message (13-16)
  • PubSub (45-50)
internal/sms-gateway/modules/events/service.go (3)
internal/sms-gateway/pubsub/pubsub.go (1)
  • PubSub (14-14)
internal/sms-gateway/modules/events/types.go (1)
  • Event (9-12)
internal/sms-gateway/modules/events/metrics.go (7)
  • EventTypeUnknown (26-26)
  • DeliveryTypeUnknown (20-20)
  • FailureReasonSerializationError (22-22)
  • FailureReasonPublishError (23-23)
  • DeliveryTypePush (18-18)
  • FailureReasonProviderFailed (24-24)
  • DeliveryTypeSSE (19-19)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Benchmark
  • GitHub Check: Test
  • GitHub Check: E2E
  • GitHub Check: Analyze (go)
🔇 Additional comments (6)
internal/sms-gateway/modules/events/service.go (4)

15-17: LGTM: Good practice to use a constant for the pub/sub topic.

The constant makes the topic name maintainable and follows DRY principles.


32-44: LGTM: Clean dependency injection.

The pub/sub dependency is properly injected through the constructor and stored in the Service struct.


77-104: LGTM: Proper subscription lifecycle management.

The method correctly:

  • Checks for subscription errors and returns early
  • Defers subscription closure
  • Handles the closed channel case with the ok check
  • Avoids PII leakage by not logging the full message payload
  • Returns cleanly on context cancellation or channel close

This addresses the previous review comments about channel handling and PII logging.


106-151: LGTM: Proper event processing with metrics.

The method correctly:

  • Handles device selection with appropriate filters
  • Implements fallback from push to SSE for devices without push tokens
  • Increments metrics for both success and failure cases on each delivery path
  • Logs errors with relevant context (user_id, device_id) without exposing sensitive event data
pkg/pubsub/redis.go (2)

100-105: LGTM! Subscription confirmation correctly implemented.

The code now properly waits for the Redis subscription acknowledgment using ps.Receive(ctx) before returning the subscription. The error handling on lines 103-104 correctly closes the subscription and combines both errors using errors.Join, which is an improvement over simply ignoring the close error.


117-155: LGTM! Goroutine correctly handles cleanup and prevents deadlock.

The message forwarding goroutine is well-structured:

  • Proper cleanup in defer (closes subscription, channel, removes from map)
  • Responds to close signals (r.closeCh and subCtx.Done())
  • The nested select (lines 143-152) prevents deadlock by checking close signals while attempting to send messages

This addresses the previously identified deadlock concern.

@capcom6 capcom6 force-pushed the events/multi-instance-support branch from af41637 to 2038311 Compare October 16, 2025 06:45
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
pkg/cache/redis.go (1)

194-198: Fix TTL handling in SetOrFail.

When a TTL is supplied, this path calls HExpireAt without the hash field argument, so the command targets the whole hash (or fails with a Redis error). That breaks per-entry TTL semantics for SetOrFail, unlike the Set path which passes the field. Add the field argument here as well.

Apply this diff:

-	if err := r.client.HExpireAt(ctx, r.key, options.validUntil).Err(); err != nil {
+	if err := r.client.HExpireAt(ctx, r.key, options.validUntil, key).Err(); err != nil {
♻️ Duplicate comments (2)
internal/sms-gateway/modules/events/service.go (2)

47-49: Add metric increment for validation failure.

The validation check for empty EventType is good, but it should increment a failure metric to maintain observability consistency with other error paths in this function.

Apply this diff to add the metric increment:

 	if event.EventType == "" {
+		s.metrics.IncrementFailed(EventTypeUnknown, DeliveryTypeUnknown, FailureReasonSerializationError)
 		return fmt.Errorf("event type is empty")
 	}

Note: Past review comments suggest using FailureReasonValidationError instead of FailureReasonSerializationError for better semantic clarity, but that would require adding the new constant first.


51-52: Accept parent context instead of using Background().

Using context.Background() ignores caller cancellation or timeout signals, potentially causing operations to hang for 5 seconds even when the caller has been cancelled. This can lead to resource leaks and degraded responsiveness during shutdown.

Add a context.Context parameter to Notify and use it as the parent context:

-func (s *Service) Notify(userID string, deviceID *string, event Event) error {
+func (s *Service) Notify(ctx context.Context, userID string, deviceID *string, event Event) error {
 	if event.EventType == "" {
 		s.metrics.IncrementFailed(EventTypeUnknown, DeliveryTypeUnknown, FailureReasonSerializationError)
 		return fmt.Errorf("event type is empty")
 	}

-	subCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
 	defer cancel()

 	wrapper := eventWrapper{
 		UserID:   userID,
 		DeviceID: deviceID,
 		Event:    &event,
 	}

 	wrapperBytes, err := wrapper.serialize()
 	if err != nil {
 		s.metrics.IncrementFailed(string(event.EventType), DeliveryTypeUnknown, FailureReasonSerializationError)
 		return fmt.Errorf("can't serialize event wrapper: %w", err)
 	}

-	if err := s.pubsub.Publish(subCtx, pubsubTopic, wrapperBytes); err != nil {
+	if err := s.pubsub.Publish(pubCtx, pubsubTopic, wrapperBytes); err != nil {
 		s.metrics.IncrementFailed(string(event.EventType), DeliveryTypeUnknown, FailureReasonPublishError)
 		return fmt.Errorf("can't publish event: %w", err)
 	}

 	s.metrics.IncrementEnqueued(string(event.EventType))

 	return nil
 }

Then update all call sites (e.g., in internal/sms-gateway/modules/messages/service.go lines 233 and 244) to pass a context.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between af41637 and 2038311.

📒 Files selected for processing (18)
  • internal/config/config.go (3 hunks)
  • internal/config/module.go (2 hunks)
  • internal/sms-gateway/app.go (2 hunks)
  • internal/sms-gateway/cache/factory.go (1 hunks)
  • internal/sms-gateway/modules/events/metrics.go (1 hunks)
  • internal/sms-gateway/modules/events/module.go (1 hunks)
  • internal/sms-gateway/modules/events/service.go (2 hunks)
  • internal/sms-gateway/modules/events/types.go (1 hunks)
  • internal/sms-gateway/modules/messages/service.go (2 hunks)
  • internal/sms-gateway/modules/settings/service.go (1 hunks)
  • internal/sms-gateway/modules/webhooks/service.go (1 hunks)
  • internal/sms-gateway/pubsub/config.go (1 hunks)
  • internal/sms-gateway/pubsub/module.go (1 hunks)
  • internal/sms-gateway/pubsub/pubsub.go (1 hunks)
  • pkg/cache/cache.go (1 hunks)
  • pkg/cache/memory.go (2 hunks)
  • pkg/cache/redis.go (2 hunks)
  • pkg/pubsub/redis.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (9)
  • internal/config/module.go
  • internal/sms-gateway/modules/settings/service.go
  • pkg/cache/memory.go
  • internal/sms-gateway/app.go
  • internal/sms-gateway/pubsub/module.go
  • internal/config/config.go
  • internal/sms-gateway/pubsub/pubsub.go
  • internal/sms-gateway/modules/events/metrics.go
  • pkg/cache/cache.go
🧰 Additional context used
🧬 Code graph analysis (8)
pkg/pubsub/redis.go (3)
pkg/pubsub/options.go (1)
  • Option (3-3)
pkg/pubsub/pubsub.go (5)
  • ErrPubSubClosed (9-9)
  • ErrInvalidTopic (10-10)
  • Subscription (18-23)
  • Message (13-16)
  • PubSub (45-50)
internal/config/config.go (1)
  • PubSub (79-81)
internal/sms-gateway/modules/events/types.go (3)
internal/sms-gateway/modules/sse/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/push/types/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/push/types.go (1)
  • Event (16-16)
internal/sms-gateway/modules/messages/service.go (1)
internal/sms-gateway/modules/events/events.go (1)
  • NewMessageEnqueuedEvent (9-11)
internal/sms-gateway/modules/events/module.go (2)
internal/sms-gateway/modules/events/service.go (1)
  • Service (19-30)
internal/sms-gateway/app.go (1)
  • Run (62-73)
internal/sms-gateway/modules/events/service.go (5)
internal/sms-gateway/pubsub/pubsub.go (1)
  • PubSub (14-14)
internal/sms-gateway/modules/events/types.go (1)
  • Event (9-12)
internal/sms-gateway/modules/sse/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/push/types/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/events/metrics.go (7)
  • DeliveryTypeUnknown (20-20)
  • FailureReasonSerializationError (22-22)
  • FailureReasonPublishError (23-23)
  • EventTypeUnknown (26-26)
  • DeliveryTypePush (18-18)
  • FailureReasonProviderFailed (24-24)
  • DeliveryTypeSSE (19-19)
pkg/cache/redis.go (2)
pkg/cache/cache.go (1)
  • Cache (5-40)
internal/config/config.go (1)
  • Cache (75-77)
internal/sms-gateway/modules/webhooks/service.go (1)
internal/sms-gateway/modules/events/events.go (1)
  • NewWebhooksUpdatedEvent (13-15)
internal/sms-gateway/cache/factory.go (1)
pkg/cache/redis.go (2)
  • NewRedis (62-89)
  • RedisConfig (37-51)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: E2E
  • GitHub Check: Benchmark
  • GitHub Check: Test
  • GitHub Check: Analyze (go)
🔇 Additional comments (9)
internal/sms-gateway/modules/events/module.go (1)

17-36: LGTM! Error handling and shutdown flow are now correct.

The lifecycle hooks properly capture errors from svc.Run, log them, and trigger graceful shutdown via fx.Shutdowner. The OnStop hook correctly cancels the context.

internal/sms-gateway/modules/messages/service.go (2)

232-236: LGTM! Correct adaptation to non-pointer Event parameter.

The dereference of events.NewMessageEnqueuedEvent() aligns with the updated Notify signature that now accepts Event by value. The goroutine safely captures userID and deviceID by value and logs errors appropriately.


242-244: LGTM! Correct adaptation to non-pointer Event parameter.

The dereference of events.NewMessagesExportRequestedEvent() aligns with the updated Notify signature that now accepts Event by value.

internal/sms-gateway/modules/events/service.go (6)

15-17: LGTM! Topic constant is well-defined.

The pubsubTopic constant provides a centralized definition for the events topic used across Pub/Sub operations.


32-44: LGTM! Constructor correctly wires Pub/Sub dependency.

The NewService constructor now includes the pubsub.PubSub parameter and properly initializes the service with all required dependencies.


54-64: LGTM! Wrapper creation and serialization error handling are correct.

The wrapper construction and serialization with proper error handling and metrics tracking look good.


66-74: LGTM! Pub/Sub publishing with proper error handling.

The publish operation correctly handles errors, tracks metrics, and returns descriptive error messages.


76-103: LGTM! Run loop correctly handles subscription lifecycle and closed channels.

The subscription creation, message processing loop, and closed-channel check (msg, ok := <-ch) are all correctly implemented. Deserialization failures properly increment metrics and continue processing.


105-150: LGTM! Event processing logic is sound.

The processEvent function correctly dereferences wrapper.Event fields, routes to push or SSE services based on device configuration, and tracks metrics for both success and failure paths.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
internal/sms-gateway/modules/events/types.go (2)

9-12: Extract a shared Event struct
The Event type is defined three times (in modules/events/types.go, modules/sse/types.go, and modules/push/types/types.go with an alias in push/types.go), varying only by field names/tags. Consolidate into one common definition—or document why they must differ—to eliminate duplication.


9-12: Unify JSON tag for Event.Type across modules.
internal/sms-gateway/modules/events/types.go uses json:"event_type", but internal/sms-gateway/modules/sse/types.go uses json:"event". Align these tags (e.g. both event_type or both event) to avoid serialization mismatches.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2038311 and 7257958.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (3)
  • go.mod (0 hunks)
  • internal/sms-gateway/modules/events/events.go (2 hunks)
  • internal/sms-gateway/modules/events/types.go (1 hunks)
💤 Files with no reviewable changes (1)
  • go.mod
🧰 Additional context used
🧬 Code graph analysis (2)
internal/sms-gateway/modules/events/events.go (4)
internal/sms-gateway/modules/events/types.go (2)
  • Event (9-12)
  • NewEvent (14-19)
internal/sms-gateway/modules/sse/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/push/types.go (1)
  • Event (16-16)
internal/sms-gateway/modules/push/types/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/events/types.go (3)
internal/sms-gateway/modules/sse/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/push/types.go (1)
  • Event (16-16)
internal/sms-gateway/modules/push/types/types.go (1)
  • Event (7-10)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Benchmark
  • GitHub Check: Test
  • GitHub Check: Lint
  • GitHub Check: Analyze (go)
🔇 Additional comments (5)
internal/sms-gateway/modules/events/events.go (1)

9-29: LGTM! Return type changes align with the Event value type.

The constructor functions now correctly return Event by value instead of *Event, consistent with the updated Event type definition in types.go. The logic remains unchanged and correct.

internal/sms-gateway/modules/events/types.go (4)

4-5: LGTM! Import needed for serialization.

The encoding/json import is correctly added to support the new serialize() and deserialize() methods.


14-19: LGTM! NewEvent correctly returns Event by value.

The function now returns Event by value with exported fields properly initialized, aligning with the struct changes.


27-33: LGTM! Serialization methods are correctly implemented.

The serialize() and deserialize() methods provide straightforward JSON marshaling/unmarshaling functionality for pub/sub transmission.


21-25: Event pointer usage is correct and consistent with JSON serialization requirements.

The Event field pointer usage is appropriate here. The eventWrapper is exclusively used for JSON serialization/deserialization via pub/sub (lines 27-32 in types.go), where the omitempty tag requires a pointer type. The code consistently creates the wrapper with &event (line 54-58 in service.go), and deserialization properly uses pointer receivers (lines 94-95 in service.go). No inconsistency issues found.

@capcom6 capcom6 force-pushed the events/multi-instance-support branch from 7257958 to 5e5ee38 Compare October 16, 2025 11:53
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (2)
internal/sms-gateway/modules/events/service.go (2)

47-49: Add metrics increment for validation failure.

The validation check correctly rejects empty event types, but it doesn't increment failure metrics as requested in the past review. This makes validation failures invisible in observability.

Add the metrics call (using the validation error constant from metrics.go):

 	if event.EventType == "" {
+		s.metrics.IncrementFailed(EventTypeUnknown, DeliveryTypeUnknown, FailureReasonValidationError)
 		return fmt.Errorf("event type is empty")
 	}

51-52: Accept parent context to respect caller cancellation.

Using context.Background() ignores caller cancellation/timeout, potentially causing operations to hang for 5 seconds even when the caller has been cancelled. This can lead to resource leaks and degraded responsiveness during shutdown.

Update the signature to accept a context parameter:

-func (s *Service) Notify(userID string, deviceID *string, event Event) error {
+func (s *Service) Notify(ctx context.Context, userID string, deviceID *string, event Event) error {
 	if event.EventType == "" {
 		s.metrics.IncrementFailed(EventTypeUnknown, DeliveryTypeUnknown, FailureReasonValidationError)
 		return fmt.Errorf("event type is empty")
 	}

-	subCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
 	defer cancel()

Then update all callers to pass an appropriate context.

🧹 Nitpick comments (1)
pkg/cache/cache.go (1)

37-39: Document possible Close() errors in Cache interface. Update the Close() comment in pkg/cache/cache.go (lines 37–39) to describe when it can return errors (e.g. underlying Redis client.Close failures during connection shutdown).

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7257958 and 5e5ee38.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (11)
  • go.mod (0 hunks)
  • internal/sms-gateway/cache/factory.go (1 hunks)
  • internal/sms-gateway/modules/events/events.go (2 hunks)
  • internal/sms-gateway/modules/events/metrics.go (1 hunks)
  • internal/sms-gateway/modules/events/module.go (1 hunks)
  • internal/sms-gateway/modules/events/service.go (2 hunks)
  • internal/sms-gateway/modules/events/types.go (1 hunks)
  • internal/sms-gateway/pubsub/pubsub.go (1 hunks)
  • pkg/cache/cache.go (1 hunks)
  • pkg/cache/memory.go (2 hunks)
  • pkg/cache/redis.go (2 hunks)
💤 Files with no reviewable changes (1)
  • go.mod
🚧 Files skipped from review as they are similar to previous changes (3)
  • internal/sms-gateway/pubsub/pubsub.go
  • internal/sms-gateway/cache/factory.go
  • pkg/cache/redis.go
🧰 Additional context used
🧬 Code graph analysis (5)
internal/sms-gateway/modules/events/module.go (2)
internal/sms-gateway/modules/events/service.go (1)
  • Service (19-30)
internal/sms-gateway/app.go (1)
  • Run (62-73)
internal/sms-gateway/modules/events/types.go (3)
internal/sms-gateway/modules/sse/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/push/types.go (1)
  • Event (16-16)
internal/sms-gateway/modules/push/types/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/events/events.go (4)
internal/sms-gateway/modules/events/types.go (2)
  • Event (9-12)
  • NewEvent (14-19)
internal/sms-gateway/modules/sse/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/push/types.go (1)
  • Event (16-16)
internal/sms-gateway/modules/push/types/types.go (1)
  • Event (7-10)
pkg/cache/memory.go (2)
internal/sms-gateway/cache/factory.go (1)
  • Cache (14-14)
pkg/cache/cache.go (1)
  • Cache (5-40)
internal/sms-gateway/modules/events/service.go (5)
internal/sms-gateway/pubsub/pubsub.go (1)
  • PubSub (14-14)
internal/sms-gateway/modules/events/types.go (1)
  • Event (9-12)
internal/sms-gateway/modules/sse/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/push/types/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/events/metrics.go (7)
  • DeliveryTypeUnknown (20-20)
  • FailureReasonSerializationError (22-22)
  • FailureReasonPublishError (23-23)
  • EventTypeUnknown (26-26)
  • DeliveryTypePush (18-18)
  • FailureReasonProviderFailed (24-24)
  • DeliveryTypeSSE (19-19)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Test
  • GitHub Check: Benchmark
  • GitHub Check: E2E
  • GitHub Check: Analyze (go)
🔇 Additional comments (8)
pkg/cache/memory.go (2)

172-176: LGTM: Close() implementation and compile-time assertion.

The Close() implementation correctly returns nil since an in-memory cache has no external resources to clean up. The compile-time assertion var _ Cache = (*memoryCache)(nil) is good practice and ensures memoryCache properly implements the Cache interface.


16-16: Returning memoryCache remains a valid, unexported implementation of Cache
*memoryCache still implements the Cache interface, callers can assign it to Cache without issue, and internal fields stay unexported.

Likely an incorrect or invalid review comment.

internal/sms-gateway/modules/events/module.go (1)

17-28: LGTM! Error handling properly implemented.

The lifecycle hook correctly handles errors from svc.Run, logs them, and initiates graceful shutdown with a non-zero exit code. The nested error handling for shutdown failures is also appropriate.

internal/sms-gateway/modules/events/events.go (1)

9-29: LGTM! Consistent value semantics.

The constructors now return Event by value, which is appropriate for small structs and aligns with the updated NewEvent signature in types.go. This eliminates unnecessary pointer allocations.

internal/sms-gateway/modules/events/types.go (1)

4-19: LGTM! JSON serialization properly configured.

The Event struct now has exported fields with appropriate JSON tags, and NewEvent correctly returns a value type. The serialization changes align well with the Pub/Sub integration requirements.

internal/sms-gateway/modules/events/service.go (3)

32-44: LGTM! Constructor properly updated.

The NewService constructor now accepts the pubsub.PubSub dependency, which is correctly stored in the service struct for use by Notify and Run methods.


60-71: LGTM! Serialization and publish error handling is robust.

The code properly:

  • Serializes the event wrapper with error handling
  • Increments appropriate metrics for serialization failures
  • Publishes to Pub/Sub with error handling
  • Increments appropriate metrics for publish failures
  • Increments enqueued metrics on success
  • Wraps errors with context using fmt.Errorf with %w

76-103: LGTM! Subscription lifecycle properly managed.

The Run method correctly:

  • Subscribes to the Pub/Sub topic
  • Defers subscription cleanup
  • Handles context cancellation
  • Checks for closed channels to avoid tight loops
  • Increments metrics on deserialization failures
  • Logs errors without PII

Note: Validation for nil wrapper.Event should be added as mentioned in the review of types.go (lines 21-33) to prevent panics in processEvent.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
internal/sms-gateway/modules/events/service.go (1)

51-52: Accept parent context instead of using Background().

Using context.Background() ignores any caller's cancellation or timeout. This issue was previously flagged but remains unaddressed.

-func (s *Service) Notify(userID string, deviceID *string, event Event) error {
+func (s *Service) Notify(ctx context.Context, userID string, deviceID *string, event Event) error {
 	if event.EventType == "" {
 		return fmt.Errorf("event type is empty")
 	}

-	subCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
 	defer cancel()
🧹 Nitpick comments (1)
internal/sms-gateway/modules/events/service.go (1)

47-49: Consider incrementing metrics for validation failures.

The validation error here doesn't increment failed metrics, while serialization (line 62) and publish errors (line 67) do. For consistency and observability, consider adding a metrics call similar to other error paths.

 	if event.EventType == "" {
+		s.metrics.IncrementFailed(EventTypeUnknown, DeliveryTypeUnknown, FailureReasonValidationError)
 		return fmt.Errorf("event type is empty")
 	}

Note: You would need to add the FailureReasonValidationError constant to metrics.go if not already present.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5e5ee38 and 2473277.

📒 Files selected for processing (2)
  • internal/sms-gateway/modules/events/service.go (2 hunks)
  • internal/sms-gateway/modules/events/types.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal/sms-gateway/modules/events/types.go
🧰 Additional context used
🧬 Code graph analysis (1)
internal/sms-gateway/modules/events/service.go (6)
internal/sms-gateway/pubsub/pubsub.go (1)
  • PubSub (14-14)
internal/sms-gateway/modules/events/types.go (1)
  • Event (9-12)
internal/sms-gateway/modules/sse/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/push/types.go (1)
  • Event (16-16)
internal/sms-gateway/modules/push/types/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/events/metrics.go (7)
  • DeliveryTypeUnknown (20-20)
  • FailureReasonSerializationError (22-22)
  • FailureReasonPublishError (23-23)
  • EventTypeUnknown (26-26)
  • DeliveryTypePush (18-18)
  • FailureReasonProviderFailed (24-24)
  • DeliveryTypeSSE (19-19)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Benchmark
  • GitHub Check: E2E
  • GitHub Check: Test
  • GitHub Check: Analyze (go)
🔇 Additional comments (2)
internal/sms-gateway/modules/events/service.go (2)

76-103: LGTM! Pub/Sub integration properly implemented.

The Run function correctly:

  • Subscribes to the pubsub topic with proper error handling
  • Handles channel close with the ok flag (line 89-93)
  • Avoids logging PII by only logging the error (line 97)
  • Processes events through the wrapper pattern

The subscription lifecycle and error handling are well-implemented.


54-58: No issues found. The Event is stored by value in eventWrapper (line 24 in types.go), which is intentional for immutability guarantees. The struct is small (~24 bytes: enum + map reference), so copying cost is negligible. JSON serialization handles value semantics correctly. This design choice is consistent with the serialize/deserialize implementation and prevents unintended modifications to events after wrapping.

@capcom6 capcom6 force-pushed the events/multi-instance-support branch from 2473277 to adcbf48 Compare October 17, 2025 02:30
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
pkg/cache/redis.go (1)

156-201: <!-- [scratchpad]
[task final_analysis]
[observations]

  • go-redis v9.9.0 is deployed; HExpireAt fully supported since v9.6.0
  • HExpireAt calls in lines 166 and 195 are VALID Redis 7.4+ commands
  • Lua scripts don't check expiry; rely on Redis auto-removal of expired fields
  • Get() returns ErrKeyNotFound for both missing and expired keys (indistinguishable)
  • GetAndDelete() Lua script returns false (mapped to ErrKeyNotFound) when field absent or expired
  • Interface contract (cache.go:15) requires ErrKeyExpired to be returned for expired keys
    [analysis]
  • Review comment's PRIMARY claim (HExpireAt doesn't exist) is INCORRECT
  • Review comment's SECONDARY claim (ErrKeyExpired not returned) is VALID but for different reason
  • With Redis 7.4 auto-expiry of hash fields, expired fields are auto-deleted by Redis
  • This makes HGet return Nil for both expired and missing keys (semantically indistinguishable at API level)
  • Current implementation violates interface contract which requires distinguishing expiry from non-existence
  • The implementation relies on transparent auto-expiry but the interface doesn't support this model
    [pending]
  • None; sufficient data to rewrite comment
    [done]
  • Verified HExpireAt exists and is properly used
  • Identified true semantic issue: interface contract violation for ErrKeyExpired distinction

Fix missing ErrKeyExpired distinction in Get() and GetAndDelete()—interface contract requires distinguishing expired from non-existent keys.

With Redis 7.4 auto-expiring hash fields, expired entries are transparently removed by Redis before retrieval. This makes HGet return Nil for both expired and missing keys, but the Cache interface contract (cache.go:15) requires returning ErrKeyExpired when a key has expired. Currently both Get() (line 128) and GetAndDelete() (line 142) return ErrKeyNotFound in both cases, violating the interface.

Solutions:

  • A) Abandon per-field TTL; store each entry as a separate key with SET/GET/DEL+EX/EXAT for whole-key expiry, making expiration visible to the client.
  • B) Keep the hash + add a sidecar sorted set (e.g., key + ":exp") tracking expiry times. In Get/GetAndDelete, use Lua to check if ZSCORE shows expiry < now; return ErrKeyExpired and clean up if so. Adjust Cleanup() and Drain() to also manage the sidecar.
  • C) Clarify the interface: if auto-expiry is transparent, document that Get/GetAndDelete return ErrKeyNotFound for both missing and expired keys, and adjust memory.go to match (currently it returns ErrKeyExpired).

Note: HExpireAt (lines 166, 195) is correctly implemented—go-redis v9.9.0 fully supports this command.

♻️ Duplicate comments (2)
internal/sms-gateway/modules/events/metrics.go (1)

22-26: Add validation-specific failure reason constant.

The validation check in service.go (lines 47-49) should increment metrics with a distinct validation failure reason instead of lumping validation failures with serialization errors in metrics analysis.

Apply this diff to add the constant:

 	FailureReasonSerializationError = "serialization_error"
 	FailureReasonPublishError       = "publish_error"
+	FailureReasonValidationError    = "validation_error"
 	FailureReasonProviderFailed     = "provider_failed"

Then update service.go line 48 to increment metrics:

if event.EventType == "" {
	s.metrics.IncrementFailed(EventTypeUnknown, DeliveryTypeUnknown, FailureReasonValidationError)
	return fmt.Errorf("event type is empty")
}
internal/sms-gateway/modules/events/service.go (1)

51-52: Accept parent context to respect caller cancellation.

Using context.Background() ignores any caller's cancellation or timeout, potentially causing operations to hang for 5 seconds even if the caller has been cancelled. This can lead to resource leaks and degraded responsiveness during shutdown or request cancellation.

Add a context.Context parameter to Notify:

-func (s *Service) Notify(userID string, deviceID *string, event Event) error {
+func (s *Service) Notify(ctx context.Context, userID string, deviceID *string, event Event) error {
 	if event.EventType == "" {
 		return fmt.Errorf("event type is empty")
 	}
 
-	subCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
 	defer cancel()
 
 	wrapper := eventWrapper{
 		UserID:   userID,
 		DeviceID: deviceID,
 		Event:    event,
 	}
 
 	wrapperBytes, err := wrapper.serialize()
 	if err != nil {
 		s.metrics.IncrementFailed(string(event.EventType), DeliveryTypeUnknown, FailureReasonSerializationError)
 		return fmt.Errorf("can't serialize event wrapper: %w", err)
 	}
 
-	if err := s.pubsub.Publish(subCtx, pubsubTopic, wrapperBytes); err != nil {
+	if err := s.pubsub.Publish(pubCtx, pubsubTopic, wrapperBytes); err != nil {
 		s.metrics.IncrementFailed(string(event.EventType), DeliveryTypeUnknown, FailureReasonPublishError)
 		return fmt.Errorf("can't publish event: %w", err)
 	}
 
 	s.metrics.IncrementEnqueued(string(event.EventType))
 
 	return nil
 }
🧹 Nitpick comments (4)
pkg/cache/redis.go (3)

62-89: Constructor refactor is solid; consider returning the interface and fail-fast ping.

Validation, URL parsing, prefix normalization, and ownership look good. Prefer returning Cache to avoid leaking the concrete type, and optionally Ping to fail fast on bad URLs.

Apply:

-func NewRedis(config RedisConfig) (*redisCache, error) {
+func NewRedis(config RedisConfig) (Cache, error) {
   ...
-  return &redisCache{
+  return &redisCache{
     client:      client,
     ownedClient: config.Client == nil,
     key:         config.Prefix + redisCacheKey,
     ttl:         config.TTL,
   }, nil
}

Optionally, after creating the client:

// _ := client.Ping(context.Background()).Err() // return wrapped error if non-nil

36-51: Clarify TTL semantics in the config.

Document that TTL=0 disables per-entry expiry, and how overrides via options interact with TTL. This helps set expectations across memory and Redis backends.


203-209: Make Close idempotent and concurrency-safe.

Multiple/concurrent Close calls can return errors from the underlying client. Guard with sync.Once or atomically nil out the client after the first close.

Example:

+import "sync"
 type redisCache struct {
   client      *redis.Client
   ownedClient bool
+  closeOnce   sync.Once
   key         string
   ttl         time.Duration
 }
 
 func (r *redisCache) Close() error {
-  if r.ownedClient {
-    return r.client.Close()
-  }
-  return nil
+  var err error
+  if r.ownedClient && r.client != nil {
+    r.closeOnce.Do(func() { err = r.client.Close(); r.client = nil })
+  }
+  return err
 }
pkg/pubsub/options.go (1)

9-15: *Simplify: apply doesn’t need to return options.

Return value is unused; remove it to reduce noise.

-func (o *options) apply(opts ...Option) *options {
+func (o *options) apply(opts ...Option) {
 	for _, opt := range opts {
 		opt(o)
 	}
-
-	return o
 }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2473277 and adcbf48.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (20)
  • go.mod (0 hunks)
  • internal/config/config.go (3 hunks)
  • internal/config/module.go (2 hunks)
  • internal/sms-gateway/app.go (2 hunks)
  • internal/sms-gateway/cache/factory.go (1 hunks)
  • internal/sms-gateway/modules/events/events.go (2 hunks)
  • internal/sms-gateway/modules/events/metrics.go (1 hunks)
  • internal/sms-gateway/modules/events/module.go (1 hunks)
  • internal/sms-gateway/modules/events/service.go (2 hunks)
  • internal/sms-gateway/modules/events/types.go (1 hunks)
  • internal/sms-gateway/pubsub/config.go (1 hunks)
  • internal/sms-gateway/pubsub/module.go (1 hunks)
  • internal/sms-gateway/pubsub/pubsub.go (1 hunks)
  • pkg/cache/cache.go (1 hunks)
  • pkg/cache/memory.go (2 hunks)
  • pkg/cache/redis.go (2 hunks)
  • pkg/pubsub/memory.go (1 hunks)
  • pkg/pubsub/options.go (1 hunks)
  • pkg/pubsub/pubsub.go (1 hunks)
  • pkg/pubsub/redis.go (1 hunks)
💤 Files with no reviewable changes (1)
  • go.mod
🚧 Files skipped from review as they are similar to previous changes (12)
  • internal/sms-gateway/pubsub/config.go
  • internal/config/module.go
  • internal/sms-gateway/pubsub/module.go
  • pkg/cache/cache.go
  • internal/config/config.go
  • pkg/pubsub/redis.go
  • pkg/pubsub/pubsub.go
  • internal/sms-gateway/modules/events/module.go
  • internal/sms-gateway/modules/events/types.go
  • internal/sms-gateway/pubsub/pubsub.go
  • internal/sms-gateway/cache/factory.go
  • pkg/cache/memory.go
🧰 Additional context used
🧬 Code graph analysis (5)
internal/sms-gateway/modules/events/service.go (3)
internal/sms-gateway/pubsub/pubsub.go (1)
  • PubSub (14-14)
internal/sms-gateway/modules/events/types.go (1)
  • Event (9-12)
internal/sms-gateway/modules/events/metrics.go (7)
  • DeliveryTypeUnknown (20-20)
  • FailureReasonSerializationError (22-22)
  • FailureReasonPublishError (23-23)
  • EventTypeUnknown (26-26)
  • DeliveryTypePush (18-18)
  • FailureReasonProviderFailed (24-24)
  • DeliveryTypeSSE (19-19)
pkg/pubsub/memory.go (2)
pkg/pubsub/pubsub.go (5)
  • Message (13-16)
  • ErrPubSubClosed (9-9)
  • ErrInvalidTopic (10-10)
  • Subscription (18-23)
  • PubSub (45-50)
pkg/pubsub/options.go (1)
  • Option (3-3)
internal/sms-gateway/modules/events/events.go (3)
internal/sms-gateway/modules/events/types.go (2)
  • Event (9-12)
  • NewEvent (14-19)
internal/sms-gateway/modules/push/types.go (1)
  • Event (16-16)
internal/sms-gateway/modules/push/types/types.go (1)
  • Event (7-10)
internal/sms-gateway/app.go (2)
internal/config/module.go (1)
  • Module (22-131)
internal/sms-gateway/pubsub/module.go (1)
  • Module (11-26)
pkg/cache/redis.go (2)
pkg/cache/cache.go (1)
  • Cache (5-40)
internal/config/config.go (1)
  • Cache (75-77)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Benchmark
  • GitHub Check: Test
  • GitHub Check: E2E
  • GitHub Check: Analyze (go)
🔇 Additional comments (8)
internal/sms-gateway/modules/events/service.go (2)

76-103: LGTM! Pub/Sub integration is well-implemented.

The Run function correctly:

  • Subscribes to the Pub/Sub topic with proper error handling
  • Handles channel closure (lines 89-93)
  • Increments deserialization failure metrics with safe labels (line 96)
  • Avoids logging sensitive payload data (line 97)

The past review concerns about channel handling and PII logging have been addressed.


105-150: LGTM! Event processing is correctly instrumented.

The processEvent function properly:

  • Handles both push and SSE delivery paths
  • Increments success and failure metrics with appropriate labels
  • Uses the wrapper's event type and data consistently

The implementation is clean and well-structured.

internal/sms-gateway/modules/events/events.go (1)

9-29: LGTM! Value semantics for Event is appropriate.

The change from pointer to value return type aligns with the Notify() method signature in internal/sms-gateway/modules/events/service.go:46, which expects event Event (value type). All call sites across settings/service.go, messages/service.go, and webhooks/service.go correctly pass Event values directly to Notify() with no incompatibilities.

pkg/cache/redis.go (1)

211-211: Nice interface assertion.

Compile-time check is helpful.

internal/sms-gateway/app.go (1)

24-24: LGTM: pubsub module wired into app.

Import and Module() registration order looks fine relative to config and events.

Also applies to: 49-49

pkg/pubsub/options.go (1)

5-7: ****

The claim that "make(chan T, n) requires n to be int" is incorrect. Go's make() built-in accepts any integer type for the buffer size parameter, including uint. The code as written will compile without errors. Both pkg/pubsub/memory.go:91 and pkg/pubsub/redis.go:112 successfully pass uint values to make(chan Message, ...).

If there is a real type mismatch issue (e.g., config.BufferSize is int but WithBufferSize expects uint), that is a separate concern and should be addressed at the call site, not by changing the buffer size type throughout the codebase.

Likely an incorrect or invalid review comment.

pkg/pubsub/memory.go (2)

112-122: Update subscribe to new subscriber type.

-func (m *memoryPubSub) subscribe(id, topic string, ch chan Message) {
+func (m *memoryPubSub) subscribe(id, topic string, sub subscriber) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
 
 	subscriptions, ok := m.topics[topic]
 	if !ok {
-		subscriptions = make(map[string]chan Message)
+		subscriptions = make(map[string]subscriber)
 		m.topics[topic] = subscriptions
 	}
-	subscriptions[id] = ch
+	subscriptions[id] = sub
 }

Likely an incorrect or invalid review comment.


58-71: Skip canceled subscribers and guard mid-flight cancel in Publish.

Pre-check sub.ctx.Done to avoid blocking, and include it in the send select.

-	for _, ch := range subscribers {
-		wg.Add(1)
-		go func(ch chan Message) {
+	for _, sub := range subscribers {
+		// Skip already-canceled subscribers.
+		select {
+		case <-sub.ctx.Done():
+			continue
+		default:
+		}
+		wg.Add(1)
+		go func(sub subscriber) {
 			defer wg.Done()
 
 			select {
-			case ch <- msg:
+			case sub.ch <- msg:
 			case <-ctx.Done():
 				return
 			case <-m.closeCh:
 				return
+			case <-sub.ctx.Done():
+				// Subscriber canceled mid-flight; drop.
+				return
 			}
-		}(ch)
+		}(sub)
 	}

Likely an incorrect or invalid review comment.

@capcom6 capcom6 force-pushed the events/multi-instance-support branch from adcbf48 to fb29abc Compare October 17, 2025 07:10
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (4)
pkg/pubsub/redis.go (1)

78-90: Document Publish semantics (fire-and-forget vs memory).

Add a note that Redis Publish returns when Redis accepts the message; it does not wait for subscribers to receive it (unlike memory, which blocks until delivery). Helps callers set expectations.

 func (r *redisPubSub) Publish(ctx context.Context, topic string, data []byte) error {
+	// Note: Redis publish is fire-and-forget. It returns once Redis accepts the
+	// message and does not wait for subscribers to receive it. This differs from
+	// the in-memory implementation which blocks until delivery (or context/close).
 	select {
internal/sms-gateway/modules/events/metrics.go (1)

22-26: Add a validation-specific failure reason.

Introduce a dedicated constant to distinguish validation failures from serialization errors in metrics.

 	FailureReasonSerializationError = "serialization_error"
 	FailureReasonPublishError       = "publish_error"
 	FailureReasonProviderFailed     = "provider_failed"
+	FailureReasonValidationError    = "validation_error"
internal/sms-gateway/modules/events/service.go (2)

47-49: Increment failure metrics on validation error.

The validation check on line 47 returns an error but does not increment failure metrics, making it difficult to track validation failures in monitoring. Past review comments flagged this gap.

Apply this diff to increment metrics before returning:

 	if event.EventType == "" {
+		s.metrics.IncrementFailed(EventTypeUnknown, DeliveryTypeUnknown, FailureReasonSerializationError)
 		return fmt.Errorf("event type is empty")
 	}

Note: If a FailureReasonValidationError constant is available in metrics.go, prefer that over FailureReasonSerializationError for clearer failure classification.


51-52: Accept parent context to honor caller cancellation.

Using context.Background() creates a detached context that ignores any cancellation or timeout from the caller. During shutdown or request cancellation, this can cause the publish operation to hang for up to 5 seconds, leading to resource leaks and degraded responsiveness.

Update the Notify signature to accept a context.Context parameter and use it as the parent for the timeout:

-func (s *Service) Notify(userID string, deviceID *string, event Event) error {
+func (s *Service) Notify(ctx context.Context, userID string, deviceID *string, event Event) error {
 	if event.EventType == "" {
+		s.metrics.IncrementFailed(EventTypeUnknown, DeliveryTypeUnknown, FailureReasonSerializationError)
 		return fmt.Errorf("event type is empty")
 	}

-	subCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
 	defer cancel()

Then update all callers of Notify to pass an appropriate context.

🧹 Nitpick comments (4)
pkg/cache/cache.go (2)

39-39: Consider adding context parameter for timeout control.

While Close() error follows the standard io.Closer pattern, operations that close network resources (like Redis connections) could benefit from a context parameter for timeout and cancellation control. Consider whether Close(ctx context.Context) error would better align with the other methods and provide better control over cleanup operations.


37-38: Enhance documentation with idempotency and error behavior.

Consider documenting whether Close() is idempotent (safe to call multiple times) and what specific error conditions callers should expect. This helps consumers understand proper usage patterns.

internal/sms-gateway/pubsub/config.go (1)

3-7: Clarify BufferSize semantics (and consider int).

Document that 0 means unbuffered; consider int for BufferSize to match common make(chan, n) usage and avoid ad-hoc casts elsewhere. Safe to keep uint if you prefer—just be consistent across options/backends.

pkg/pubsub/memory.go (1)

33-36: High fan-out: goroutine-per-subscriber delivery could be costly.

If topics have many subscribers and events are frequent, consider bounded worker pool or direct synchronous sends with timeouts/backpressure to reduce goroutine churn.

If helpful, I can sketch benchmarks to gauge throughput/latency under fan-out.

Also applies to: 55-76

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between adcbf48 and fb29abc.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (20)
  • go.mod (0 hunks)
  • internal/config/config.go (3 hunks)
  • internal/config/module.go (2 hunks)
  • internal/sms-gateway/app.go (2 hunks)
  • internal/sms-gateway/cache/factory.go (1 hunks)
  • internal/sms-gateway/modules/events/events.go (2 hunks)
  • internal/sms-gateway/modules/events/metrics.go (1 hunks)
  • internal/sms-gateway/modules/events/module.go (1 hunks)
  • internal/sms-gateway/modules/events/service.go (2 hunks)
  • internal/sms-gateway/modules/events/types.go (1 hunks)
  • internal/sms-gateway/pubsub/config.go (1 hunks)
  • internal/sms-gateway/pubsub/module.go (1 hunks)
  • internal/sms-gateway/pubsub/pubsub.go (1 hunks)
  • pkg/cache/cache.go (1 hunks)
  • pkg/cache/memory.go (2 hunks)
  • pkg/cache/redis.go (2 hunks)
  • pkg/pubsub/memory.go (1 hunks)
  • pkg/pubsub/options.go (1 hunks)
  • pkg/pubsub/pubsub.go (1 hunks)
  • pkg/pubsub/redis.go (1 hunks)
💤 Files with no reviewable changes (1)
  • go.mod
🚧 Files skipped from review as they are similar to previous changes (12)
  • pkg/pubsub/options.go
  • internal/sms-gateway/app.go
  • pkg/cache/memory.go
  • internal/sms-gateway/pubsub/pubsub.go
  • internal/config/module.go
  • pkg/cache/redis.go
  • internal/config/config.go
  • pkg/pubsub/pubsub.go
  • internal/sms-gateway/modules/events/module.go
  • internal/sms-gateway/modules/events/types.go
  • internal/sms-gateway/pubsub/module.go
  • internal/sms-gateway/cache/factory.go
🧰 Additional context used
🧬 Code graph analysis (4)
pkg/pubsub/redis.go (3)
pkg/pubsub/options.go (1)
  • Option (3-3)
pkg/pubsub/pubsub.go (5)
  • ErrPubSubClosed (9-9)
  • ErrInvalidTopic (10-10)
  • Subscription (18-23)
  • Message (13-16)
  • PubSub (45-50)
internal/config/config.go (1)
  • PubSub (79-81)
pkg/pubsub/memory.go (2)
pkg/pubsub/pubsub.go (5)
  • Message (13-16)
  • ErrPubSubClosed (9-9)
  • ErrInvalidTopic (10-10)
  • Subscription (18-23)
  • PubSub (45-50)
pkg/pubsub/options.go (1)
  • Option (3-3)
internal/sms-gateway/modules/events/events.go (2)
internal/sms-gateway/modules/events/types.go (2)
  • Event (9-12)
  • NewEvent (14-19)
internal/sms-gateway/modules/push/types.go (1)
  • Event (16-16)
internal/sms-gateway/modules/events/service.go (5)
internal/sms-gateway/pubsub/pubsub.go (1)
  • PubSub (14-14)
internal/sms-gateway/modules/events/types.go (1)
  • Event (9-12)
internal/sms-gateway/modules/sse/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/push/types/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/events/metrics.go (7)
  • DeliveryTypeUnknown (20-20)
  • FailureReasonSerializationError (22-22)
  • FailureReasonPublishError (23-23)
  • EventTypeUnknown (26-26)
  • DeliveryTypePush (18-18)
  • FailureReasonProviderFailed (24-24)
  • DeliveryTypeSSE (19-19)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Benchmark
  • GitHub Check: Test
  • GitHub Check: E2E
  • GitHub Check: Analyze (go)
🔇 Additional comments (3)
pkg/cache/cache.go (1)

37-39: All implementations of the Cache interface within this codebase have already been updated with the Close() method.

Both memoryCache (pkg/cache/memory.go:172) and redisCache (pkg/cache/redis.go:203) implement Close(), confirmed by compile-time assertions. No additional implementations exist that require updates.

Note: This remains a breaking change for external consumers of the public pkg/cache package, but the PR itself is complete for this codebase.

internal/sms-gateway/modules/events/events.go (1)

9-11: LGTM – value returns align with NewEvent.

Constructors now return Event by value consistently. Please ensure downstream call sites expect values (not pointers).

Also applies to: 13-15, 17-25, 27-29

internal/sms-gateway/modules/events/service.go (1)

76-103: LGTM—past review concerns addressed.

The subscription lifecycle (lines 77-81), closed-channel handling (lines 89-93), and error logging without PII (line 97) correctly address previous review comments. The deserialization error flow with metrics and safe logging is well implemented.

@capcom6 capcom6 force-pushed the events/multi-instance-support branch from a102d73 to 2ffc5a2 Compare October 18, 2025 01:11
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/sms-gateway/cache/factory.go (1)

44-59: Critical: Double prefix bug causes cache key collision.

Line 59 prepends keyPrefix to name before invoking f.new(), then line 47 prepends keyPrefix again. This results in Prefix: keyPrefix + keyPrefix + originalName, breaking cache isolation.

Apply this fix:

 case "redis":
   return &factory{
     new: func(name string) (Cache, error) {
       return cache.NewRedis(cache.RedisConfig{
         Client: nil,
         URL:    config.URL,
-        Prefix: keyPrefix + name,
+        Prefix: name,
         TTL:    0,
       })
     },
   }, nil
♻️ Duplicate comments (3)
internal/sms-gateway/modules/events/metrics.go (1)

22-27: Add a validation-specific failure reason (follow-up to prior review).

Differentiate malformed payloads from serialization errors for cleaner metrics.

 	FailureReasonSerializationError = "serialization_error"
 	FailureReasonPublishError       = "publish_error"
 	FailureReasonProviderFailed     = "provider_failed"
+	FailureReasonValidationError    = "validation_error"

Then use FailureReasonValidationError where inputs fail validation (e.g., empty event_type).

internal/sms-gateway/modules/events/service.go (2)

46-49: Increment metrics on validation failure.

The validation check correctly guards against empty EventType, but doesn't record the failure in metrics. This creates a blind spot in observability.

Apply this fix:

 func (s *Service) Notify(userID string, deviceID *string, event Event) error {
   if event.EventType == "" {
+    s.metrics.IncrementFailed(EventTypeUnknown, DeliveryTypeUnknown, FailureReasonSerializationError)
     return fmt.Errorf("event type is empty")
   }

51-52: Accept parent context to respect caller cancellation.

Using context.Background() ignores the caller's cancellation and timeout, potentially causing the publish to hang for 5 seconds even when the caller has been cancelled. This degrades system responsiveness during shutdown.

Apply this fix to accept and propagate caller context:

-func (s *Service) Notify(userID string, deviceID *string, event Event) error {
+func (s *Service) Notify(ctx context.Context, userID string, deviceID *string, event Event) error {
   if event.EventType == "" {
     s.metrics.IncrementFailed(EventTypeUnknown, DeliveryTypeUnknown, FailureReasonSerializationError)
     return fmt.Errorf("event type is empty")
   }

-  subCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+  pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
   defer cancel()
🧹 Nitpick comments (5)
internal/sms-gateway/modules/events/types.go (2)

31-33: Return validation error from deserialize (empty event_type, missing user_id).

Currently nil-pointer risk is gone (Event is a value), but invalid payloads slip through (e.g., event_type empty). Prefer failing fast in deserialize.

 func (w *eventWrapper) deserialize(data []byte) error {
-	return json.Unmarshal(data, w)
+	dec := json.NewDecoder(bytes.NewReader(data))
+	dec.DisallowUnknownFields()
+	if err := dec.Decode(w); err != nil {
+		return err
+	}
+	if w.UserID == "" {
+		return fmt.Errorf("user_id is required")
+	}
+	if w.Event.EventType == "" {
+		return fmt.Errorf("event.event_type is required")
+	}
+	return nil
 }

Also update imports:

-import "encoding/json"
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+)

10-11: Align event JSON shape across modules (event vs event_type).

This file serializes wrapper as {"event":{"event_type":...,"data":...}}, while SSE uses {"event": "...", "data": ...}. Consider a single canonical Event representation or explicit translation helpers to avoid confusion and drift.

Please confirm intended cross-module JSON contracts (Pub/Sub vs SSE) and whether a shared type or mapper should be introduced.

Also applies to: 21-25

internal/config/config.go (1)

79-81: Expose Pub/Sub buffer size in config (avoid magic 128).

internal/config/module.go hardcodes BufferSize: 128. Make it configurable alongside URL.

 type PubSub struct {
-	URL string `yaml:"url" envconfig:"PUBSUB__URL"`
+	URL        string `yaml:"url"         envconfig:"PUBSUB__URL"`
+	BufferSize int    `yaml:"buffer_size" envconfig:"PUBSUB__BUFFER_SIZE"`
 }

Default:

 PubSub: PubSub{
-	URL: "memory://",
+	URL:        "memory://",
+	BufferSize: 128,
 },

And in internal/config/module.go (for completeness):

 return pubsub.Config{
-  URL:        cfg.PubSub.URL,
-  BufferSize: 128,
+  URL:        cfg.PubSub.URL,
+  BufferSize: cfg.PubSub.BufferSize,
 }

Also applies to: 111-113

internal/sms-gateway/pubsub/module.go (1)

18-24: Avoid shadowing package name; log Close errors in OnStop.

Variable name equals imported package; also errors from Close are swallowed by Fx logs without module context.

-		fx.Invoke(func(pubsub pubsub.PubSub, lc fx.Lifecycle) {
+		fx.Invoke(func(ps pubsub.PubSub, lc fx.Lifecycle, logger *zap.Logger) {
 			lc.Append(fx.Hook{
 				OnStop: func(_ context.Context) error {
-					return pubsub.Close()
+					if err := ps.Close(); err != nil {
+						logger.Error("pubsub close failed", zap.Error(err))
+						return err
+					}
+					return nil
 				},
 			})
 		}),
internal/sms-gateway/modules/events/module.go (1)

21-28: Add panic guard in Run goroutine to trigger graceful shutdown.

Prevents silent process crashes in the events service goroutine; keeps error handling behavior consistent across both error and panic paths.

 go func() {
+	defer func() {
+		if r := recover(); r != nil {
+			logger.Error("Panic in events service", zap.Any("recover", r))
+			if err := sh.Shutdown(fx.ExitCode(1)); err != nil {
+				logger.Error("Failed to shutdown", zap.Error(err))
+			}
+		}
+	}()
 	if err := svc.Run(ctx); err != nil {
 		logger.Error("Error running events service", zap.Error(err))
 		if err := sh.Shutdown(fx.ExitCode(1)); err != nil {
 			logger.Error("Failed to shutdown", zap.Error(err))
 		}
 	}
 }()

Verified: Service.Run(ctx context.Context) returns error at internal/sms-gateway/modules/events/service.go:76, so the block compiles correctly.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a102d73 and 2ffc5a2.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (18)
  • go.mod (0 hunks)
  • internal/config/config.go (3 hunks)
  • internal/config/module.go (2 hunks)
  • internal/sms-gateway/app.go (2 hunks)
  • internal/sms-gateway/cache/factory.go (1 hunks)
  • internal/sms-gateway/modules/events/events.go (2 hunks)
  • internal/sms-gateway/modules/events/metrics.go (1 hunks)
  • internal/sms-gateway/modules/events/module.go (1 hunks)
  • internal/sms-gateway/modules/events/service.go (2 hunks)
  • internal/sms-gateway/modules/events/types.go (1 hunks)
  • internal/sms-gateway/pubsub/config.go (1 hunks)
  • internal/sms-gateway/pubsub/module.go (1 hunks)
  • internal/sms-gateway/pubsub/pubsub.go (1 hunks)
  • pkg/cache/cache.go (1 hunks)
  • pkg/cache/memory.go (2 hunks)
  • pkg/cache/redis.go (2 hunks)
  • pkg/pubsub/memory.go (1 hunks)
  • pkg/pubsub/redis.go (1 hunks)
💤 Files with no reviewable changes (1)
  • go.mod
🚧 Files skipped from review as they are similar to previous changes (6)
  • internal/config/module.go
  • internal/sms-gateway/pubsub/config.go
  • pkg/cache/memory.go
  • pkg/cache/cache.go
  • pkg/pubsub/redis.go
  • internal/sms-gateway/app.go
🧰 Additional context used
🧬 Code graph analysis (10)
internal/config/config.go (1)
internal/sms-gateway/pubsub/pubsub.go (1)
  • PubSub (14-14)
internal/sms-gateway/modules/events/module.go (2)
internal/sms-gateway/modules/events/service.go (1)
  • Service (19-30)
internal/sms-gateway/app.go (1)
  • Run (62-73)
pkg/pubsub/memory.go (2)
pkg/pubsub/pubsub.go (5)
  • Message (13-16)
  • ErrPubSubClosed (9-9)
  • ErrInvalidTopic (10-10)
  • Subscription (18-23)
  • PubSub (45-50)
pkg/pubsub/options.go (1)
  • Option (3-3)
internal/sms-gateway/modules/events/events.go (2)
internal/sms-gateway/modules/events/types.go (2)
  • Event (9-12)
  • NewEvent (14-19)
internal/sms-gateway/modules/push/types.go (1)
  • Event (16-16)
internal/sms-gateway/pubsub/pubsub.go (5)
internal/config/config.go (2)
  • PubSub (79-81)
  • Config (10-19)
internal/sms-gateway/pubsub/config.go (1)
  • Config (4-7)
pkg/pubsub/options.go (1)
  • WithBufferSize (17-21)
pkg/pubsub/memory.go (1)
  • NewMemory (24-36)
pkg/pubsub/redis.go (2)
  • NewRedis (42-76)
  • RedisConfig (15-27)
internal/sms-gateway/modules/events/service.go (10)
internal/sms-gateway/modules/push/service.go (1)
  • Service (36-46)
internal/sms-gateway/modules/messages/service.go (2)
  • Service (48-61)
  • NewService (63-85)
internal/sms-gateway/modules/sse/service.go (2)
  • Service (17-25)
  • NewService (38-47)
internal/sms-gateway/modules/settings/service.go (2)
  • Service (19-25)
  • NewService (27-35)
internal/sms-gateway/modules/webhooks/service.go (2)
  • Service (28-37)
  • NewService (39-50)
internal/sms-gateway/pubsub/pubsub.go (1)
  • PubSub (14-14)
internal/sms-gateway/modules/events/types.go (1)
  • Event (9-12)
internal/sms-gateway/modules/sse/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/push/types/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/events/metrics.go (6)
  • DeliveryTypeUnknown (20-20)
  • FailureReasonSerializationError (22-22)
  • FailureReasonPublishError (23-23)
  • EventTypeUnknown (26-26)
  • DeliveryTypePush (18-18)
  • FailureReasonProviderFailed (24-24)
internal/sms-gateway/pubsub/module.go (4)
internal/config/module.go (1)
  • Module (22-131)
internal/sms-gateway/app.go (1)
  • Module (36-60)
internal/sms-gateway/pubsub/pubsub.go (2)
  • New (16-41)
  • PubSub (14-14)
internal/config/config.go (1)
  • PubSub (79-81)
internal/sms-gateway/modules/events/types.go (3)
internal/sms-gateway/modules/sse/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/push/types/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/push/types.go (1)
  • Event (16-16)
internal/sms-gateway/cache/factory.go (1)
pkg/cache/redis.go (2)
  • NewRedis (62-89)
  • RedisConfig (37-51)
pkg/cache/redis.go (2)
pkg/cache/cache.go (1)
  • Cache (5-40)
internal/config/config.go (1)
  • Cache (75-77)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: E2E
  • GitHub Check: Benchmark
  • GitHub Check: Test
  • GitHub Check: Analyze (go)
🔇 Additional comments (7)
internal/sms-gateway/pubsub/pubsub.go (1)

16-40: LGTM! Factory pattern correctly implemented.

The function properly defaults to memory://, parses the URL, and switches on scheme to instantiate the appropriate backend with options.

pkg/pubsub/memory.go (1)

10-153: LGTM! All previous concurrency issues resolved.

The implementation correctly:

  • Stores subscriber context alongside channels (lines 19-22, 100)
  • Holds RLock during broadcast to prevent map mutation panics (lines 52-53)
  • Checks subscriber cancellation to avoid deadlocks (line 74)
  • Unsubscribes before closing channels to prevent send-on-closed panics (lines 109-111)
  • Includes all fields in returned Subscription (line 116)
pkg/cache/redis.go (2)

36-89: LGTM! Config-based Redis cache properly manages client lifecycle.

The implementation correctly:

  • Validates that either Client or URL is provided (lines 67-69)
  • Creates a client from URL when needed (lines 72-79)
  • Tracks ownership with ownedClient flag (line 83)
  • Normalizes prefix with trailing colon (lines 63-65)

203-211: LGTM! Close method properly releases owned resources.

The implementation correctly closes only owned clients, avoiding double-close bugs.

internal/sms-gateway/modules/events/events.go (1)

9-29: LGTM! Event constructors correctly return by value.

The migration from pointer to value semantics is appropriate for the Event struct and consistently applied across all constructors.

internal/sms-gateway/modules/events/service.go (2)

76-103: LGTM! Run method correctly handles subscription lifecycle.

The implementation properly:

  • Checks for closed channel (lines 89-93)
  • Increments metrics on deserialization failure (line 96)
  • Logs errors without exposing PII (line 97)
  • Returns on context cancellation (lines 86-88)

105-150: LGTM! Event processing correctly accesses Event fields.

The method properly uses wrapper.Event.EventType and wrapper.Event.Data throughout, consistent with the updated Event struct.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
internal/sms-gateway/pubsub/module.go (1)

18-18: Consider using the local PubSub alias for consistency.

The parameter uses the fully qualified pubsub.PubSub type from the imported package, while the local package exports a PubSub alias. Using the local alias would be more consistent with the package's public API and the return type of New().

Apply this diff for consistency:

-		fx.Invoke(func(ps pubsub.PubSub, logger *zap.Logger, lc fx.Lifecycle) {
+		fx.Invoke(func(ps PubSub, logger *zap.Logger, lc fx.Lifecycle) {
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2ffc5a2 and bf7a8aa.

📒 Files selected for processing (2)
  • internal/sms-gateway/cache/factory.go (2 hunks)
  • internal/sms-gateway/pubsub/module.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
internal/sms-gateway/pubsub/module.go (4)
internal/sms-gateway/app.go (1)
  • Module (36-60)
internal/config/module.go (1)
  • Module (22-131)
internal/sms-gateway/pubsub/pubsub.go (2)
  • New (16-41)
  • PubSub (14-14)
internal/config/config.go (1)
  • PubSub (79-81)
internal/sms-gateway/cache/factory.go (1)
pkg/cache/redis.go (2)
  • NewRedis (62-89)
  • RedisConfig (37-51)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: E2E
  • GitHub Check: Benchmark
  • GitHub Check: Test
  • GitHub Check: Analyze (go)
🔇 Additional comments (4)
internal/sms-gateway/cache/factory.go (2)

44-49: Namespace bug fixed! Verify TTL configuration.

The Redis prefix now correctly includes the cache name (keyPrefix + name), resolving the previous namespace collision issue. Each cache will have a unique namespace.

However, the TTL is hardcoded to 0. Verify whether this is intentional (meaning no expiration) or if TTL should be configurable via the factory config.


59-59: LGTM!

The delegation is clean and correct. The raw name is passed to the internal factory function, which handles prefixing appropriately in the Redis case.

internal/sms-gateway/pubsub/module.go (2)

1-9: LGTM: Clean imports and package structure.

The package declaration and imports are correct. All dependencies are necessary for the fx module setup.


11-29: LGTM: Well-structured fx module integration.

The module follows Uber Fx best practices with proper logger scoping, constructor wiring, and lifecycle management. The OnStop hook ensures graceful shutdown of the PubSub instance.

@capcom6 capcom6 force-pushed the events/multi-instance-support branch from 3d03662 to 7e48f90 Compare October 18, 2025 23:11
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (2)
internal/sms-gateway/modules/events/service.go (2)

51-52: Accept parent context to respect caller cancellation.

Using context.Background() ignores any caller's cancellation or timeout, potentially causing the operation to block for up to 5 seconds even when the caller has been cancelled. This can lead to resource leaks and degraded responsiveness during shutdown.

Consider updating the Notify signature to accept a context parameter:

-func (s *Service) Notify(userID string, deviceID *string, event Event) error {
+func (s *Service) Notify(ctx context.Context, userID string, deviceID *string, event Event) error {
 	if event.EventType == "" {
 		s.metrics.IncrementFailed(EventTypeUnknown, DeliveryTypeUnknown, FailureReasonSerializationError)
 		return fmt.Errorf("event type is empty")
 	}

-	subCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
 	defer cancel()

Then update all callers to pass an appropriate context.


46-49: Add failure metric for validation errors.

The validation check on line 47-49 returns an error but doesn't increment a failure metric. This makes it difficult to monitor validation failures in production.

Apply this diff to add metric tracking:

 func (s *Service) Notify(userID string, deviceID *string, event Event) error {
 	if event.EventType == "" {
+		s.metrics.IncrementFailed(EventTypeUnknown, DeliveryTypeUnknown, FailureReasonSerializationError)
 		return fmt.Errorf("event type is empty")
 	}

Note: If you add FailureReasonValidationError as suggested in metrics.go, use that constant instead of FailureReasonSerializationError for clearer metrics distinction.

🧹 Nitpick comments (1)
internal/sms-gateway/modules/events/metrics.go (1)

22-26: Consider adding a validation-specific failure reason constant.

The past review comments suggest adding FailureReasonValidationError = "validation_error" to distinguish validation failures from serialization errors. Currently, validation failures in service.go (line 48) don't increment any failure metric, and if they did, they would need a distinct constant to avoid conflating validation issues with actual serialization failures.

Apply this diff to add the validation error constant:

 	FailureReasonSerializationError = "serialization_error"
 	FailureReasonPublishError       = "publish_error"
+	FailureReasonValidationError    = "validation_error"
 	FailureReasonProviderFailed     = "provider_failed"
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3d03662 and 7e48f90.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (20)
  • go.mod (0 hunks)
  • internal/config/config.go (3 hunks)
  • internal/config/module.go (2 hunks)
  • internal/sms-gateway/app.go (2 hunks)
  • internal/sms-gateway/cache/factory.go (2 hunks)
  • internal/sms-gateway/modules/events/events.go (2 hunks)
  • internal/sms-gateway/modules/events/metrics.go (1 hunks)
  • internal/sms-gateway/modules/events/module.go (1 hunks)
  • internal/sms-gateway/modules/events/service.go (2 hunks)
  • internal/sms-gateway/modules/events/types.go (1 hunks)
  • internal/sms-gateway/pubsub/config.go (1 hunks)
  • internal/sms-gateway/pubsub/module.go (1 hunks)
  • internal/sms-gateway/pubsub/pubsub.go (1 hunks)
  • pkg/cache/cache.go (1 hunks)
  • pkg/cache/memory.go (2 hunks)
  • pkg/cache/redis.go (2 hunks)
  • pkg/pubsub/memory.go (1 hunks)
  • pkg/pubsub/options.go (1 hunks)
  • pkg/pubsub/pubsub.go (1 hunks)
  • pkg/pubsub/redis.go (1 hunks)
💤 Files with no reviewable changes (1)
  • go.mod
🚧 Files skipped from review as they are similar to previous changes (6)
  • internal/config/config.go
  • internal/sms-gateway/pubsub/module.go
  • pkg/pubsub/memory.go
  • pkg/cache/cache.go
  • pkg/pubsub/options.go
  • pkg/pubsub/redis.go
🧰 Additional context used
🧬 Code graph analysis (10)
internal/sms-gateway/modules/events/module.go (2)
internal/sms-gateway/modules/events/service.go (1)
  • Service (19-30)
internal/sms-gateway/app.go (1)
  • Run (62-73)
internal/sms-gateway/app.go (2)
internal/config/module.go (1)
  • Module (22-131)
internal/sms-gateway/pubsub/module.go (1)
  • Module (10-29)
internal/sms-gateway/cache/factory.go (1)
pkg/cache/redis.go (2)
  • NewRedis (62-89)
  • RedisConfig (37-51)
pkg/cache/memory.go (2)
internal/sms-gateway/cache/factory.go (1)
  • Cache (14-14)
pkg/cache/cache.go (1)
  • Cache (5-40)
internal/sms-gateway/modules/events/service.go (5)
internal/sms-gateway/pubsub/pubsub.go (1)
  • PubSub (14-14)
internal/sms-gateway/modules/events/types.go (1)
  • Event (9-12)
internal/sms-gateway/modules/sse/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/push/types/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/events/metrics.go (7)
  • DeliveryTypeUnknown (20-20)
  • FailureReasonSerializationError (22-22)
  • FailureReasonPublishError (23-23)
  • EventTypeUnknown (26-26)
  • DeliveryTypePush (18-18)
  • FailureReasonProviderFailed (24-24)
  • DeliveryTypeSSE (19-19)
internal/config/module.go (3)
internal/config/config.go (2)
  • Config (10-19)
  • PubSub (79-81)
internal/sms-gateway/pubsub/config.go (1)
  • Config (4-7)
internal/sms-gateway/pubsub/pubsub.go (1)
  • PubSub (14-14)
internal/sms-gateway/modules/events/events.go (1)
internal/sms-gateway/modules/events/types.go (2)
  • Event (9-12)
  • NewEvent (14-19)
internal/sms-gateway/modules/events/types.go (3)
internal/sms-gateway/modules/sse/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/push/types/types.go (1)
  • Event (7-10)
internal/sms-gateway/modules/push/types.go (1)
  • Event (16-16)
internal/sms-gateway/pubsub/pubsub.go (5)
internal/config/config.go (2)
  • PubSub (79-81)
  • Config (10-19)
internal/sms-gateway/pubsub/config.go (1)
  • Config (4-7)
pkg/pubsub/options.go (1)
  • WithBufferSize (17-21)
pkg/pubsub/memory.go (1)
  • NewMemory (24-36)
pkg/pubsub/redis.go (2)
  • NewRedis (42-76)
  • RedisConfig (15-27)
pkg/cache/redis.go (2)
pkg/cache/cache.go (1)
  • Cache (5-40)
internal/config/config.go (1)
  • Cache (75-77)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Benchmark
  • GitHub Check: Test
  • GitHub Check: E2E
  • GitHub Check: Analyze (go)
🔇 Additional comments (16)
internal/sms-gateway/pubsub/config.go (1)

1-7: LGTM!

The Config struct is clean and well-documented. The URL-based backend selection pattern is consistent with the cache factory approach elsewhere in the codebase.

internal/sms-gateway/cache/factory.go (1)

44-49: Good fix for the namespace collision issue.

The Redis cache now correctly uses keyPrefix + name as the Prefix, which addresses the past review concern about all caches colliding on the same namespace.

pkg/cache/redis.go (2)

36-89: LGTM! Clean config-based initialization with proper ownership tracking.

The RedisConfig pattern and client ownership management are well-implemented:

  • Clear validation that either Client or URL must be provided
  • Proper prefix normalization
  • Ownership tracking enables safe resource cleanup

203-211: LGTM! Proper resource cleanup with ownership check.

The Close method correctly closes only owned clients, preventing double-close issues when an external client is provided. The compile-time assertion is good practice.

internal/sms-gateway/modules/events/events.go (1)

9-29: LGTM! Consistent value-type returns.

The event constructors now return Event by value, which aligns with the updated Event type definition and eliminates pointer management overhead for these immutable data structures.

internal/sms-gateway/modules/events/service.go (2)

89-100: LGTM! Proper closed-channel handling.

The select statement correctly checks the ok value from the channel receive, preventing tight loops when the subscription is closed. The error handling and metrics for deserialization failures are appropriate.


105-150: LGTM! Clean device-based event routing.

The processEvent method properly handles device selection, routing to push or SSE based on token availability, and tracks success/failure metrics for each delivery type.

internal/config/module.go (1)

125-130: LGTM! Correct URL source for PubSub configuration.

The provider now correctly reads from cfg.PubSub.URL instead of cfg.Cache.URL, addressing the past review concern. The BufferSize of 128 provides reasonable capacity for the Pub/Sub channels.

pkg/cache/memory.go (2)

16-23: LGTM! Constructor change aligns with lifecycle management.

Returning the concrete *memoryCache type instead of the Cache interface is appropriate here, especially with the compile-time check at Line 176 ensuring interface compliance.


172-176: LGTM! Close implementation and interface check are correct.

The no-op Close() is appropriate for the in-memory cache (no external resources to clean up), and the compile-time assertion ensures interface compliance.

internal/sms-gateway/modules/events/module.go (1)

17-28: LGTM! Error handling and shutdown logic are well-structured.

The addition of error handling for svc.Run(ctx) with proper logging and graceful shutdown via fx.Shutdowner follows fx lifecycle best practices.

internal/sms-gateway/app.go (1)

24-24: LGTM! Clean integration of the pubsub module.

The import and module registration are straightforward and follow the existing pattern.

Also applies to: 49-49

pkg/pubsub/pubsub.go (3)

8-16: LGTM! Clean error definitions and message structure.

The sentinel errors and Message struct provide a clear, simple foundation for the pub/sub abstraction.


18-31: LGTM! Well-designed subscription lifecycle management.

The Subscription type properly encapsulates its state with unexported fields, exposes a read-only channel via Receive(), and uses context cancellation for cleanup—following Go best practices.


33-50: LGTM! Well-structured interface design.

The interface hierarchy follows good design principles:

  • Segregates Publisher and Subscriber concerns
  • Combines them in the PubSub interface with lifecycle management
  • Clear documentation of behavior (fan-out, context cancellation)
internal/sms-gateway/pubsub/pubsub.go (1)

16-41: No issues found—Redis client closure is properly implemented.

The verification confirms the factory implementation correctly manages the Redis client lifecycle:

  • Factory passes Client: nil to RedisConfig (line 20)
  • NewRedis() detects nil client and creates a new one via redis.NewClient(opt) (line 59)
  • ownedClient flag is correctly set to true when the client is created internally (line 71)
  • Close() properly guards the client closure with the ownedClient check and calls r.client.Close() when owned (lines 173–174)

The past concern about Redis client closure is not validated—the code already implements proper client lifecycle management.

@capcom6 capcom6 marked this pull request as ready for review October 18, 2025 23:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant