Skip to content

Commit

Permalink
check compaction settings from API to compare to desired, add ability…
Browse files Browse the repository at this point in the history
… to set dynamic configurations with Druid
  • Loading branch information
jmcwilliams-te committed Dec 19, 2024
1 parent 3615fc0 commit 5171aa2
Show file tree
Hide file tree
Showing 18 changed files with 692 additions and 259 deletions.
9 changes: 9 additions & 0 deletions apis/druid/v1alpha1/druid_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package v1alpha1
import (
"encoding/json"

druidapi "github.com/datainfrahq/druid-operator/pkg/druidapi"
appsv1 "k8s.io/api/apps/v1"
autoscalev2 "k8s.io/api/autoscaling/v2"
v1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)

// druid-operator deploys a druid cluster from given spec below, based on the spec it would create following
Expand Down Expand Up @@ -281,6 +283,13 @@ type DruidSpec struct {
// CoreSite Contents of `core-site.xml`.
// +optional
CoreSite string `json:"core-site.xml,omitempty"`

// Dynamic Configurations for Druid. Applied through the dynamic configuration API.
// +optional
DynamicConfig runtime.RawExtension `json:"dynamicConfig,omitempty"`

// +optional
Auth druidapi.Auth `json:"auth,omitempty"`
}

// DruidNodeSpec Specification of `Druid` Node type and its configurations.
Expand Down
17 changes: 2 additions & 15 deletions apis/druid/v1alpha1/druidingestion_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package v1alpha1

