Skip to content

Commit

Permalink
feat(connector): add raw events
Browse files Browse the repository at this point in the history
  • Loading branch information
hekike committed Nov 15, 2024
1 parent 1f15210 commit df8f6eb
Show file tree
Hide file tree
Showing 42 changed files with 2,584 additions and 1,333 deletions.
63 changes: 48 additions & 15 deletions app/common/openmeter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"github.com/openmeterio/openmeter/openmeter/namespace"
"github.com/openmeterio/openmeter/openmeter/sink/flushhandler"
"github.com/openmeterio/openmeter/openmeter/sink/flushhandler/ingestnotification"
"github.com/openmeterio/openmeter/openmeter/streaming/clickhouse_connector"
"github.com/openmeterio/openmeter/openmeter/streaming"
"github.com/openmeterio/openmeter/openmeter/streaming/clickhouse/materialized_view"
"github.com/openmeterio/openmeter/openmeter/streaming/clickhouse/raw_events"
watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka"
"github.com/openmeterio/openmeter/openmeter/watermill/driver/noop"
"github.com/openmeterio/openmeter/openmeter/watermill/eventbus"
Expand All @@ -34,25 +36,56 @@ func NewMeterRepository(meters []*models.Meter) *meter.InMemoryRepository {
return meter.NewInMemoryRepository(slicesx.Map(meters, lo.FromPtr[models.Meter]))
}

