Skip to content

Commit 7b17e96

Browse files
committed
feat(connector): add raw events
1 parent 16e4975 commit 7b17e96

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+2622
-1351
lines changed

app/common/openmeter.go

Lines changed: 48 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ import (
2121
"github.com/openmeterio/openmeter/openmeter/namespace"
2222
"github.com/openmeterio/openmeter/openmeter/sink/flushhandler"
2323
"github.com/openmeterio/openmeter/openmeter/sink/flushhandler/ingestnotification"
24-
"github.com/openmeterio/openmeter/openmeter/streaming/clickhouse_connector"
24+
"github.com/openmeterio/openmeter/openmeter/streaming"
25+
"github.com/openmeterio/openmeter/openmeter/streaming/clickhouse/materialized_view"
26+
"github.com/openmeterio/openmeter/openmeter/streaming/clickhouse/raw_events"
2527
watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka"
2628
"github.com/openmeterio/openmeter/openmeter/watermill/driver/noop"
2729
"github.com/openmeterio/openmeter/openmeter/watermill/eventbus"
@@ -34,25 +36,56 @@ func NewMeterRepository(meters []*models.Meter) *meter.InMemoryRepository {
3436
return meter.NewInMemoryRepository(slicesx.Map(meters, lo.FromPtr[models.Meter]))
3537
}
3638

37-
func NewClickHouseStreamingConnector(
39+
func NewStreamingConnector(
40+
ctx context.Context,
3841
conf config.AggregationConfiguration,
3942
clickHouse clickhouse.Conn,
4043
meterRepository meter.Repository,
4144
logger *slog.Logger,
42-
) (*clickhouse_connector.ClickhouseConnector, error) {
43-
streamingConnector, err := clickhouse_connector.NewClickhouseConnector(clickhouse_connector.ClickhouseConnectorConfig{
44-
ClickHouse: clickHouse,
45-
Database: conf.ClickHouse.Database,
46-
Meters: meterRepository,
47-
CreateOrReplaceMeter: conf.CreateOrReplaceMeter,
48-
PopulateMeter: conf.PopulateMeter,
49-
Logger: logger,
50-
})
51-
if err != nil {
52-
return nil, fmt.Errorf("init clickhouse streaming: %w", err)
45+
) (streaming.Connector, error) {
46+
var (
47+
connector streaming.Connector
48+
err error
49+
)
50+
51+
switch conf.Engine {
52+
case config.AggregationEngineClickHouseRaw:
53+
connector, err = raw_events.NewConnector(ctx, raw_events.ConnectorConfig{
54+
ClickHouse: clickHouse,
55+
Database: conf.ClickHouse.Database,
56+
EventsTableName: conf.EventsTableName,
57+
Logger: logger,
58+
AsyncInsert: conf.AsyncInsert,
59+
AsyncInsertWait: conf.AsyncInsertWait,
60+
InsertQuerySettings: conf.InsertQuerySettings,
61+
})
62+
if err != nil {
63+
return nil, fmt.Errorf("init clickhouse raw engine: %w", err)
64+
}
65+
66+
case config.AggregationEngineClickHouseMV:
67+
connector, err = materialized_view.NewConnector(ctx, materialized_view.ConnectorConfig{
68+
ClickHouse: clickHouse,
69+
Database: conf.ClickHouse.Database,
70+
EventsTableName: conf.EventsTableName,
71+
Logger: logger,
72+
AsyncInsert: conf.AsyncInsert,
73+
AsyncInsertWait: conf.AsyncInsertWait,
74+
InsertQuerySettings: conf.InsertQuerySettings,
75+
76+
Meters: meterRepository,
77+
PopulateMeter: conf.PopulateMeter,
78+
CreateOrReplaceMeter: conf.CreateOrReplaceMeter,
79+
QueryRawEvents: conf.QueryRawEvents,
80+
})
81+
if err != nil {
82+
return nil, fmt.Errorf("init clickhouse mv engine: %w", err)
83+
}
84+
default:
85+
return nil, fmt.Errorf("invalid aggregation engine: %s", conf.Engine)
5386
}
5487

55-
return streamingConnector, nil
88+
return connector, nil
5689
}
5790

5891
func NewNamespacedTopicResolver(config config.Configuration) (*topicresolver.NamespacedTopicResolver, error) {
@@ -135,7 +168,7 @@ func NewKafkaNamespaceHandler(
135168

136169
func NewNamespaceHandlers(
137170
kafkaHandler *kafkaingest.NamespaceHandler,
138-
clickHouseHandler *clickhouse_connector.ClickhouseConnector,
171+
clickHouseHandler streaming.Connector,
139172
) []namespace.Handler {
140173
return []namespace.Handler{
141174
kafkaHandler,

app/common/wire.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ import (
1717
"github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest/topicresolver"
1818
"github.com/openmeterio/openmeter/openmeter/meter"
1919
registrybuilder "github.com/openmeterio/openmeter/openmeter/registry/builder"
20-
"github.com/openmeterio/openmeter/openmeter/streaming"
21-
"github.com/openmeterio/openmeter/openmeter/streaming/clickhouse_connector"
2220
watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka"
2321
"github.com/openmeterio/openmeter/openmeter/watermill/router"
2422
)
@@ -106,8 +104,7 @@ var OpenMeter = wire.NewSet(
106104
NewMeterRepository,
107105
wire.Bind(new(meter.Repository), new(*meter.InMemoryRepository)),
108106

109-
NewClickHouseStreamingConnector,
110-
wire.Bind(new(streaming.Connector), new(*clickhouse_connector.ClickhouseConnector)),
107+
NewStreamingConnector,
111108

112109
NewNamespacedTopicResolver,
113110
wire.Bind(new(topicresolver.Resolver), new(*topicresolver.NamespacedTopicResolver)),

app/config/aggregation.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,64 @@ import (
44
"crypto/tls"
55
"errors"
66
"fmt"
7+
"slices"
78
"time"
89

910
"github.com/ClickHouse/clickhouse-go/v2"
1011
"github.com/spf13/viper"
1112
)
1213

14+
type AggregationEngine string
15+
16+
const (
17+
// Raw engine queries the raw events table
18+
AggregationEngineClickHouseRaw AggregationEngine = "clickhouse_raw"
19+
// MV engine maintains and queries materialized views
20+
AggregationEngineClickHouseMV AggregationEngine = "clickhouse_mv"
21+
)
22+
23+
func (e AggregationEngine) Values() []AggregationEngine {
24+
return []AggregationEngine{AggregationEngineClickHouseRaw, AggregationEngineClickHouseMV}
25+
}
26+
27+
func (e AggregationEngine) Validate() error {
28+
if !slices.Contains(e.Values(), e) {
29+
return fmt.Errorf("invalid value")
30+
}
31+
return nil
32+
}
33+
1334
type AggregationConfiguration struct {
35+
// Engine is the aggregation engine to use
36+
Engine AggregationEngine
1437
ClickHouse ClickHouseAggregationConfiguration
38+
39+
EventsTableName string
40+
41+
// Set true for ClickHouse to first store the incoming inserts into an in-memory buffer
42+
// before flushing them regularly to disk.
43+
// See https://clickhouse.com/docs/en/cloud/bestpractices/asynchronous-inserts
44+
AsyncInsert bool
45+
// Set true if you want an insert statement to return with an acknowledgment immediately
46+
// without waiting for the data to be inserted into the buffer.
47+
// Setting true can cause silent errors that you need to monitor separately.
48+
AsyncInsertWait bool
49+
50+
// See https://clickhouse.com/docs/en/operations/settings/settings
51+
// For example, you can set the `max_insert_threads` setting to control the number of threads
52+
// or the `parallel_view_processing` setting to enable pushing to attached views concurrently.
53+
InsertQuerySettings map[string]string
54+
55+
// Engine specific options
56+
1557
// PopulateMeter creates the materialized view with data from the events table
1658
// This is not safe to use in production as it requires stopping ingestion
1759
PopulateMeter bool
1860
// CreateOrReplaceMeter is used to force the recreation of the materialized view
1961
// This is not safe to use in production as it will drop the existing views
2062
CreateOrReplaceMeter bool
63+
// QueryRawEvents is used to query the raw events table instead of the materialized view
64+
QueryRawEvents bool
2165
}
2266

2367
// Validate validates the configuration.
@@ -26,6 +70,37 @@ func (c AggregationConfiguration) Validate() error {
2670
return fmt.Errorf("clickhouse: %w", err)
2771
}
2872

73+
if c.Engine == "" {
74+
return errors.New("engine is required")
75+
}
76+
77+
if err := c.Engine.Validate(); err != nil {
78+
return fmt.Errorf("engine: %w", err)
79+
}
80+
81+
if c.EventsTableName == "" {
82+
return errors.New("events table is required")
83+
}
84+
85+
if c.AsyncInsertWait && !c.AsyncInsert {
86+
return errors.New("async insert wait is set but async insert is not")
87+
}
88+
89+
// Validate engine specific options
90+
if c.Engine != AggregationEngineClickHouseMV {
91+
if c.PopulateMeter {
92+
return errors.New("populate meter is only supported with materialized view engine")
93+
}
94+
95+
if c.CreateOrReplaceMeter {
96+
return errors.New("create or replace meter is only with materialized view engine")
97+
}
98+
99+
if c.QueryRawEvents {
100+
return errors.New("query raw events is only with materialized view engine")
101+
}
102+
}
103+
29104
return nil
30105
}
31106

@@ -100,6 +175,11 @@ func (c ClickHouseAggregationConfiguration) GetClientOptions() *clickhouse.Optio
100175

101176
// ConfigureAggregation configures some defaults in the Viper instance.
102177
func ConfigureAggregation(v *viper.Viper) {
178+
v.SetDefault("aggregation.engine", AggregationEngineClickHouseMV)
179+
v.SetDefault("aggregation.eventsTableName", "om_events")
180+
v.SetDefault("aggregation.asyncInsert", false)
181+
v.SetDefault("aggregation.asyncInsertWait", false)
182+
103183
v.SetDefault("aggregation.clickhouse.address", "127.0.0.1:9000")
104184
v.SetDefault("aggregation.clickhouse.tls", false)
105185
v.SetDefault("aggregation.clickhouse.database", "openmeter")

app/config/config_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,10 @@ func TestComplete(t *testing.T) {
124124
ConnMaxLifetime: 10 * time.Minute,
125125
BlockBufferSize: 10,
126126
},
127+
Engine: AggregationEngineClickHouseMV,
128+
EventsTableName: "om_events",
129+
AsyncInsert: false,
130+
AsyncInsertWait: false,
127131
},
128132
Sink: SinkConfiguration{
129133
GroupId: "openmeter-sink-worker",

app/config/sink.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type SinkConfiguration struct {
2222
IngestNotifications IngestNotificationsConfiguration
2323
// Kafka client/Consumer configuration
2424
Kafka KafkaConfig
25+
// TODO: remove, config moved to aggregation config
2526
// Storage configuration
2627
Storage StorageConfiguration
2728

@@ -102,7 +103,7 @@ type StorageConfiguration struct {
102103
// before flushing them regularly to disk.
103104
// See https://clickhouse.com/docs/en/cloud/bestpractices/asynchronous-inserts
104105
AsyncInsert bool
105-
// Set true if you want an insert statement to return with an acknowledgment immediatelyy
106+
// Set true if you want an insert statement to return with an acknowledgment immediately
106107
// without waiting for the data to be inserted into the buffer.
107108
// Setting true can cause silent errors that you need to monitor separately.
108109
AsyncInsertWait bool
@@ -154,6 +155,7 @@ func ConfigureSink(v *viper.Viper) {
154155
v.SetDefault("sink.namespaceRefetchTimeout", "10s")
155156
v.SetDefault("sink.namespaceTopicRegexp", "^om_([A-Za-z0-9]+(?:_[A-Za-z0-9]+)*)_events$")
156157

158+
// TODO: remove, config moved to aggregation config
157159
// Sink Storage
158160
v.SetDefault("sink.storage.asyncInsert", false)
159161
v.SetDefault("sink.storage.asyncInsertWait", false)

cmd/balance-worker/wire_gen.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/jobs/entitlement/init.go

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ import (
88
"github.com/ClickHouse/clickhouse-go/v2"
99
"go.opentelemetry.io/otel/metric"
1010

11+
"github.com/openmeterio/openmeter/app/common"
1112
"github.com/openmeterio/openmeter/app/config"
1213
"github.com/openmeterio/openmeter/openmeter/meter"
1314
"github.com/openmeterio/openmeter/openmeter/registry"
1415
registrybuilder "github.com/openmeterio/openmeter/openmeter/registry/builder"
15-
"github.com/openmeterio/openmeter/openmeter/streaming/clickhouse_connector"
1616
watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka"
1717
"github.com/openmeterio/openmeter/openmeter/watermill/eventbus"
1818
entdriver "github.com/openmeterio/openmeter/pkg/framework/entutils/entdriver"
@@ -50,14 +50,7 @@ func initEntitlements(ctx context.Context, conf config.Configuration, logger *sl
5050
return nil, fmt.Errorf("failed to initialize clickhouse client: %w", err)
5151
}
5252

53-
streamingConnector, err := clickhouse_connector.NewClickhouseConnector(clickhouse_connector.ClickhouseConnectorConfig{
54-
Logger: logger,
55-
ClickHouse: clickHouseClient,
56-
Database: conf.Aggregation.ClickHouse.Database,
57-
Meters: meterRepository,
58-
CreateOrReplaceMeter: conf.Aggregation.CreateOrReplaceMeter,
59-
PopulateMeter: conf.Aggregation.PopulateMeter,
60-
})
53+
streamingConnector, err := common.NewStreamingConnector(ctx, conf.Aggregation, clickHouseClient, meterRepository, logger)
6154
if err != nil {
6255
return nil, fmt.Errorf("init clickhouse streaming: %w", err)
6356
}

cmd/notification-service/wire_gen.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/server/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ func main() {
419419
})
420420

421421
for _, meter := range conf.Meters {
422-
err := app.StreamingConnector.CreateMeter(ctx, app.NamespaceManager.GetDefaultNamespace(), meter)
422+
err := app.StreamingConnector.CreateMeter(ctx, app.NamespaceManager.GetDefaultNamespace(), *meter)
423423
if err != nil {
424424
slog.Warn("failed to initialize meter", "error", err)
425425
os.Exit(1)

cmd/server/wire_gen.go

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)