import (
druidapi "github.com/datainfrahq/druid-operator/pkg/druidapi"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -40,7 +41,7 @@ type DruidIngestionSpec struct {
// +required
Ingestion IngestionSpec `json:"ingestion"`
// +optional
Auth Auth `json:"auth"`
Auth druidapi.Auth `json:"auth"`
}

type IngestionSpec struct {
Expand Down Expand Up @@ -72,23 +73,9 @@ type DruidIngestionStatus struct {
// CurrentIngestionSpec is a string instead of RawExtension to maintain compatibility with existing
// IngestionSpecs that are stored as JSON strings.
CurrentIngestionSpec string `json:"currentIngestionSpec.json"`
CurrentCompaction runtime.RawExtension `json:"compaction,omitempty"`
CurrentRules []runtime.RawExtension `json:"rules,omitempty"`
}

type AuthType string

const (
BasicAuth AuthType = "basic-auth"
)

type Auth struct {
// +required
Type AuthType `json:"type"`
// +required
SecretRef v1.SecretReference `json:"secretRef"`
}

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
Expand Down
19 changes: 2 additions & 17 deletions apis/druid/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions chart/crds/druid.apache.org_druidingestions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,6 @@ spec:
type: object
status:
properties:
compaction:
type: object
x-kubernetes-preserve-unknown-fields: true
currentIngestionSpec.json:
description: |-
CurrentIngestionSpec is a string instead of RawExtension to maintain compatibility with existing
Expand Down
28 changes: 28 additions & 0 deletions chart/crds/druid.apache.org_druids.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1265,6 +1265,29 @@ spec:
type: array
type: object
type: object
auth:
properties:
secretRef:
description: |-
SecretReference represents a Secret Reference. It has enough information to retrieve secret
in any namespace
properties:
name:
description: name is unique within a namespace to reference
a secret resource.
type: string
namespace:
description: namespace defines the space within which the
secret name must be unique.
type: string
type: object
x-kubernetes-map-type: atomic
type:
type: string
required:
- secretRef
- type
type: object
common.runtime.properties:
description: CommonRuntimeProperties Content fo the `common.runtime.properties`
configuration file.
Expand Down Expand Up @@ -1478,6 +1501,11 @@ spec:
description: DisablePVCDeletionFinalizer Whether PVCs shall be deleted
on the deletion of the Druid cluster.
type: boolean
dynamicConfig:
description: Dynamic Configurations for Druid. Applied through the
dynamic configuration API.
type: object
x-kubernetes-preserve-unknown-fields: true
env:
description: Env Environment variables for druid containers.
items:
Expand Down
3 changes: 0 additions & 3 deletions config/crd/bases/druid.apache.org_druidingestions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,6 @@ spec:
type: object
status:
properties:
compaction:
type: object
x-kubernetes-preserve-unknown-fields: true
currentIngestionSpec.json:
description: |-
CurrentIngestionSpec is a string instead of RawExtension to maintain compatibility with existing
Expand Down
28 changes: 28 additions & 0 deletions config/crd/bases/druid.apache.org_druids.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1265,6 +1265,29 @@ spec:
type: array
type: object
type: object
auth:
properties:
secretRef:
description: |-
SecretReference represents a Secret Reference. It has enough information to retrieve secret
in any namespace
properties:
name:
description: name is unique within a namespace to reference
a secret resource.
type: string
namespace:
description: namespace defines the space within which the
secret name must be unique.
type: string
type: object
x-kubernetes-map-type: atomic
type:
type: string
required:
- secretRef
- type
type: object
common.runtime.properties:
description: CommonRuntimeProperties Content fo the `common.runtime.properties`
configuration file.
Expand Down Expand Up @@ -1478,6 +1501,11 @@ spec:
description: DisablePVCDeletionFinalizer Whether PVCs shall be deleted
on the deletion of the Druid cluster.
type: boolean
dynamicConfig:
description: Dynamic Configurations for Druid. Applied through the
dynamic configuration API.
type: object
x-kubernetes-preserve-unknown-fields: true
env:
description: Env Environment variables for druid containers.
items:
Expand Down
11 changes: 9 additions & 2 deletions controllers/druid/druid_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,18 @@ func (r *DruidReconciler) Reconcile(ctx context.Context, request reconcile.Reque
// Initialize Emit Events
var emitEvent EventEmitter = EmitEventFuncs{r.Recorder}

// Deploy Druid Cluster
if err := deployDruidCluster(ctx, r.Client, instance, emitEvent); err != nil {
return ctrl.Result{}, err
} else {
return ctrl.Result{RequeueAfter: r.ReconcileWait}, nil
}

// Update Druid Dynamic Configs
if err := updateDruidDynamicConfigs(ctx, r.Client, instance, emitEvent); err != nil {
return ctrl.Result{}, err
}

// If both operations succeed, requeue after specified wait time
return ctrl.Result{RequeueAfter: r.ReconcileWait}, nil
}

func (r *DruidReconciler) SetupWithManager(mgr ctrl.Manager) error {
Expand Down
102 changes: 102 additions & 0 deletions controllers/druid/dynamic_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package druid

import (
"context"
"errors"
"fmt"
"net/http"

"github.com/datainfrahq/druid-operator/apis/druid/v1alpha1"
druidapi "github.com/datainfrahq/druid-operator/pkg/druidapi"
internalhttp "github.com/datainfrahq/druid-operator/pkg/http"
"github.com/datainfrahq/druid-operator/pkg/util"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// updateDruidDynamicConfigs updates the Druid cluster's dynamic configurations.
func updateDruidDynamicConfigs(ctx context.Context, client client.Client, druid *v1alpha1.Druid, emitEvent EventEmitter) error {
if druid.Spec.DynamicConfig.Size() == 0 {
return nil
}

svcName, err := druidapi.GetRouterSvcUrl(druid.Namespace, druid.Name, client)
if err != nil {
emitEvent.EmitEventGeneric(druid, "GetRouterSvcUrlFailed", "Failed to get router service URL", err)
return err
}

basicAuth, err := druidapi.GetAuthCreds(
ctx,
client,
druid.Spec.Auth,
)
if err != nil {
emitEvent.EmitEventGeneric(druid, "GetAuthCredsFailed", "Failed to get authentication credentials", err)
return err
}

// Create the HTTP client with basic authentication
httpClient := internalhttp.NewHTTPClient(
&http.Client{},
&internalhttp.Auth{BasicAuth: basicAuth},
)

// Define the URL path for dynamic configurations
dynamicConfigPath := druidapi.MakePath(svcName, "indexer", "worker")

// Fetch current dynamic configurations
currentResp, err := httpClient.Do(
http.MethodGet,
dynamicConfigPath,
nil,
)
if err != nil {
emitEvent.EmitEventGeneric(druid, "FetchCurrentConfigsFailed", "Failed to fetch current dynamic configurations", err)
return err
}
if currentResp.StatusCode != http.StatusOK {
err = fmt.Errorf(
"failed to retrieve current Druid dynamic configurations. Status code: %d, Response body: %s",
currentResp.StatusCode, string(currentResp.ResponseBody),
)
emitEvent.EmitEventGeneric(druid, "FetchCurrentConfigsFailed", "Failed to fetch current dynamic configurations", err)
return err
}

// Handle empty response body
var currentConfigsJson string
if len(currentResp.ResponseBody) == 0 {
currentConfigsJson = "{}" // Initialize as empty JSON object if response body is empty
} else {
currentConfigsJson = currentResp.ResponseBody
}

// Compare current and desired configurations
equal, err := util.IncludesJson(currentConfigsJson, string(druid.Spec.DynamicConfig.Raw))
if err != nil {
emitEvent.EmitEventGeneric(druid, "ConfigComparisonFailed", "Failed to compare configurations", err)
return err
}
if equal {
// Configurations are already up-to-date
emitEvent.EmitEventGeneric(druid, "ConfigsUpToDate", "Dynamic configurations are already up-to-date", nil)
return nil
}

// Update the Druid cluster's dynamic configurations if needed
respDynamicConfigs, err := httpClient.Do(
http.MethodPost,
dynamicConfigPath,
druid.Spec.DynamicConfig.Raw,
)
if err != nil {
emitEvent.EmitEventGeneric(druid, "UpdateConfigsFailed", "Failed to update dynamic configurations", err)
return err
}
if respDynamicConfigs.StatusCode != http.StatusOK {
return errors.New("failed to update Druid dynamic configurations")
}

emitEvent.EmitEventGeneric(druid, "ConfigsUpdated", "Successfully updated dynamic configurations", nil)
return nil
}
Loading

0 comments on commit 5171aa2

Please sign in to comment.