Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(streaming): raw event connector #1720

Merged
merged 3 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 48 additions & 15 deletions app/common/openmeter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"github.com/openmeterio/openmeter/openmeter/namespace"
"github.com/openmeterio/openmeter/openmeter/sink/flushhandler"
"github.com/openmeterio/openmeter/openmeter/sink/flushhandler/ingestnotification"
"github.com/openmeterio/openmeter/openmeter/streaming/clickhouse_connector"
"github.com/openmeterio/openmeter/openmeter/streaming"
"github.com/openmeterio/openmeter/openmeter/streaming/clickhouse/materialized_view"
"github.com/openmeterio/openmeter/openmeter/streaming/clickhouse/raw_events"
watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka"
"github.com/openmeterio/openmeter/openmeter/watermill/driver/noop"
"github.com/openmeterio/openmeter/openmeter/watermill/eventbus"
Expand All @@ -34,25 +36,56 @@ func NewMeterRepository(meters []*models.Meter) *meter.InMemoryRepository {
return meter.NewInMemoryRepository(slicesx.Map(meters, lo.FromPtr[models.Meter]))
}

func NewClickHouseStreamingConnector(
func NewStreamingConnector(
ctx context.Context,
conf config.AggregationConfiguration,
clickHouse clickhouse.Conn,
meterRepository meter.Repository,
logger *slog.Logger,
) (*clickhouse_connector.ClickhouseConnector, error) {
streamingConnector, err := clickhouse_connector.NewClickhouseConnector(clickhouse_connector.ClickhouseConnectorConfig{
ClickHouse: clickHouse,
Database: conf.ClickHouse.Database,
Meters: meterRepository,
CreateOrReplaceMeter: conf.CreateOrReplaceMeter,
PopulateMeter: conf.PopulateMeter,
Logger: logger,
})
if err != nil {
return nil, fmt.Errorf("init clickhouse streaming: %w", err)
) (streaming.Connector, error) {
var (
connector streaming.Connector
err error
)

switch conf.Engine {
case config.AggregationEngineClickHouseRaw:
connector, err = raw_events.NewConnector(ctx, raw_events.ConnectorConfig{
ClickHouse: clickHouse,
Database: conf.ClickHouse.Database,
EventsTableName: conf.EventsTableName,
Logger: logger,
AsyncInsert: conf.AsyncInsert,
AsyncInsertWait: conf.AsyncInsertWait,
InsertQuerySettings: conf.InsertQuerySettings,
})
if err != nil {
return nil, fmt.Errorf("init clickhouse raw engine: %w", err)
}

case config.AggregationEngineClickHouseMV:
connector, err = materialized_view.NewConnector(ctx, materialized_view.ConnectorConfig{
ClickHouse: clickHouse,
Database: conf.ClickHouse.Database,
EventsTableName: conf.EventsTableName,
Logger: logger,
AsyncInsert: conf.AsyncInsert,
AsyncInsertWait: conf.AsyncInsertWait,
InsertQuerySettings: conf.InsertQuerySettings,

Meters: meterRepository,
PopulateMeter: conf.PopulateMeter,
CreateOrReplaceMeter: conf.CreateOrReplaceMeter,
QueryRawEvents: conf.QueryRawEvents,
})
if err != nil {
return nil, fmt.Errorf("init clickhouse mv engine: %w", err)
}
default:
return nil, fmt.Errorf("invalid aggregation engine: %s", conf.Engine)
}

return streamingConnector, nil
return connector, nil
}

func NewNamespacedTopicResolver(config config.Configuration) (*topicresolver.NamespacedTopicResolver, error) {
Expand Down Expand Up @@ -135,7 +168,7 @@ func NewKafkaNamespaceHandler(

func NewNamespaceHandlers(
kafkaHandler *kafkaingest.NamespaceHandler,
clickHouseHandler *clickhouse_connector.ClickhouseConnector,
clickHouseHandler streaming.Connector,
) []namespace.Handler {
return []namespace.Handler{
kafkaHandler,
Expand Down
5 changes: 1 addition & 4 deletions app/common/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import (
"github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest/topicresolver"
"github.com/openmeterio/openmeter/openmeter/meter"
registrybuilder "github.com/openmeterio/openmeter/openmeter/registry/builder"
"github.com/openmeterio/openmeter/openmeter/streaming"
"github.com/openmeterio/openmeter/openmeter/streaming/clickhouse_connector"
watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka"
"github.com/openmeterio/openmeter/openmeter/watermill/router"
)
Expand Down Expand Up @@ -106,8 +104,7 @@ var OpenMeter = wire.NewSet(
NewMeterRepository,
wire.Bind(new(meter.Repository), new(*meter.InMemoryRepository)),

NewClickHouseStreamingConnector,
wire.Bind(new(streaming.Connector), new(*clickhouse_connector.ClickhouseConnector)),
NewStreamingConnector,

NewNamespacedTopicResolver,
wire.Bind(new(topicresolver.Resolver), new(*topicresolver.NamespacedTopicResolver)),
Expand Down
80 changes: 80 additions & 0 deletions app/config/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,64 @@ import (
"crypto/tls"
"errors"
"fmt"
"slices"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/spf13/viper"
)

type AggregationEngine string

const (
// Raw engine queries the raw events table
AggregationEngineClickHouseRaw AggregationEngine = "clickhouse_raw"
// MV engine maintains and queries materialized views
AggregationEngineClickHouseMV AggregationEngine = "clickhouse_mv"
)

func (e AggregationEngine) Values() []AggregationEngine {
return []AggregationEngine{AggregationEngineClickHouseRaw, AggregationEngineClickHouseMV}
}

func (e AggregationEngine) Validate() error {
if !slices.Contains(e.Values(), e) {
return fmt.Errorf("invalid value")
}
return nil
}

type AggregationConfiguration struct {
// Engine is the aggregation engine to use
Engine AggregationEngine
ClickHouse ClickHouseAggregationConfiguration
hekike marked this conversation as resolved.
Show resolved Hide resolved

EventsTableName string

// Set true for ClickHouse first store the incoming inserts into an in-memory buffer
// before flushing them regularly to disk.
// See https://clickhouse.com/docs/en/cloud/bestpractices/asynchronous-inserts
AsyncInsert bool
// Set true if you want an insert statement to return with an acknowledgment immediately
// without waiting for the data got inserted into the buffer.
// Setting true can cause silent errors that you need to monitor separately.
AsyncInsertWait bool

// See https://clickhouse.com/docs/en/operations/settings/settings
// For example, you can set the `max_insert_threads` setting to control the number of threads
// or the `parallel_view_processing` setting to enable pushing to attached views concurrently.
InsertQuerySettings map[string]string

// Engine specific options

// Populate creates the materialized view with data from the events table
// This is not safe to use in production as requires to stop ingestion
PopulateMeter bool
// CreateOrReplace is used to force the recreation of the materialized view
// This is not safe to use in production as it will drop the existing views
CreateOrReplaceMeter bool
// QueryRawEvents is used to query the raw events table instead of the materialized view
QueryRawEvents bool
}

// Validate validates the configuration.
Expand All @@ -26,6 +70,37 @@ func (c AggregationConfiguration) Validate() error {
return fmt.Errorf("clickhouse: %w", err)
}

if c.Engine == "" {
return errors.New("engine is required")
}

if err := c.Engine.Validate(); err != nil {
return fmt.Errorf("engine: %w", err)
}

if c.EventsTableName == "" {
return errors.New("events table is required")
}

if c.AsyncInsertWait && !c.AsyncInsert {
return errors.New("async insert wait is set but async insert is not")
}

// Validate engine specific options
if c.Engine != AggregationEngineClickHouseMV {
if c.PopulateMeter {
return errors.New("populate meter is only supported with materialized view engine")
}

if c.CreateOrReplaceMeter {
return errors.New("create or replace meter is only with materialized view engine")
}

if c.QueryRawEvents {
return errors.New("query raw events is only with materialized view engine")
}
}

return nil
}

Expand Down Expand Up @@ -100,6 +175,11 @@ func (c ClickHouseAggregationConfiguration) GetClientOptions() *clickhouse.Optio

// ConfigureAggregation configures some defaults in the Viper instance.
func ConfigureAggregation(v *viper.Viper) {
v.SetDefault("aggregation.engine", AggregationEngineClickHouseMV)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably we should change default to raw events

v.SetDefault("aggregation.eventsTableName", "om_events")
v.SetDefault("aggregation.asyncInsert", false)
v.SetDefault("aggregation.asyncInsertWait", false)

v.SetDefault("aggregation.clickhouse.address", "127.0.0.1:9000")
v.SetDefault("aggregation.clickhouse.tls", false)
v.SetDefault("aggregation.clickhouse.database", "openmeter")
Expand Down
4 changes: 4 additions & 0 deletions app/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ func TestComplete(t *testing.T) {
ConnMaxLifetime: 10 * time.Minute,
BlockBufferSize: 10,
},
Engine: AggregationEngineClickHouseMV,
EventsTableName: "om_events",
AsyncInsert: false,
AsyncInsertWait: false,
},
Sink: SinkConfiguration{
GroupId: "openmeter-sink-worker",
Expand Down
4 changes: 3 additions & 1 deletion app/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type SinkConfiguration struct {
IngestNotifications IngestNotificationsConfiguration
// Kafka client/Consumer configuration
Kafka KafkaConfig
// TODO: remove, config moved to aggregation config
// Storage configuration
Storage StorageConfiguration

Expand Down Expand Up @@ -102,7 +103,7 @@ type StorageConfiguration struct {
// before flushing them regularly to disk.
// See https://clickhouse.com/docs/en/cloud/bestpractices/asynchronous-inserts
AsyncInsert bool
// Set true if you want an insert statement to return with an acknowledgment immediatelyy
// Set true if you want an insert statement to return with an acknowledgment immediately
// without waiting for the data got inserted into the buffer.
// Setting true can cause silent errors that you need to monitor separately.
AsyncInsertWait bool
Expand Down Expand Up @@ -154,6 +155,7 @@ func ConfigureSink(v *viper.Viper) {
v.SetDefault("sink.namespaceRefetchTimeout", "10s")
v.SetDefault("sink.namespaceTopicRegexp", "^om_([A-Za-z0-9]+(?:_[A-Za-z0-9]+)*)_events$")

// TODO: remove, config moved to aggregation config
// Sink Storage
v.SetDefault("sink.storage.asyncInsert", false)
v.SetDefault("sink.storage.asyncInsertWait", false)
Expand Down
4 changes: 2 additions & 2 deletions cmd/balance-worker/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 2 additions & 9 deletions cmd/jobs/entitlement/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
"github.com/ClickHouse/clickhouse-go/v2"
"go.opentelemetry.io/otel/metric"

"github.com/openmeterio/openmeter/app/common"
"github.com/openmeterio/openmeter/app/config"
"github.com/openmeterio/openmeter/openmeter/meter"
"github.com/openmeterio/openmeter/openmeter/registry"
registrybuilder "github.com/openmeterio/openmeter/openmeter/registry/builder"
"github.com/openmeterio/openmeter/openmeter/streaming/clickhouse_connector"
watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka"
"github.com/openmeterio/openmeter/openmeter/watermill/eventbus"
entdriver "github.com/openmeterio/openmeter/pkg/framework/entutils/entdriver"
Expand Down Expand Up @@ -50,14 +50,7 @@ func initEntitlements(ctx context.Context, conf config.Configuration, logger *sl
return nil, fmt.Errorf("failed to initialize clickhouse client: %w", err)
}

streamingConnector, err := clickhouse_connector.NewClickhouseConnector(clickhouse_connector.ClickhouseConnectorConfig{
Logger: logger,
ClickHouse: clickHouseClient,
Database: conf.Aggregation.ClickHouse.Database,
Meters: meterRepository,
CreateOrReplaceMeter: conf.Aggregation.CreateOrReplaceMeter,
PopulateMeter: conf.Aggregation.PopulateMeter,
})
streamingConnector, err := common.NewStreamingConnector(ctx, conf.Aggregation, clickHouseClient, meterRepository, logger)
if err != nil {
return nil, fmt.Errorf("init clickhouse streaming: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/notification-service/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ func main() {
})

for _, meter := range conf.Meters {
err := app.StreamingConnector.CreateMeter(ctx, app.NamespaceManager.GetDefaultNamespace(), meter)
err := app.StreamingConnector.CreateMeter(ctx, app.NamespaceManager.GetDefaultNamespace(), *meter)
if err != nil {
slog.Warn("failed to initialize meter", "error", err)
os.Exit(1)
Expand Down
6 changes: 3 additions & 3 deletions cmd/server/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading