
feart(streaming): query cache #2392

Open
wants to merge 21 commits into main
Conversation

@hekike (Contributor) commented Mar 8, 2025

Query cache that merges cached data with fresh data:

  1. Look up cached rows (old usage, > 24h)
  2. Query new cachable data, if any (usage old enough to cache but not cached yet: cached < x < 24h)
  3. Store the new cachable data for future queries
  4. Query fresh usage from the events table (< 24h)
  5. Merge cached and fresh data (aggregating in memory if needed)
  6. Batch insert invalidates the cache when needed

Caching is only enabled when Cachable is set to true; for HTTP requests it is set to true automatically when a clientId is present (UI queries).

Summary by CodeRabbit

  • New Features
    • Enhanced caching for meter queries, delivering faster and more reliable responses when unique identifiers are provided.
    • Improved merging of cached and fresh data for consistent query performance.
    • Introduced deterministic parameter processing for reliable caching behavior.
    • Added precise time filtering capabilities in SQL queries.
    • Introduced configuration options for query caching in aggregation settings.
    • New mock implementations for testing ProgressManager and ClickHouse database interactions.
  • Tests
    • Expanded test coverage for caching, data merging, and error handling to ensure robust and dependable performance.


coderabbitai bot commented Mar 8, 2025

Walkthrough

The pull request introduces caching capabilities and enhancements for meter queries. It adds conditional caching logic in the query handler when a ClientID is provided and extends the query parameters with a deterministic hash function. New functions and SQL generation methods are implemented for caching and merging meter query rows for ClickHouse. Additionally, mock implementations for ProgressManager and ClickHouse interactions are added, and the Connector’s query handling is refactored to include progress tracking and improved error handling. Comprehensive unit tests have been introduced to validate these new functionalities.

Changes

File(s) and summary:

  • openmeter/meter/httphandler/query.go: Added a conditional check in QueryMeter to enable caching when ClientID is provided.
  • openmeter/progressmanager/adapter/mock.go: Introduced a new mock for the ProgressManager service with MockProgressManager and its methods (GetProgress, DeleteProgressByRuntimeID, UpsertProgress, and constructor).
  • openmeter/streaming/clickhouse/mock.go: Added mocks for ClickHouse: MockClickHouse and MockRows with methods to simulate database operations (e.g., Query, QueryRow, Select, etc.).
  • openmeter/.../raw_events/cache.go, cachemerge.go, cachequery.go, connector.go, meter_query.go: Implemented caching, query merging, and SQL generation for meter queries. Refactored the Connector to split query functions (adding queryMeterWithProgress and updating queryMeter), create cache tables, and improve row scanning error handling.
  • openmeter/.../raw_events/cache_test.go, cachemerge_test.go, cachequery_test.go, connector_test.go: Added comprehensive unit tests covering caching logic, merging of cached and fresh rows, SQL generation, and Connector query behavior including error scenarios.
  • openmeter/streaming/query_params.go: Extended QueryParams with a new Hash() method and a Cachable field to generate a deterministic hash based on time, filtering, grouping, and window parameters.
  • app/common/streaming.go: Updated NewStreamingConnector to include new caching fields in ConnectorConfig.
  • app/config/aggregation.go: Introduced AggregationQueryCacheConfiguration for cache settings in the aggregation engine and added validation logic.
  • app/config/config_test.go: Enhanced configuration tests with new QueryCache field in AggregationConfiguration.

📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f898670 and a75fd39.