func NewClickHouseStreamingConnector(
func NewStreamingConnector(
ctx context.Context,
conf config.AggregationConfiguration,
clickHouse clickhouse.Conn,
meterRepository meter.Repository,
logger *slog.Logger,
) (*clickhouse_connector.ClickhouseConnector, error) {
streamingConnector, err := clickhouse_connector.NewClickhouseConnector(clickhouse_connector.ClickhouseConnectorConfig{
ClickHouse: clickHouse,
Database: conf.ClickHouse.Database,
Meters: meterRepository,
CreateOrReplaceMeter: conf.CreateOrReplaceMeter,
PopulateMeter: conf.PopulateMeter,
Logger: logger,
})
if err != nil {
return nil, fmt.Errorf("init clickhouse streaming: %w", err)
) (streaming.Connector, error) {
var (
connector streaming.Connector
err error
)

switch conf.Engine {
case config.AggregationEngineClickHouseRaw:
connector, err = raw_events.NewConnector(ctx, raw_events.ConnectorConfig{
ClickHouse: clickHouse,
Database: conf.ClickHouse.Database,
EventsTableName: conf.EventsTableName,
Logger: logger,
AsyncInsert: conf.AsyncInsert,
AsyncInsertWait: conf.AsyncInsertWait,
InsertQuerySettings: conf.InsertQuerySettings,
})
if err != nil {
return nil, fmt.Errorf("init clickhouse raw engine: %w", err)
}

case config.AggregationEngineClickHouseMV:
connector, err = materialized_view.NewConnector(ctx, materialized_view.ConnectorConfig{
ClickHouse: clickHouse,
Database: conf.ClickHouse.Database,
EventsTableName: conf.EventsTableName,
Logger: logger,
AsyncInsert: conf.AsyncInsert,
AsyncInsertWait: conf.AsyncInsertWait,
InsertQuerySettings: conf.InsertQuerySettings,

Meters: meterRepository,
PopulateMeter: conf.PopulateMeter,
CreateOrReplaceMeter: conf.CreateOrReplaceMeter,
QueryRawEvents: conf.QueryRawEvents,
})
if err != nil {
return nil, fmt.Errorf("init clickhouse mv engine: %w", err)
}
default:
return nil, fmt.Errorf("invalid aggregation engine: %s", conf.Engine)
}

return streamingConnector, nil
return connector, nil
}

func NewNamespacedTopicResolver(config config.Configuration) (*topicresolver.NamespacedTopicResolver, error) {
Expand Down Expand Up @@ -135,7 +168,7 @@ func NewKafkaNamespaceHandler(

func NewNamespaceHandlers(
kafkaHandler *kafkaingest.NamespaceHandler,
clickHouseHandler *clickhouse_connector.ClickhouseConnector,
clickHouseHandler streaming.Connector,
) []namespace.Handler {
return []namespace.Handler{
kafkaHandler,
Expand Down
5 changes: 1 addition & 4 deletions app/common/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
"github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest/topicresolver"
"github.com/openmeterio/openmeter/openmeter/meter"
registrybuilder "github.com/openmeterio/openmeter/openmeter/registry/builder"
"github.com/openmeterio/openmeter/openmeter/streaming"
"github.com/openmeterio/openmeter/openmeter/streaming/clickhouse_connector"
watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka"
"github.com/openmeterio/openmeter/openmeter/watermill/router"
)
Expand Down Expand Up @@ -102,8 +100,7 @@ var OpenMeter = wire.NewSet(
NewMeterRepository,
wire.Bind(new(meter.Repository), new(*meter.InMemoryRepository)),

NewClickHouseStreamingConnector,
wire.Bind(new(streaming.Connector), new(*clickhouse_connector.ClickhouseConnector)),
NewStreamingConnector,

NewNamespacedTopicResolver,
wire.Bind(new(topicresolver.Resolver), new(*topicresolver.NamespacedTopicResolver)),
Expand Down
80 changes: 80 additions & 0 deletions app/config/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,64 @@ import (
"crypto/tls"
"errors"
"fmt"
"slices"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/spf13/viper"
)

type AggregationEngine string

const (
// Raw engine queries the raw events table
AggregationEngineClickHouseRaw AggregationEngine = "clickhouse_raw"
// MV engine maintains and queries materialized views
AggregationEngineClickHouseMV AggregationEngine = "clickhouse_mv"
)

func (e AggregationEngine) Values() []AggregationEngine {
return []AggregationEngine{AggregationEngineClickHouseRaw, AggregationEngineClickHouseMV}
}

func (e AggregationEngine) Validate() error {
if !slices.Contains(e.Values(), e) {
return fmt.Errorf("invalid value")
}
return nil
}

type AggregationConfiguration struct {
// Engine is the aggregation engine to use
Engine AggregationEngine
ClickHouse ClickHouseAggregationConfiguration

EventsTableName string

// Set true for ClickHouse first store the incoming inserts into an in-memory buffer
// before flushing them regularly to disk.
// See https://clickhouse.com/docs/en/cloud/bestpractices/asynchronous-inserts
AsyncInsert bool
// Set true if you want an insert statement to return with an acknowledgment immediately
// without waiting for the data got inserted into the buffer.
// Setting true can cause silent errors that you need to monitor separately.
AsyncInsertWait bool

// See https://clickhouse.com/docs/en/operations/settings/settings
// For example, you can set the `max_insert_threads` setting to control the number of threads
// or the `parallel_view_processing` setting to enable pushing to attached views concurrently.
InsertQuerySettings map[string]string

// Engine specific options

// Populate creates the materialized view with data from the events table
// This is not safe to use in production as requires to stop ingestion
PopulateMeter bool
// CreateOrReplace is used to force the recreation of the materialized view
// This is not safe to use in production as it will drop the existing views
CreateOrReplaceMeter bool
// QueryRawEvents is used to query the raw events table instead of the materialized view
QueryRawEvents bool
}

// Validate validates the configuration.
Expand All @@ -26,6 +70,37 @@ func (c AggregationConfiguration) Validate() error {
return fmt.Errorf("clickhouse: %w", err)
}

if c.Engine == "" {
return errors.New("engine is required")
}

if err := c.Engine.Validate(); err != nil {
return fmt.Errorf("engine: %w", err)
}

if c.EventsTableName == "" {
return errors.New("events table is required")
}

if c.AsyncInsertWait && !c.AsyncInsert {
return errors.New("async insert wait is set but async insert is not")
}

// Validate engine specific options
if c.Engine != AggregationEngineClickHouseMV {
if c.PopulateMeter {
return errors.New("populate meter is only supported with materialized view engine")
}

if c.CreateOrReplaceMeter {
return errors.New("create or replace meter is only with materialized view engine")
}

if c.QueryRawEvents {
return errors.New("query raw events is only with materialized view engine")
}
}

return nil
}

Expand Down Expand Up @@ -100,6 +175,11 @@ func (c ClickHouseAggregationConfiguration) GetClientOptions() *clickhouse.Optio

// ConfigureAggregation configures some defaults in the Viper instance.
func ConfigureAggregation(v *viper.Viper) {
v.SetDefault("aggregation.engine", AggregationEngineClickHouseMV)
v.SetDefault("aggregation.eventsTableName", "om_events")
v.SetDefault("aggregation.asyncInsert", false)
v.SetDefault("aggregation.asyncInsertWait", false)

v.SetDefault("aggregation.clickhouse.address", "127.0.0.1:9000")
v.SetDefault("aggregation.clickhouse.tls", false)
v.SetDefault("aggregation.clickhouse.database", "openmeter")
Expand Down
4 changes: 4 additions & 0 deletions app/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ func TestComplete(t *testing.T) {
ConnMaxLifetime: 10 * time.Minute,
BlockBufferSize: 10,
},
Engine: AggregationEngineClickHouseMV,
EventsTableName: "om_events",
AsyncInsert: false,
AsyncInsertWait: false,
},
Sink: SinkConfiguration{
GroupId: "openmeter-sink-worker",
Expand Down
4 changes: 3 additions & 1 deletion app/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type SinkConfiguration struct {
IngestNotifications IngestNotificationsConfiguration
// Kafka client/Consumer configuration
Kafka KafkaConfig
// TODO: remove, config moved to aggregation config
// Storage configuration
Storage StorageConfiguration

Expand Down Expand Up @@ -102,7 +103,7 @@ type StorageConfiguration struct {
// before flushing them regularly to disk.
// See https://clickhouse.com/docs/en/cloud/bestpractices/asynchronous-inserts
AsyncInsert bool
// Set true if you want an insert statement to return with an acknowledgment immediatelyy
// Set true if you want an insert statement to return with an acknowledgment immediately
// without waiting for the data got inserted into the buffer.
// Setting true can cause silent errors that you need to monitor separately.
AsyncInsertWait bool
Expand Down Expand Up @@ -154,6 +155,7 @@ func ConfigureSink(v *viper.Viper) {
v.SetDefault("sink.namespaceRefetchTimeout", "10s")
v.SetDefault("sink.namespaceTopicRegexp", "^om_([A-Za-z0-9]+(?:_[A-Za-z0-9]+)*)_events$")

// TODO: remove, config moved to aggregation config
// Sink Storage
v.SetDefault("sink.storage.asyncInsert", false)
v.SetDefault("sink.storage.asyncInsertWait", false)
Expand Down
4 changes: 2 additions & 2 deletions cmd/balance-worker/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 2 additions & 9 deletions cmd/jobs/entitlement/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
"github.com/ClickHouse/clickhouse-go/v2"
"go.opentelemetry.io/otel/metric"

"github.com/openmeterio/openmeter/app/common"
"github.com/openmeterio/openmeter/app/config"
"github.com/openmeterio/openmeter/openmeter/meter"
"github.com/openmeterio/openmeter/openmeter/registry"
registrybuilder "github.com/openmeterio/openmeter/openmeter/registry/builder"
"github.com/openmeterio/openmeter/openmeter/streaming/clickhouse_connector"
watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka"
"github.com/openmeterio/openmeter/openmeter/watermill/eventbus"
entdriver "github.com/openmeterio/openmeter/pkg/framework/entutils/entdriver"
Expand Down Expand Up @@ -50,14 +50,7 @@ func initEntitlements(ctx context.Context, conf config.Configuration, logger *sl
return nil, fmt.Errorf("failed to initialize clickhouse client: %w", err)
}

streamingConnector, err := clickhouse_connector.NewClickhouseConnector(clickhouse_connector.ClickhouseConnectorConfig{
Logger: logger,
ClickHouse: clickHouseClient,
Database: conf.Aggregation.ClickHouse.Database,
Meters: meterRepository,
CreateOrReplaceMeter: conf.Aggregation.CreateOrReplaceMeter,
PopulateMeter: conf.Aggregation.PopulateMeter,
})
streamingConnector, err := common.NewStreamingConnector(ctx, conf.Aggregation, clickHouseClient, meterRepository, logger)
if err != nil {
return nil, fmt.Errorf("init clickhouse streaming: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/notification-service/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func main() {
})

for _, meter := range conf.Meters {
err := app.StreamingConnector.CreateMeter(ctx, app.NamespaceManager.GetDefaultNamespace(), meter)
err := app.StreamingConnector.CreateMeter(ctx, app.NamespaceManager.GetDefaultNamespace(), *meter)
if err != nil {
slog.Warn("failed to initialize meter", "error", err)
os.Exit(1)
Expand Down
6 changes: 3 additions & 3 deletions cmd/server/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit df8f6eb

Please sign in to comment.