Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat : Execute dashboard query as part of health check #5676

Merged
merged 18 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions admin/worker/deployments_health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,16 +179,39 @@ func (w *Worker) deploymentHealthCheck(ctx context.Context, d *database.Deployme
for k, v := range annotations.ToMap() {
f = append(f, zap.String(k, v))
}
if health.OlapError != "" {
f = append(f, zap.String("olap_error", health.OlapError))

// log metrics view errors separately
for d, err := range health.MetricsViewErrors {
w.logger.Warn("deployment health check: metrics view error", zap.String("metrics_view", d), zap.String("error", err))
}

logAtError := false
if health.ControllerError != "" {
logAtError = true
f = append(f, zap.String("controller_error", health.ControllerError))
}
if health.OlapError != "" {
logAtError = true
f = append(f, zap.String("olap_error", health.OlapError))
}
if health.RepoError != "" {
logAtError = true
f = append(f, zap.String("repo_error", health.RepoError))
}
w.logger.Error("deployment health check: runtime instance is unhealthy", f...)
if len(health.MetricsViewErrors) > 0 {
f = append(f, zap.Int("metrics_view_errors", len(health.MetricsViewErrors)))
}
if health.ParseErrorCount > 0 {
f = append(f, zap.Int32("parse_errors", health.ParseErrorCount))
}
if health.ReconcileErrorCount > 0 {
f = append(f, zap.Int32("reconcile_errors", health.ReconcileErrorCount))
}
if logAtError {
w.logger.Error("deployment health check: instance is unhealthy", f...)
} else {
w.logger.Warn("deployment health check: instance is unhealthy", f...)
}
}
return instances, true
}
Expand All @@ -211,7 +234,7 @@ func runtimeUnhealthy(r *runtimev1.HealthResponse) bool {
}

func instanceUnhealthy(i *runtimev1.InstanceHealth) bool {
return i.OlapError != "" || i.ControllerError != "" || i.RepoError != ""
return i.OlapError != "" || i.ControllerError != "" || i.RepoError != "" || len(i.MetricsViewErrors) != 0 || i.ParseErrorCount > 0 || i.ReconcileErrorCount > 0
}

