Commit
feat(config): unify streaming
hekike committed Oct 21, 2024
1 parent aca4e7b commit ae3da9b
Showing 4 changed files with 61 additions and 75 deletions.
44 changes: 20 additions & 24 deletions app/common/openmeter.go
@@ -52,46 +52,42 @@ func NewClickHouseStreamingConnector(
switch conf.Engine {
case config.AggregationEngineClickHouseRaw:
connector, err = clickhouse_connector_raw.NewClickhouseConnector(ctx, clickhouse_connector_raw.ClickhouseConnectorConfig{
- ClickHouse: clickHouse,
- Database: conf.ClickHouse.Database,
- Logger: logger,
-
- // TODO: add insert related config after moved from sink config
- // AsyncInsert: conf.Aggregation.AsyncInsert,
- // AsyncInsertWait: conf.Aggregation.AsyncInsertWait,
- // InsertQuerySettings: conf.Aggregation.QuerySettings,
+ ClickHouse: clickHouse,
+ Database: conf.ClickHouse.Database,
+ Logger: logger,
+ AsyncInsert: conf.AsyncInsert,
+ AsyncInsertWait: conf.AsyncInsertWait,
+ InsertQuerySettings: conf.InsertQuerySettings,
})
if err != nil {
return nil, fmt.Errorf("init clickhouse raw engine: %w", err)
}

case config.AggregationEngineClickHouseMV:
connector, err = clickhouse_connector.NewClickhouseConnector(ctx, clickhouse_connector.ClickhouseConnectorConfig{
- ClickHouse: clickHouse,
- Database: conf.ClickHouse.Database,
- Logger: logger,
+ ClickHouse: clickHouse,
+ Database: conf.ClickHouse.Database,
+ Logger: logger,
+ AsyncInsert: conf.AsyncInsert,
+ AsyncInsertWait: conf.AsyncInsertWait,
+ InsertQuerySettings: conf.InsertQuerySettings,

Meters: meterRepository,
PopulateMeter: conf.PopulateMeter,
CreateOrReplaceMeter: conf.CreateOrReplaceMeter,
QueryRawEvents: conf.QueryRawEvents,

- // TODO: add insert related config after moved from sink config
- // AsyncInsert: conf.Aggregation.AsyncInsert,
- // AsyncInsertWait: conf.Aggregation.AsyncInsertWait,
- // InsertQuerySettings: conf.Aggregation.QuerySettings,
})
if err != nil {
return nil, fmt.Errorf("init clickhouse mv engine: %w", err)
}
case config.AggregationEngineClickHouseParse:
connector, err = clickhouse_connector_parse.NewClickhouseConnector(ctx, clickhouse_connector_parse.ClickhouseConnectorConfig{
- ClickHouse: clickHouse,
- Database: conf.ClickHouse.Database,
- Logger: logger,
-
- // TODO: add insert related config after moved from sink config
- // AsyncInsert: conf.Aggregation.AsyncInsert,
- // AsyncInsertWait: conf.Aggregation.AsyncInsertWait,
- // InsertQuerySettings: conf.Aggregation.QuerySettings,
+ ClickHouse: clickHouse,
+ Database: conf.ClickHouse.Database,
+ Logger: logger,
+ AsyncInsert: conf.AsyncInsert,
+ AsyncInsertWait: conf.AsyncInsertWait,
+ InsertQuerySettings: conf.InsertQuerySettings,
})
if err != nil {
return nil, fmt.Errorf("init clickhouse parse engine: %w", err)
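
For context, a rough sketch (not part of this commit) of one way the new AsyncInsert, AsyncInsertWait, and InsertQuerySettings options can be applied to inserts issued through the clickhouse-go v2 client. The helper and table names below are hypothetical, and the exact mapping is an assumption rather than OpenMeter's actual connector code.

// Sketch only: assumed mapping of the insert-related aggregation options
// onto ClickHouse query settings.
package sketch

import (
	"context"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

// insertContext attaches the async-insert flags and any extra per-insert
// settings (for example max_insert_threads) to the context used for INSERTs.
func insertContext(ctx context.Context, asyncInsert, asyncInsertWait bool, querySettings map[string]string) context.Context {
	settings := clickhouse.Settings{}
	for k, v := range querySettings {
		settings[k] = v
	}
	if asyncInsert {
		settings["async_insert"] = 1
		// Assumed mapping: AsyncInsertWait controls whether the INSERT waits
		// for the buffered data to be flushed before acknowledging.
		if asyncInsertWait {
			settings["wait_for_async_insert"] = 1
		} else {
			settings["wait_for_async_insert"] = 0
		}
	}
	return clickhouse.Context(ctx, clickhouse.WithSettings(settings))
}

// insertEvent applies the settings to a single INSERT; the table and columns
// are placeholders, not OpenMeter's schema.
func insertEvent(ctx context.Context, conn driver.Conn, namespace, payload string) error {
	ctx = insertContext(ctx, true, true, map[string]string{"max_insert_threads": "4"})
	return conn.Exec(ctx, "INSERT INTO om_events (namespace, data) VALUES (?, ?)", namespace, payload)
}
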
23 changes: 23 additions & 0 deletions app/config/aggregation.go
@@ -37,6 +37,23 @@ type AggregationConfiguration struct {
// Engine is the aggregation engine to use
Engine AggregationEngine
ClickHouse ClickHouseAggregationConfiguration
+
+ // Set to true to have ClickHouse first store incoming inserts in an in-memory buffer
+ // before regularly flushing them to disk.
+ // See https://clickhouse.com/docs/en/cloud/bestpractices/asynchronous-inserts
+ AsyncInsert bool
+ // Set to true if you want an insert statement to return with an acknowledgment immediately,
+ // without waiting for the data to be inserted into the buffer.
+ // Setting this to true can cause silent errors that you need to monitor separately.
+ AsyncInsertWait bool
+
+ // See https://clickhouse.com/docs/en/operations/settings/settings
+ // For example, you can set the `max_insert_threads` setting to control the number of threads,
+ // or the `parallel_view_processing` setting to enable pushing to attached views concurrently.
+ InsertQuerySettings map[string]string
+
+ // Engine specific options
+
// Populate creates the materialized view with data from the events table
// This is not safe to use in production as it requires stopping ingestion
PopulateMeter bool
@@ -61,6 +78,10 @@ func (c AggregationConfiguration) Validate() error {
return fmt.Errorf("engine: %w", err)
}

+ if c.AsyncInsertWait && !c.AsyncInsert {
+ return errors.New("async insert wait is set but async insert is not")
+ }

// Validate engine specific options
if c.Engine != AggregationEngineClickHouseMV {
if c.PopulateMeter {
@@ -151,6 +172,8 @@ func (c ClickHouseAggregationConfiguration) GetClientOptions() *clickhouse.Options {
// ConfigureAggregation configures some defaults in the Viper instance.
func ConfigureAggregation(v *viper.Viper) {
v.SetDefault("aggregation.engine", AggregationEngineClickHouseMV)
v.SetDefault("aggregation.asyncInsert", false)
v.SetDefault("aggregation.asyncInsertWait", false)

v.SetDefault("aggregation.clickhouse.address", "127.0.0.1:9000")
v.SetDefault("aggregation.clickhouse.tls", false)
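
For reference, a minimal sketch of overriding these new defaults from a YAML configuration read through Viper. The camelCase key names and the aggregation.insertQuerySettings key are assumptions inferred from the SetDefault calls above, not a documented configuration reference.

package sketch

import (
	"fmt"
	"strings"

	"github.com/spf13/viper"
)

func ReadAggregationInsertSettings() {
	v := viper.New()
	v.SetConfigType("yaml")

	// Illustrative YAML only; key names beyond the two defaults above are assumed.
	cfg := `
aggregation:
  asyncInsert: true
  asyncInsertWait: true
  insertQuerySettings:
    max_insert_threads: "4"
    parallel_view_processing: "1"
`
	if err := v.ReadConfig(strings.NewReader(cfg)); err != nil {
		panic(err)
	}

	fmt.Println(v.GetBool("aggregation.asyncInsert"))                    // true
	fmt.Println(v.GetBool("aggregation.asyncInsertWait"))                // true
	fmt.Println(v.GetStringMapString("aggregation.insertQuerySettings")) // max_insert_threads, parallel_view_processing
}
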
2 changes: 2 additions & 0 deletions app/config/sink.go
@@ -22,6 +22,7 @@ type SinkConfiguration struct {
IngestNotifications IngestNotificationsConfiguration
// Kafka client/Consumer configuration
Kafka KafkaConfig
+ // TODO: remove, config moved to aggregation config
// Storage configuration
Storage StorageConfiguration
}
@@ -137,6 +138,7 @@ func ConfigureSink(v *viper.Viper) {
v.SetDefault("sink.drainTimeout", "10s")
v.SetDefault("sink.ingestNotifications.maxEventsInBatch", 500)

+ // TODO: remove, config moved to aggregation config
// Sink Storage
v.SetDefault("sink.storage.asyncInsert", false)
v.SetDefault("sink.storage.asyncInsertWait", false)
67 changes: 16 additions & 51 deletions cmd/sink-worker/main.go
@@ -17,16 +17,13 @@ import (
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"

"github.com/openmeterio/openmeter/app/common"
"github.com/openmeterio/openmeter/app/config"
"github.com/openmeterio/openmeter/openmeter/dedupe"
"github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest/topicresolver"
"github.com/openmeterio/openmeter/openmeter/meter"
"github.com/openmeterio/openmeter/openmeter/sink"
"github.com/openmeterio/openmeter/openmeter/sink/flushhandler"
"github.com/openmeterio/openmeter/openmeter/streaming"
"github.com/openmeterio/openmeter/openmeter/streaming/clickhouse_connector"
"github.com/openmeterio/openmeter/openmeter/streaming/clickhouse_connector_parse"
"github.com/openmeterio/openmeter/openmeter/streaming/clickhouse_connector_raw"
pkgkafka "github.com/openmeterio/openmeter/pkg/kafka"
)

@@ -143,54 +140,22 @@ func initSink(ctx context.Context, conf config.Configuration, logger *slog.Logger
return nil, fmt.Errorf("init clickhouse client: %w", err)
}

- // Initialize streaming connector
- var (
- streaming streaming.Connector
- )
-
- switch conf.Aggregation.Engine {
- case config.AggregationEngineClickHouseRaw:
- streaming, err = clickhouse_connector_raw.NewClickhouseConnector(ctx, clickhouse_connector_raw.ClickhouseConnectorConfig{
- ClickHouse: clickhouseClient,
- Database: conf.Aggregation.ClickHouse.Database,
- Logger: logger,
- AsyncInsert: conf.Sink.Storage.AsyncInsert,
- AsyncInsertWait: conf.Sink.Storage.AsyncInsertWait,
- InsertQuerySettings: conf.Sink.Storage.QuerySettings,
- })
- if err != nil {
- return nil, fmt.Errorf("init clickhouse raw engine: %w", err)
- }
+ // Temporary: copy over sink storage settings
+ // TODO: remove after config migration is over
+ if conf.Sink.Storage.AsyncInsert {
+ conf.Aggregation.AsyncInsert = conf.Sink.Storage.AsyncInsert
+ }
+ if conf.Sink.Storage.AsyncInsertWait {
+ conf.Aggregation.AsyncInsertWait = conf.Sink.Storage.AsyncInsertWait
+ }
+ if conf.Sink.Storage.QuerySettings != nil {
+ conf.Aggregation.InsertQuerySettings = conf.Sink.Storage.QuerySettings
+ }

- case config.AggregationEngineClickHouseMV:
- streaming, err = clickhouse_connector.NewClickhouseConnector(ctx, clickhouse_connector.ClickhouseConnectorConfig{
- ClickHouse: clickhouseClient,
- Database: conf.Aggregation.ClickHouse.Database,
- Logger: logger,
- PopulateMeter: conf.Aggregation.PopulateMeter,
- CreateOrReplaceMeter: conf.Aggregation.CreateOrReplaceMeter,
- QueryRawEvents: conf.Aggregation.QueryRawEvents,
- AsyncInsert: conf.Sink.Storage.AsyncInsert,
- AsyncInsertWait: conf.Sink.Storage.AsyncInsertWait,
- InsertQuerySettings: conf.Sink.Storage.QuerySettings,
- })
- if err != nil {
- return nil, fmt.Errorf("init clickhouse mv engine: %w", err)
- }
- case config.AggregationEngineClickHouseParse:
- streaming, err = clickhouse_connector_parse.NewClickhouseConnector(ctx, clickhouse_connector_parse.ClickhouseConnectorConfig{
- ClickHouse: clickhouseClient,
- Database: conf.Aggregation.ClickHouse.Database,
- Logger: logger,
- AsyncInsert: conf.Sink.Storage.AsyncInsert,
- AsyncInsertWait: conf.Sink.Storage.AsyncInsertWait,
- InsertQuerySettings: conf.Sink.Storage.QuerySettings,
- })
- if err != nil {
- return nil, fmt.Errorf("init clickhouse parse engine: %w", err)
- }
- default:
- return nil, fmt.Errorf("invalid aggregation engine: %s", conf.Aggregation.Engine)
+ // Initialize streaming connector
+ streaming, err := common.NewClickHouseStreamingConnector(ctx, conf.Aggregation, clickhouseClient, meterRepository, logger)
+ if err != nil {
+ return nil, fmt.Errorf("init clickhouse streaming connector: %w", err)
+ }

// Initialize deduplicator if enabled