📒 Files selected for processing (1)
  • openmeter/streaming/clickhouse/raw_events/cache.go (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: CI
  • GitHub Check: Analyze (go)
🔇 Additional comments (9)
openmeter/streaming/clickhouse/raw_events/cache.go (9)

15-42: Consider extending aggregator coverage in future.

Currently, isQueryCachable explicitly checks for Sum, Count, Min, and Max. This is sufficient for now, but you might want to support additional aggregators (e.g., Avg) down the road if it becomes relevant.


44-57: Looks good to me!

This closure-based approach for creating a query meter dedicated to fresh data is clear and readable. No immediate concerns stand out for this logic.


59-125: Revisit partial cache-insert error handling for critical cases.

Logging an error on insert failures is often acceptable. However, if caching consistency is important, consider whether a retry or fallback mechanism is needed to prevent silent performance degradation or partial cache states.


127-159: Truncating query range may surprise users.

You are subtracting the freshness delta and truncating to full days. This design is valid, but it may confuse users if they explicitly request a partial-day range. Consider adding a log or doc comment clarifying that truncation is applied.


161-187: Secure parameter usage and error handling look good.

Your query-building pattern, with placeholders for SQL statements, helps mitigate injection risks. The overall flow for retrieving cached data appears robust.


189-205: Consider large batch insertion strategy for performance.

If a high volume of rows needs caching simultaneously, you might benefit from batching or optimized insert operations. This could reduce overhead and improve throughput when dealing with large datasets.


207-220: Creation logic is straightforward.

Ensuring the cache table exists at startup is a neat approach. Error handling and logging appear sufficient here.


222-241: Namespace filtering logic is clear.

Removing duplicates and checking the latest event time to decide invalidation is succinct and effective.


243-262: Cleanly handles cache invalidation toggling.

Skipping invalidation when caching is disabled avoids unnecessary DB operations. Deletion logic is succinct and straightforward.

@hekike changed the title from "Feat/query cache" to "feart(streaming): query cache" on Mar 8, 2025
@hekike hekike added area/processor release-note/feature Release note: Exciting New Features labels Mar 8, 2025
@hekike hekike marked this pull request as ready for review March 9, 2025 02:33
@hekike hekike requested a review from a team as a code owner March 9, 2025 02:33
coderabbitai bot left a comment

Actionable comments posted: 15

🔭 Outside diff range comments (3)
pkg/gosundheit/logger.go (1)

29-40: 🧹 Nitpick (assertive)

Consider using a log level check instead of commenting out code.

Rather than commenting out the log statements, consider using a conditional check based on log level. This approach is more maintainable as log levels can be adjusted at runtime.

func (c checkListener) OnCheckStarted(name string) {
-	// c.logger.Debug("starting health check", slog.String("check", name))
+	if c.logger.Enabled(context.Background(), slog.LevelDebug) {
+		c.logger.Debug("starting health check", slog.String("check", name))
+	}
}

func (c checkListener) OnCheckCompleted(name string, result health.Result) {
	if result.Error != nil {
		c.logger.Warn("health check failed", slog.String("check", name), slog.Any("error", result.Error))

		return
	}

-	// c.logger.Debug("health check completed", slog.String("check", name))
+	if c.logger.Enabled(context.Background(), slog.LevelDebug) {
+		c.logger.Debug("health check completed", slog.String("check", name))
+	}
}
openmeter/streaming/clickhouse/raw_events/cachemerge_test.go (1)

232-327: 🧹 Nitpick (assertive)

Comprehensive aggregation tests.

The tests for aggregateMeterQueryRows cover the main aggregation types:

  • sum
  • count
  • min
  • max

And they verify both the aggregated values and the preservation of metadata like subject, window range, and group-by values.

Consider adding test cases for the remaining aggregation types:

  • avg (Average)
  • unique_count (Unique Count)

These might have different behaviors worth testing.

openmeter/streaming/clickhouse/raw_events/cachequery_test.go (1)

181-229: 🧹 Nitpick (assertive)

Row scanning test is effective; extend to check multiple rows if possible.
Current approach confirms handling of group-by keys with empty strings. Adding a multi-row fixture would better validate iterative scanning behavior.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c73a71a and 3e6c834.

📒 Files selected for processing (15)
  • etc/seed/seed.yaml (1 hunks)
  • openmeter/meter/httphandler/query.go (1 hunks)
  • openmeter/progressmanager/adapter/mock.go (1 hunks)
  • openmeter/streaming/clickhouse/mock.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/cache.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/cache_test.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/cachemerge.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/cachemerge_test.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/cachequery.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/cachequery_test.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/connector.go (5 hunks)
  • openmeter/streaming/clickhouse/raw_events/connector_test.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/meter_query.go (3 hunks)
  • openmeter/streaming/query_params.go (2 hunks)
  • pkg/gosundheit/logger.go (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: CI
  • GitHub Check: Analyze (go)
🔇 Additional comments (34)
openmeter/progressmanager/adapter/mock.go (5)

12-12: Good practice using compile-time interface check.

Using a variable declaration with the blank identifier to verify interface implementation at compile-time is an excellent practice. This ensures the mock properly implements the expected interface.


14-16: Constructor implementation is clean.

This follows Go's standard constructor pattern for creating a new instance of the mock object.


18-21: Proper mock structure.

The MockProgressManager struct correctly embeds mock.Mock from the testify package, which is the standard approach for creating testify mocks.


28-31: Method implementation looks good.

The DeleteProgressByRuntimeID method correctly registers the call and returns the expected error.


33-36: Method implementation looks good.

The UpsertProgress method correctly registers the call and returns the expected error.

openmeter/streaming/clickhouse/mock.go (3)

1-12: Well-structured mock implementation with proper imports.

The mock implementation follows good practices by importing the necessary dependencies and implementing the required interfaces.


11-20: Good practice using interface assertion for compile-time verification.

The var _ clickhouse.Conn = &MockClickHouse{} line ensures that the mock properly implements the interface at compile time.


77-101: Good implementation of MockRows with proper interface assertion.

The implementation of the MockRows struct and its methods follows good practices and correctly asserts interface compliance.

pkg/gosundheit/logger.go (2)

40-40: Reduced log verbosity by commenting out debug statements.

Commenting out the debug log for health check completions will reduce log noise, especially for successful checks.


29-30:

✅ Verification successful

Reduced log verbosity by commenting out debug statements.

Commenting out the debug log for health check starts will reduce log noise, which is appropriate for high-frequency health checks.


🏁 Script executed:

#!/bin/bash
# Check how frequently health checks are performed
rg -A 5 "RegisterCheck|AddCheck" --type go

Length of output: 327


Approval: Removal of Debug Logging Verified

The debug log in pkg/gosundheit/logger.go (lines 29–30) has been commented out to reduce log noise for high-frequency health check events. The frequency observed in health check registration (in app/common/termination.go) confirms that this change appropriately minimizes unnecessary logging.

  • The change in OnCheckStarted reduces verbosity during frequent health checks.
  • The registration frequency in app/common/termination.go supports this adjustment.
etc/seed/seed.yaml (1)

12-13: Enhanced timestamp generation for better testing.

The new timestamp generation creates random times within the last 30 days, which is beneficial for testing caching functionality that depends on time-based patterns.

openmeter/meter/httphandler/query.go (2)

113-116: Added client-based caching enablement.

The code now enables caching when a client ID is provided, which aligns with the PR objective of implementing query caching for UI queries.


113-116:

❓ Verification inconclusive

Verify the impact of enabling caching on query performance.

The automatic enabling of caching for client ID-based requests is a significant change that could impact query performance and resource usage.


🌐 Web query:

What are the performance implications of query caching in ClickHouse?

💡 Result:

ClickHouse's query cache can significantly enhance performance for read-intensive workloads by reducing query execution time and server load, but its effectiveness depends on proper configuration and understanding of its design limitations. Here's a detailed analysis:

Key Performance Benefits

  • Latency Reduction: Cached query results can reduce response times from seconds to milliseconds for repeated queries, as seen in benchmarks where a query dropped from 8.7 seconds to 0.002 seconds after caching[3][4].
  • Resource Efficiency: Avoids redundant computation for frequent or expensive queries, lowering CPU and memory usage[1][12].
  • Scalability: Transactionally inconsistent design prevents invalidation bottlenecks, making it suitable for high-throughput OLAP workloads[1][4].

Configuration Considerations

  • Entry Eligibility: Queries must meet criteria like minimum execution time (use_query_cache_min_query_duration) or frequency (use_query_cache_min_query_runs) to be cached[1][4].
  • Size Limits: Default 1 MiB entry size may exclude moderately large results; adjust via max_entry_size in server config[1][3].
  • TTL Management: Default 60-second cache lifetime balances freshness with reuse; modify using query_cache_ttl[3][4].

Potential Limitations

  • Stale Results: Cache doesn't auto-invalidate on data changes, risking outdated results until entries expire. Suitable for slowly changing data but problematic for real-time use cases[4][6].
  • Security Isolation: By default, entries are user-specific to prevent cross-user data leaks. While configurable via query_cache_share_between_users, sharing reduces security[1][6].
  • Non-Deterministic Functions: Queries using now() or rand() are excluded by default; override with query_cache_store_results_of_queries_with_nondeterministic_functions[1][4].

Operational Best Practices

  • Selective Enablement: Activate caching per-query with SETTINGS use_query_cache=1 rather than globally to avoid unnecessary overhead[3][4].
  • Monitoring: Use system.query_cache for entries, system.events for hit/miss rates, and system.asynchronous_metrics for memory usage[4][7].
  • Clear Strategies: Manually invalidate stale entries with SYSTEM DROP QUERY CACHE or tag-based removal (SYSTEM DROP QUERY CACHE TAG)[4][7].

Performance Tradeoffs

  • Write Overhead: Caching adds minor latency (~5-10%) during initial query execution due to result storage[1][4].
  • Memory Pressure: Large caches may compete with other processes; monitor via QueryCacheBytes metric[4][7].

In summary, ClickHouse's query cache offers substantial performance gains for repetitive analytical queries but requires careful tuning of expiration policies, security settings, and eligibility rules. It complements other optimizations like proper indexing and projections but isn't a substitute for them[8][12].


Action Required: Validate Query Caching Impact on Performance

After reviewing the detailed performance implications of ClickHouse's query caching, note that while enabling caching—especially for client ID-based requests—can significantly reduce query latency and lower server load under the right conditions, it also introduces several configuration-dependent tradeoffs. In particular:

  • Eligibility & Configuration: Verify that queries meet the necessary criteria (such as minimum execution time and frequency) to be effective candidates for caching.
  • Resource Overhead: Monitor the initial overhead when storing query results and potential memory pressure, especially if many queries are cached simultaneously.
  • Staleness & Security: Ensure that the benefits of lower latency don’t come at the cost of stale data or compromised isolation between users.
  • Benchmark Testing: Conduct targeted performance benchmarks to confirm that auto-enabling caching for client ID-based requests yields the expected performance improvements without adverse side effects.

Please validate these points in your current setup to confirm that the caching behavior is correctly tuned for your workload.

openmeter/streaming/query_params.go (1)

17-17: Good addition of the Cachable property.

Adding a Cachable flag to the QueryParams struct is a clean approach for controlling the caching behavior at the query parameter level.

openmeter/streaming/clickhouse/raw_events/meter_query.go (2)

141-141: Return nil instead of empty slice for invalid aggregation type.

This change maintains better error semantics by returning nil instead of an empty slice for the args parameter when there's an error. This improves consistency with error handling patterns in Go.


245-306: Well-structured row scanning implementation.

The scanRows method efficiently handles scanning database rows into the application data structure. It properly:

  • Initializes the GroupBy map
  • Handles nil values for measurements
  • Processes group-by fields correctly
  • Skips empty rows

Two minor concerns to address:

  1. There's a TODO comment about using decimal that should be resolved.
  2. The error handling could be more specific about which part of the scanning failed.

Regarding the TODO comment on line 272, do you plan to implement decimal support for more precise calculations? Float64 can have precision issues with financial or billing data.

openmeter/streaming/clickhouse/raw_events/connector_test.go (2)

20-102: Good test coverage for successful query execution.

This test effectively validates the query execution flow and result processing by:

  • Setting up a proper mock ClickHouse connection
  • Validating that the correct query parameters are passed
  • Confirming that rows are properly scanned
  • Verifying that the results match expected values

The test structure is clean and well-organized.


103-137: Comprehensive error handling test cases.

The test covers the key error scenarios:

  1. Basic query errors
  2. Meter not found errors (which should be wrapped appropriately)
  3. Row scanning errors

This provides good coverage of the error paths in the code.

openmeter/streaming/clickhouse/raw_events/cachemerge_test.go (2)

15-181: Good test coverage for merging cached and fresh rows.

The tests for mergeMeterQueryRows effectively cover various scenarios:

  • Empty cached rows
  • Window size handling
  • Aggregation with and without window size
  • Different subjects
  • Group by values

This provides a solid foundation for ensuring the merging functionality works correctly.


183-230: Well-designed key generation tests.

The tests for getMeterQueryRowKey verify that keys are correctly generated for different combinations of:

  • Subject-only grouping
  • Multiple group-by fields
  • Missing group-by fields

This ensures the cache lookup mechanism will work correctly with various grouping configurations.

openmeter/streaming/clickhouse/raw_events/cachemerge.go (5)

16-21: No concerns found with the fallback approach for missing cache.
The function returning freshRows when cachedRows is empty is a straightforward and sensible fallback.


22-32: Conditional return for windowed queries looks correct.
Concatenating and sorting when WindowSize is specified preserves expected query semantics without extra overhead.


50-60: Straightforward approach to store and reduce grouped results.
Returning the aggregated slice after processing each group keeps the logic easy to follow.


62-87: Key generation logic is robust and consistent.
Sorting params.GroupBy ensures deterministic ordering in the composite key. This approach avoids collisions and maintains clarity when merging rows.


89-134: Aggregation for Sum, Min, and Max is clear; check "Count" assumption.
Currently, "Count" is lumped in with "Sum," which implies that each row’s Value is assumed to represent exactly 1, or the total is meant to be the sum of those values. If “Count” should reflect the number of rows, then iterating over rows and adding 1 each time may be more accurate unless you guarantee each row’s Value is 1.

Would you like to confirm that each row’s Value is indeed 1 for “Count” scenarios? If not, we can revise the aggregator accordingly.

openmeter/streaming/clickhouse/raw_events/cachequery_test.go (1)

15-39: Solid validation of CREATE TABLE SQL.
Making sure column definitions and engine specs exist covers the key functionality. This test effectively prevents accidental regressions in table creation SQL.

openmeter/streaming/clickhouse/raw_events/connector.go (5)

80-82: Creation of meter query cache table on initialization is good practice.
Ensuring the cache table is set up upfront avoids runtime failures when querying or merging cached results.


115-122: Parameter validation is well-placed.
Checking the namespace and query inputs at the beginning of QueryMeter clarifies error handling early on.


124-129: Deterministic ordering of group-by keys aids testability and correctness.
Sorting params.GroupBy before building queries helps ensure consistent SQL generation and results.


163-179: Progress tracking for client-identified queries is beneficial.
This implementation cleanly separates the logic for progress-enabled queries from standard execution, making the flow easier to maintain.


457-478: Delegating progress tracking logic into a dedicated method is clean.
Separating counting logic via countSQL from the main query ensures better readability and maintainability.

openmeter/streaming/clickhouse/raw_events/cache.go (1)

21-42: Potential extension for aggregator coverage.
Currently, isQueryCachable returns true for Sum, Count, Min, and Max aggregations. If additional aggregations (e.g., Average) are introduced in the future, kindly verify whether they should be included here as well.

Would you like to confirm if aggregator types like Average or other custom metrics should also be cacheable via a broader aggregator check across the codebase?

openmeter/streaming/clickhouse/raw_events/cachequery.go (1)

106-112:

✅ Verification successful

Confirm type compatibility for UNIX timestamps.
Here, the code filters rows by doing comparisons against d.From.Unix() and d.To.Unix() when the column type is DateTime. Some ClickHouse drivers allow implicit casting, but it’s safer to verify or explicitly cast.

Consider confirming correctness by reviewing how the driver and schema handle UNIX integer timestamps for DateTime columns, for example:


🏁 Script executed:

#!/bin/bash
# Check if implicit casting from Int64 to DateTime works reliably
rg -A 10 "WHERE.*window_start" 

Length of output: 5543


Confirmed: Implicit Casting is Working Correctly

The integration tests in cachequery_test.go and cache_test.go confirm that the SQL queries are built with the expected UNIX timestamp values (using d.From.Unix() and d.To.Unix()). The tests verify that these integer values are correctly used in comparisons (e.g., "window_start >= ?" and "window_end <= ?"), indicating that the ClickHouse driver implicitly casts these Int64 values to DateTime without issue. No changes are necessary.

openmeter/streaming/clickhouse/raw_events/cache_test.go (1)

533-535: Truncation logic looks good.
Verifying hours, minutes, and seconds are zeroed correctly ensures day-level caching accuracy. This is a clean approach.

coderabbitai bot left a comment

Actionable comments posted: 9

📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3e6c834 and adb225b.

📒 Files selected for processing (3)
  • openmeter/progressmanager/adapter/mock.go (1 hunks)
  • openmeter/streaming/clickhouse/mock.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/cache.go (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: E2E
  • GitHub Check: CI
  • GitHub Check: Analyze (go)
🔇 Additional comments (10)
openmeter/progressmanager/adapter/mock.go (4)

1-12: Well-structured imports and interface validation.

The code has well-organized imports (standard library, third-party, internal) and includes compile-time verification that MockProgressManager implements the progressmanager.Service interface, which is a best practice.


14-16: Simple and effective constructor implementation.

The constructor follows Go's conventional naming pattern and correctly initializes the mock object.


18-21: Correct mock structure setup.

The MockProgressManager struct properly embeds mock.Mock to leverage the testify mocking capabilities.


23-32: Safe type assertion implementation.

The GetProgress method safely handles nil values by checking before performing type assertion, preventing potential panics during testing.

openmeter/streaming/clickhouse/raw_events/cache.go (3)

44-56: Ensure boundary conditions are handled in the new QueryMeter.
remainingQueryMeterFactory appropriately updates From to match the cachedQueryMeter.To. However, confirm whether From is inclusive or exclusive, and whether partial overlap with the cached period could occur. If the last cached window is inclusive, there is a risk of double counting or accidental gaps if it is exclusive. Documenting or verifying the intended behavior would prevent subtle off-by-one errors.


157-183: Ensure parameterized queries to prevent possible injection.
lookupCachedMeterRows uses a struct to build SQL statements and arguments. Confirm that toSQL() securely parameterizes inputs like hp.Database or hp.Namespace. Doing so helps prevent accidental SQL injection if any user-controlled strings are ever passed into these fields.


203-216: Confirm table creation method is idempotent.
createMeterQueryCacheTable calls Exec on a query to create the cache table if it does not exist, returning an error if creation fails. Verify that the underlying SQL includes an “IF NOT EXISTS” clause or equivalent so running this method multiple times won’t fail once the table is established.

openmeter/streaming/clickhouse/mock.go (3)

52-55: LGTM! Fixed the duplicate parameter issue.

The PrepareBatch method now correctly passes options only once to m.Called(), fixing the issue raised in a previous review.


62-65: LGTM! Consistent parameter handling.

The Exec method now uses the same pattern for handling parameters as other methods in the file, addressing the consistency concern from a previous review.


11-126: Add test coverage for the mock implementation.

While the mock implementation looks solid, there's currently no test coverage specifically for this mock implementation. To ensure the mocks behave as expected and prevent regressions, consider adding tests.

#!/bin/bash
# Check if there are any test files for the mock implementation
fd --extension go --exec grep -l "Test.*MockClickHouse\|Test.*MockRows" {} \;


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between adb225b and 31d29ae.

📒 Files selected for processing (3)
  • openmeter/streaming/clickhouse/raw_events/cache.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/cache_test.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/meter_query.go (7 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: CI
  • GitHub Check: Analyze (go)
🔇 Additional comments (22)
openmeter/streaming/clickhouse/raw_events/meter_query.go (8)

9-9: No issues with the new ClickHouse driver import.


14-14: Alias rename looks good.


26-26: New 'FromExclusive' parameter is well integrated.


56-57: Centralized time filter invocation.
This refactor improves maintainability by applying time-based filters consistently.


68-68: Separate select columns and group-by columns.
This enhances clarity for the aggregated queries.


130-130: Clear error reporting for invalid aggregations.
No concerns here.


170-170: Subject filter with slicesx.Map.
This concise approach looks good.


207-218: Time filtering, grouping, and ordering logic.
The sequence of method calls is readable and logically consistent.

openmeter/streaming/clickhouse/raw_events/cache.go (8)

1-19: Initial package declaration, imports, and constants.
No issues. Definitions for minimum cache durations are clear.


44-57: Factory method for generating a fresh query after cached data processing.
This ensures continuity between cached and fresh data queries.


59-125: Structured caching workflow in queryMeterCached.
Logic for retrieving cached rows, querying new data, and storing updates is well-organized.


67-67: Typographical error in local variable name.


127-159: Preparing the time range for cached queries.
Truncation to whole days effectively avoids partial-day edge cases.


161-187: lookupCachedMeterRows retrieves cached results cleanly.
Error handling and scanning logic are well-implemented.


189-205: insertRowsToCache safely stores data.
Parameter binding mitigates risk of injection.


207-220: createMeterQueryCacheTable ensures table existence.
Straightforward usage with correct error handling.

openmeter/streaming/clickhouse/raw_events/cache_test.go (6)

1-21: Test file initial setup and imports.
Dependencies are organized appropriately.


136-269: TestConnector_QueryMeterCached ensures cached + fresh data integration.
Logic appears robust, with cohesive steps.


271-341: TestConnector_LookupCachedMeterRows verifies retrieval from cache.
Implementation is thoroughly tested without issues.


343-385: TestConnector_InsertRowsToCache ensures insertion integrity.
No concerns noted with the tested approach.


387-426: TestRemainingQueryMeterFactory corrects time boundaries.
Adjusts query meter from the last cached window.


428-541: TestGetQueryMeterForCachedPeriod.
Handling of missing From, partial truncation, and window size defaults is verified comprehensively.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 10

♻️ Duplicate comments (3)
openmeter/streaming/clickhouse/raw_events/cache.go (3)

67-67: 🧹 Nitpick (assertive)

Fix typographical error in the local variable name.

The variable createReaminingQueryMeter is misspelled and inconsistent with the function name remainingQueryMeterFactory.

-	createReaminingQueryMeter := c.remainingQueryMeterFactory(originalQueryMeter)
+	createRemainingQueryMeter := c.remainingQueryMeterFactory(originalQueryMeter)

112-119: 🧹 Nitpick (assertive)

Handle partial cache insert errors more explicitly.

The function logs cache insertion errors but continues execution successfully. While this approach prevents query failures when caching fails, it could lead to degraded performance for subsequent queries if caching repeatedly fails.

Consider implementing a retry mechanism or tracking cache failures to provide better visibility, especially if cache consistency is important for performance.


141-152: 🧹 Nitpick (assertive)

Truncating the query range might be surprising for users.

The code reduces the requested To timestamp to ensure data under 24 hours old is never cached, then truncates to whole days. While this implementation works correctly, it could be unexpected for users who request specific non-day-aligned timestamps.

Consider adding documentation about this behavior, or providing a way for users to understand when and how their query boundaries will be adjusted for caching purposes.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 31d29ae and 4411cb6.

📒 Files selected for processing (6)
  • app/common/streaming.go (1 hunks)
  • app/config/aggregation.go (3 hunks)
  • app/config/config_test.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/cache.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/cache_test.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/connector.go (7 hunks)
🔇 Additional comments (17)
app/common/streaming.go (1)

45-47: Well-structured parameter addition for query cache

The addition of query cache parameters is well-organized and follows the existing code pattern. The settings are properly passed from configuration to the connector.

app/config/config_test.go (1)

137-141: Good test setup for query cache configuration

The test configuration includes appropriate default values for the new query cache settings, with caching disabled by default and reasonable duration thresholds that match the PR objective of caching data older than 24 hours.

app/config/aggregation.go (3)

66-68: Well-structured addition of query cache configuration

The QueryCache field is a clean addition to the AggregationConfiguration structure.


110-134: Proper validation for query cache configuration

The implementation includes appropriate validation logic that ensures both duration parameters are greater than zero when caching is enabled. This prevents invalid configurations.


212-216: Sensible default configuration for query cache

The default values are reasonable and match the requirements:

  • Disabled by default (safe option)
  • 72h minimum query period
  • 24h minimum usage age

This aligns with the PR objective for caching rows that are older than 24 hours.

openmeter/streaming/clickhouse/raw_events/cache.go (1)

207-220: Good table creation implementation

The table creation function is well-structured and handles errors appropriately, ensuring the cache system has the necessary storage.

openmeter/streaming/clickhouse/raw_events/cache_test.go (3)

22-39: Mock connector approach
This helper function effectively sets up a mock environment for testing. No immediate correctness or design concerns observed.


41-155: Add coverage for missing aggregation types
Tests for sum, count, min, max, and unique count are good. If future aggregator types (e.g., average) are relevant, consider adding them.


378-407: LGTM for remainingQueryMeterFactory
This test reads well, and the scenario is clear. No immediate improvements needed.

openmeter/streaming/clickhouse/raw_events/connector.go (8)

41-41: New configuration field
Enabling or disabling query caching via a distinct field is clear and maintains separation of concerns.


131-131: QueryMeter signature
This public method signature aligns cleanly with existing patterns. No issues identified.


140-144: Deterministic group-by ordering
Sorting group-by columns ensures consistent SQL generation. This is a good strategy to improve reproducible test outputs.


145-157: Populating queryMeter
This struct assembly is straightforward and maintains clarity by grouping relevant fields together.


159-163: Conditional caching
Storing the result in a cached slice is a clean approach. The logic for useCache is well placed.


179-190: Progress-tracking path
Delegating to queryMeterWithProgress for clientID-based queries is a neat separation, enabling specialized logic without cluttering the basic query path.


192-195: Merging fresh and cached rows
Merging is done only when caching is active—straightforward and efficient approach.


496-527: Logging sensitive data
Beware of logging full SQL queries and arguments in production. This can risk sensitive data leaks, as previously noted.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 31d29ae and 4411cb6.

📒 Files selected for processing (6)
  • app/common/streaming.go (1 hunks)
  • app/config/aggregation.go (3 hunks)
  • app/config/config_test.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/cache.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/cache_test.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/connector.go (7 hunks)
🔇 Additional comments (18)
app/config/config_test.go (1)

137-141: LGTM: New query cache configuration added with sensible defaults.

The QueryCache configuration is properly added to the AggregationConfiguration with appropriate defaults:

  • Disabled by default (safe option)
  • MinimumCacheableQueryPeriod of 72 hours
  • MinimumCacheableUsageAge of 24 hours

These values align with the defaults from the configuration setup in aggregation.go.

app/common/streaming.go (1)

37-47: Configuration parameters properly passed to the ClickHouse connector.

The code successfully passes the query cache configuration parameters from the aggregation config to the raw_events.ConnectorConfig, maintaining consistent formatting with the existing parameters.

app/config/aggregation.go (3)

66-68: New query cache configuration field properly added to AggregationConfiguration.

The QueryCache field is appropriately added to the AggregationConfiguration struct with clear documentation.


110-134: Well-implemented query cache configuration structure with proper validation.

The AggregationQueryCacheConfiguration struct is well-designed with:

  • Clear field documentation
  • Proper validation logic that only validates MinimumCacheableQueryPeriod and MinimumCacheableUsageAge when caching is enabled
  • Appropriate error messages for invalid configurations

212-216: Default configuration properly set for query caching.

The default values are sensible:

  • Disabled by default (safe option)
  • 72h minimum cacheable query period
  • 24h minimum cacheable usage age

These settings are consistent with the test configuration in config_test.go.

openmeter/streaming/clickhouse/raw_events/cache.go (8)

15-42: Well-implemented query cachability check with comprehensive conditions.

The isQueryCachable function correctly checks all necessary conditions:

  • Caching is enabled in configuration
  • The query is explicitly marked as cachable
  • A "from" time is specified
  • Query duration exceeds the minimum cacheable period
  • Aggregation type is supported for caching

44-57: Clean implementation of the remaining query meter factory.

The factory pattern used here is elegant and helps maintain separation of concerns between cached and fresh data queries.


67-67: Fix typographical error in the local variable name.

The variable createReaminingQueryMeter is misspelled and inconsistent with the function name remainingQueryMeterFactory.

-	createReaminingQueryMeter := c.remainingQueryMeterFactory(originalQueryMeter)
+	createRemainingQueryMeter := c.remainingQueryMeterFactory(originalQueryMeter)

59-125: Comprehensive implementation of cached query handling.

The queryMeterCached function effectively:

  1. Looks up cached rows
  2. Queries new rows for uncached periods
  3. Caches new results
  4. Merges and returns combined data

The error handling is well-implemented, allowing the query to continue even if cache operations fail.


127-159: Careful handling of query time ranges for caching.

The getQueryMeterForCachedPeriod function appropriately:

  • Ensures "from" time is present
  • Sets default values for missing parameters
  • Avoids caching data that is less than 24 hours old
  • Truncates to complete days to avoid partial days in cache

This approach ensures predictable and efficient caching behavior.


161-187: Efficient implementation of cache lookup.

The lookupCachedMeterRows function properly constructs and executes the query to retrieve cached data, with appropriate error handling and resource cleanup with deferred row closure.


189-205: Clean implementation of cache insertion.

The insertRowsToCache function correctly handles the insertion of new rows into the cache with proper error handling.


207-220: Proper initialization of the cache table.

The createMeterQueryCacheTable function ensures the cache table exists in the database, with appropriate error handling. This initialization is crucial for the caching mechanism to work correctly.

openmeter/streaming/clickhouse/raw_events/cache_test.go (2)

22-39: Mock connector setup looks good.
This approach is straightforward and clearly outlines test configurations. No immediate issues found here.


41-155: Expand test coverage for additional aggregations if relevant.
These tests cover sum, count, min, max, and uniqueCount. If future aggregator types (e.g., average) become relevant, consider adding corresponding test cases for completeness.

openmeter/streaming/clickhouse/raw_events/connector.go (3)

41-45: New caching fields are well-structured.
Introducing these config flags for query caching is consistent and enhances extensibility of the connector’s configuration.


169-173: Graceful handling of cache errors.
Returning partially cached rows upon error is a sensible fallback. You might log the partial usage explicitly for better observability.


518-519: Sensitive query data may leak in logs.
Logging the full SQL statement and parameter values can reveal sensitive information. Consider redacting or minimizing these values in production.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 10

📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4411cb6 and f898670.

📒 Files selected for processing (6)
  • openmeter/streaming/clickhouse/raw_events/cache.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/cache_test.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/cachequery.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/cachequery_test.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/connector.go (8 hunks)
  • openmeter/streaming/clickhouse/raw_events/connector_test.go (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: E2E
  • GitHub Check: CI
  • GitHub Check: Analyze (go)
🔇 Additional comments (32)
openmeter/streaming/clickhouse/raw_events/connector_test.go (1)

186-212: Validates scenario with disabled cache; looks good!
This test clearly confirms that no invalidation occurs when cache is disabled. No issues found.

openmeter/streaming/clickhouse/raw_events/cachequery_test.go (3)

15-38: Table creation SQL looks correct and complete
The generated SQL includes the necessary columns and engine specs, matching the caching needs. No concerns here.


40-84: Multiple-row insertion test recommended
You are currently inserting only one MeterQueryRow. Testing bulk inserts for multiple rows ensures that the SQL generation and parameter binding remain correct under more complex scenarios.


85-179: SQL generation for retrieving meter query rows is versatile
The test covers “from/to” permutations thoroughly. Consider adding a sub-test that explicitly asserts behavior when no rows exist in the cache (empty result scenario).

openmeter/streaming/clickhouse/raw_events/connector.go (9)

26-26: Typographical consistency in doc comment
The doc string has mismatched quotes around the interface name. This was previously flagged and remains unaddressed.


41-45: Clear caching configuration fields
Introducing QueryCacheEnabled, QueryCacheMinimumCacheableQueryPeriod, and QueryCacheMinimumCacheableUsageAge clarifies intended caching behavior. This is well structured.


69-78: Revisit edge cases with cache durations
Although both durations are required to be > 0, also consider how extremely large or near-zero values would behave, to avoid unintended caching decisions.


96-98: Create cache table with 'IF NOT EXISTS'
For repeated connector initializations, ensure the table creation query includes IF NOT EXISTS or handles existence checks gracefully.


131-157: Parameter struct extraction is neat
Building a separate queryMeter struct promotes clarity. Verify that all call sites now pass the correct parameters.


159-173: Graceful fallback on cache errors
Returning partial cached rows is valid as a fallback. Logging partial usage more explicitly might aid debugging.


175-196: Fresh query and merge logic
Merging cached rows with fresh query results is a key aspect of the caching flow. This block is straightforward and appears robust.


484-505: Progress tracking is well integrated
The code provides progress tracking while still returning results. Handling partial progress updates if an error occurs is a fair fallback.


507-539: Potential sensitive data exposure in logs
Logging SQL and arguments can risk leaking PII or other sensitive info. Consider redacting these fields in production.

openmeter/streaming/clickhouse/raw_events/cache.go (8)

15-42: Consider extending aggregator coverage.

This function only supports Sum, Count, Min, and Max. If future aggregations like Average or UniqueCount are needed, consider adding them here.


44-57: Looks good!

The closure introduced by remainingQueryMeterFactory seems clean and straightforward, preserving relevant context while transitioning to the fresh rows.


59-125: Partial cache insert error handling.

The code logs failures to insert new rows without failing the query flow, which is often acceptable. However, if cache consistency is critical, consider implementing a fallback or retry mechanism.


128-159: Document or log truncated query range.

Truncating the To timestamp to avoid caching recent data and partial days may surprise users. Adding a comment or log message clarifying this behavior helps them understand why the query range is adjusted.


161-187: Looks correct and consistent.

Retrieving cache rows via lookupCachedMeterRows is straightforward, and errors are properly handled.


207-220: Consider configurable TTL.

The cache table creation is hard-coded to 30 days in createMeterQueryRowsCacheTable. Exposing this TTL as a configuration parameter could be valuable for different environments.


222-241: Nice reuse of built-in structures!

The function findNamespacesToInvalidateCache effectively de-duplicates namespaces and applies the time threshold correctly.


243-262: Straightforward deletion logic.

invalidateCache cleanly handles the scenario where caching is disabled and performs direct deletions otherwise.

openmeter/streaming/clickhouse/raw_events/cachequery.go (3)

14-46: Consider making the 30-day TTL configurable.

In createMeterQueryRowsCacheTable.toSQL(), the TTL is currently set to 30 days. Exposing it as a configurable setting allows more flexibility.


120-157: Clean row scanning logic.

scanRows() method is clear and robust, properly handling optional strings in groupBy and subject.


159-176: Query deletion is well-structured.

Deleting cache rows by namespace with the deleteCacheForNamespaces struct is straightforward and scalable.

openmeter/streaming/clickhouse/raw_events/cache_test.go (8)

43-157: Comprehensive coverage of aggregator constraints.

TestIsQueryCachable provides a thorough table-driven approach for each aggregator scenario and time boundary.


159-282: Add empty cache scenario test.

You're already covering cached + fresh rows. For completeness, consider adding a case with zero cached rows to confirm correct fallback logic.


284-344: Good coverage for lookupCachedMeterRows.

The test properly validates scanning a single row. You might extend coverage to multiple rows if relevant.


346-378: Bulk insertion looks fine.

TestConnector_InsertRowsToCache checks insertion flow. Consider a test with multiple rows for batch coverage.


380-409: Implementation is concise.

TestRemainingQueryMeterFactory confirms that fromExclusive is set properly. Looks straightforward.


411-513: Expand edge-case coverage.

TestGetQueryMeterForCachedPeriod strongly validates truncated days and usage thresholds. Consider verifying from > to or crossing day boundaries.


515-541: Correct and minimal.

TestInvalidateCache effectively tests single, multiple, and error scenarios for cache invalidation.


543-649: Thorough tests for namespace invalidation.

TestFindNamespacesToInvalidateCache covers multiple permutations of stale vs. fresh events, as well as deduplication.

Labels
area/processor release-note/feature Release note: Exciting New Features