func addExpectedInstance(expectedInstances map[string][]string, d *database.Deployment) {
Expand Down
2,392 changes: 1,217 additions & 1,175 deletions proto/gen/rill/runtime/v1/api.pb.go

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions proto/gen/rill/runtime/v1/api.pb.validate.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions proto/gen/rill/runtime/v1/runtime.swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4331,6 +4331,16 @@ definitions:
type: string
repoError:
type: string
metricsViewErrors:
type: object
additionalProperties:
type: string
parseErrorCount:
type: integer
format: int32
reconcileErrorCount:
type: integer
format: int32
v1InstanceHealthResponse:
type: object
properties:
Expand Down
3 changes: 3 additions & 0 deletions proto/rill/runtime/v1/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,9 @@ message InstanceHealth {
string controller_error = 1;
string olap_error = 2;
string repo_error = 3;
map<string, string> metrics_view_errors = 4;
int32 parse_error_count = 5;
int32 reconcile_error_count = 6;
}

// **********
Expand Down
10 changes: 10 additions & 0 deletions runtime/drivers/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type CatalogStore interface {
UpdateModelSplitPending(ctx context.Context, modelID, splitKey string) error
UpdateModelSplitsPendingIfError(ctx context.Context, modelID string) error
DeleteModelSplits(ctx context.Context, modelID string) error

FindInstanceHealth(ctx context.Context, instanceID string) (*InstanceHealth, error)
UpsertInstanceHealth(ctx context.Context, h *InstanceHealth) error
}

// Resource is an entry in a catalog store
Expand Down Expand Up @@ -87,3 +90,10 @@ type FindModelSplitsOptions struct {
AfterIndex int
AfterKey string
}

// InstanceHealth represents the health of an instance.
type InstanceHealth struct {
InstanceID string `db:"instance_id"`
HealthJSON []byte `db:"health_json"`
UpdatedOn time.Time `db:"updated_on"`
}
9 changes: 6 additions & 3 deletions runtime/drivers/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,9 @@ type configProperties struct {
// EmbedPort is the port to run Clickhouse locally (0 is random port).
EmbedPort int `mapstructure:"embed_port"`
// DataDir is the path to directory where db files will be created.
DataDir string `mapstructure:"data_dir"`
TempDir string `mapstructure:"temp_dir"`
DataDir string `mapstructure:"data_dir"`
TempDir string `mapstructure:"temp_dir"`
CanScaleToZero bool `mapstructure:"can_scale_to_zero"`
}

// Open connects to Clickhouse using std API.
Expand All @@ -116,7 +117,9 @@ func (d driver) Open(instanceID string, config map[string]any, client *activity.
return nil, errors.New("clickhouse driver can't be shared")
}

conf := &configProperties{}
conf := &configProperties{
CanScaleToZero: true,
}
err := mapstructure.WeakDecode(config, conf)
if err != nil {
return nil, err
Expand Down
4 changes: 4 additions & 0 deletions runtime/drivers/clickhouse/olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,10 @@ func (c *connection) renameTable(ctx context.Context, oldName, newName, onCluste
return c.DropTable(context.Background(), oldName, false)
}

func (c *connection) MayBeScaledToZero(ctx context.Context) bool {
return c.config.CanScaleToZero
}

// acquireMetaConn gets a connection from the pool for "meta" queries like information schema (i.e. fast queries).
// It returns a function that puts the connection back in the pool (if applicable).
func (c *connection) acquireMetaConn(ctx context.Context) (*sqlx.Conn, func() error, error) {
Expand Down
4 changes: 4 additions & 0 deletions runtime/drivers/druid/olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ func (c *connection) Execute(ctx context.Context, stmt *drivers.Statement) (*dri
return r, nil
}

func (c *connection) MayBeScaledToZero(ctx context.Context) bool {
return false
}

func rowsToSchema(r *sqlx.Rows) (*runtimev1.StructType, error) {
if r == nil {
return nil, nil
Expand Down
8 changes: 8 additions & 0 deletions runtime/drivers/duckdb/catalogv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,11 @@ func (c *connection) UpdateModelSplitsPendingIfError(ctx context.Context, modelI
func (c *connection) DeleteModelSplits(ctx context.Context, modelID string) error {
return drivers.ErrNotImplemented
}

func (c *connection) FindInstanceHealth(ctx context.Context, instanceID string) (*drivers.InstanceHealth, error) {
return nil, drivers.ErrNotImplemented
}

func (c *connection) UpsertInstanceHealth(ctx context.Context, h *drivers.InstanceHealth) error {
return drivers.ErrNotImplemented
}
4 changes: 4 additions & 0 deletions runtime/drivers/duckdb/olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,10 @@ func (c *connection) RenameTable(ctx context.Context, oldName, newName string, v
return err
}

func (c *connection) MayBeScaledToZero(ctx context.Context) bool {
return false
}

func (c *connection) execIncrementalInsert(ctx context.Context, safeName, sql string, byName bool, strategy drivers.IncrementalStrategy, uniqueKey []string) error {
var byNameClause string
if byName {
Expand Down
2 changes: 2 additions & 0 deletions runtime/drivers/olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type OLAPStore interface {
RenameTable(ctx context.Context, name, newName string, view bool) error
AddTableColumn(ctx context.Context, tableName, columnName string, typ string) error
AlterTableColumn(ctx context.Context, tableName, columnName string, newType string) error

MayBeScaledToZero(ctx context.Context) bool
}

// Statement wraps a query to execute against an OLAP driver.
Expand Down
4 changes: 4 additions & 0 deletions runtime/drivers/pinot/olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ func (c *connection) Execute(ctx context.Context, stmt *drivers.Statement) (*dri
return r, nil
}

func (c *connection) MayBeScaledToZero(ctx context.Context) bool {
return false
}

type informationSchema struct {
c *connection
}
Expand Down
17 changes: 17 additions & 0 deletions runtime/drivers/sqlite/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,3 +340,20 @@ func (c *catalogStore) DeleteModelSplits(ctx context.Context, modelID string) er

return nil
}

func (c *catalogStore) FindInstanceHealth(ctx context.Context, instanceID string) (*drivers.InstanceHealth, error) {
var h drivers.InstanceHealth
err := c.db.QueryRowContext(ctx, "SELECT health, created_on FROM instance_health WHERE instance_id=?", instanceID).Scan(&h.HealthJSON, &h.UpdatedOn)
if err != nil {
return nil, err
}

return &h, nil
}

func (c *catalogStore) UpsertInstanceHealth(ctx context.Context, h *drivers.InstanceHealth) error {
_, err := c.db.ExecContext(ctx, `INSERT INTO instance_health(instance_id, health, created_on) Values (?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(instance_id) DO UPDATE SET health=excluded.health, created_on=excluded.created_on;
`, h.InstanceID, h.HealthJSON)
return err
}
1 change: 1 addition & 0 deletions runtime/drivers/sqlite/migrations/0025.sql
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
UPDATE catalogv2 SET kind = 'rill.runtime.v1.Canvas' WHERE kind = 'rill.runtime.v1.Dashboard';

8 changes: 8 additions & 0 deletions runtime/drivers/sqlite/migrations/0026.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
PRAGMA foreign_keys = ON;

CREATE TABLE IF NOT EXISTS instance_health (
instance_id TEXT PRIMARY KEY,
health_json BLOB NOT NULL,
updated_on TIMESTAMP NOT NULL,
FOREIGN KEY (instance_id) REFERENCES instances(id) ON DELETE CASCADE
);
Loading
Loading