Skip to content
This repository was archived by the owner on Jun 11, 2021. It is now read-only.

Commit f4713dd

Browse files
Expose SyncPeriod To Force Reconciliation (#130)
1 parent 532abcb commit f4713dd

File tree

5 files changed

+64
-14
lines changed

5 files changed

+64
-14
lines changed

components/controller/cmd/main.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"sigs.k8s.io/controller-runtime/pkg/client/config"
2121
"sigs.k8s.io/controller-runtime/pkg/manager"
2222
"strconv"
23+
"time"
2324
)
2425

2526
// The Main Function (Go Command)
@@ -58,7 +59,8 @@ func main() {
5859
// flag.Parse()
5960
// mgr, err := manager.New(cfg, manager.Options{MetricsBindAddress: metricsAddr})
6061
logger.Info("Creating Controller Manager")
61-
mgr, err := manager.New(cfg, manager.Options{})
62+
syncPeriod := time.Duration(environment.SyncPeriodMinutes) * time.Minute // Force Re-Reconciliation At Regular Intervals (Recovery)
63+
mgr, err := manager.New(cfg, manager.Options{SyncPeriod: &syncPeriod})
6264
if err != nil {
6365
logger.Error("Failed To Create Controller Manager", zap.Error(err))
6466
os.Exit(1)

components/controller/local-env.sh

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"
1010
export SYSTEM_NAMESPACE="knative-eventing"
1111
export SERVICE_ACCOUNT="knative-kafka-channel-controller"
1212
export METRICS_PORT="8081"
13+
export SYNC_PERIOD_MINUTES="15"
1314
export KAFKA_PROVIDER="azure"
1415
export KAFKA_BROKERS="eventhub.servicebus.windows.net:9093"
1516
export KAFKA_OFFSET_COMMIT_MESSAGE_COUNT="50"
@@ -25,9 +26,19 @@ export DEFAULT_NUM_PARTITIONS="4"
2526
export DEFAULT_REPLICATION_FACTOR="1"
2627
export DEFAULT_RETENTION_MILLIS="604800000"
2728
export DISPATCHER_REPLICAS="1"
28-
export DISPATCHER_RETRY_RETRY_INITIAL_INTERVAL_MILLIS="500"
29-
export DISPATCHER_RETRY_EVENT_RETRY_TIME_MILLIS="300000"
29+
export DISPATCHER_RETRY_INITIAL_INTERVAL_MILLIS="500"
30+
export DISPATCHER_RETRY_TIME_MILLIS="300000"
3031
export DISPATCHER_RETRY_EXPONENTIAL_BACKOFF="true"
32+
export DISPATCHER_CPU_REQUEST="50Mi"
33+
export DISPATCHER_CPU_LIMIT="128Mi"
34+
export DISPATCHER_MEMORY_REQUEST="300m"
35+
export DISPATCHER_MEMORY_LIMIT="500m"
36+
export CHANNEL_MEMORY_REQUEST="50Mi"
37+
export CHANNEL_MEMORY_LIMIT="100Mi"
38+
export CHANNEL_CPU_REQUEST="100m"
39+
export CHANNEL_CPU_LIMIT="200m"
40+
41+
3142

3243
# Log Environment Variables
3344
echo ""
@@ -36,6 +47,7 @@ echo "-----------------"
3647
echo "SYSTEM_NAMESPACE=${SYSTEM_NAMESPACE}"
3748
echo "SERVICE_ACCOUNT=${SERVICE_ACCOUNT}"
3849
echo "METRICS_PORT=${METRICS_PORT}"
50+
echo "SYNC_PERIOD_MINUTES=${SYNC_PERIOD_MINUTES}"
3951
echo "KAFKA_PROVIDER=${KAFKA_PROVIDER}"
4052
echo "KAFKA_BROKERS=${KAFKA_BROKERS}"
4153
echo "KAFKA_OFFSET_COMMIT_MESSAGE_COUNT=${KAFKA_OFFSET_COMMIT_MESSAGE_COUNT}"
@@ -51,7 +63,15 @@ echo "DEFAULT_NUM_PARTITIONS=${DEFAULT_NUM_PARTITIONS}"
5163
echo "DEFAULT_REPLICATION_FACTOR=${DEFAULT_REPLICATION_FACTOR}"
5264
echo "DEFAULT_RETENTION_MILLIS=${DEFAULT_RETENTION_MILLIS}"
5365
echo "DISPATCHER_REPLICAS=${DISPATCHER_REPLICAS}"
54-
echo "DISPATCHER_RETRY_RETRY_INITIAL_INTERVAL_MILLIS=${DEFAULT_EVENT_RETRY_INITIAL_INTERVAL_MILLIS}"
55-
echo "DISPATCHER_RETRY_RETRY_TIME_MILLIS=${DEFAULT_EVENT_RETRY_TIME_MILLIS}"
66+
echo "DISPATCHER_RETRY_INITIAL_INTERVAL_MILLIS=${DISPATCHER_RETRY_INITIAL_INTERVAL_MILLIS}"
67+
echo "DISPATCHER_RETRY_TIME_MILLIS=${DISPATCHER_RETRY_TIME_MILLIS}"
5668
echo "DISPATCHER_RETRY_EXPONENTIAL_BACKOFF=${DEFAULT_EXPONENTIAL_BACKOFF}"
69+
echo "DISPATCHER_CPU_REQUEST=${DISPATCHER_CPU_REQUEST}"
70+
echo "DISPATCHER_CPU_LIMIT=${DISPATCHER_CPU_LIMIT}"
71+
echo "DISPATCHER_MEMORY_REQUEST=${DISPATCHER_MEMORY_REQUEST}"
72+
echo "DISPATCHER_MEMORY_LIMIT=${DISPATCHER_MEMORY_LIMIT}"
73+
echo "CHANNEL_MEMORY_REQUEST=${CHANNEL_MEMORY_REQUEST}"
74+
echo "CHANNEL_MEMORY_LIMIT=${CHANNEL_MEMORY_LIMIT}"
75+
echo "CHANNEL_CPU_REQUEST=${CHANNEL_CPU_REQUEST}"
76+
echo "CHANNEL_CPU_LIMIT=${CHANNEL_CPU_LIMIT}"
5777
echo ""

components/controller/pkg/env/environment.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,10 @@ import (
1212
// Package Constants
1313
const (
1414
// Knative-Kafka Configuration
15-
ServiceAccountEnvVarKey = "SERVICE_ACCOUNT"
16-
MetricsPortEnvVarKey = "METRICS_PORT"
17-
HealthPortEnvVarKey = "HEALTH_PORT"
15+
SyncPeriodMinutesEnvVarKey = "SYNC_PERIOD_MINUTES"
16+
ServiceAccountEnvVarKey = "SERVICE_ACCOUNT"
17+
MetricsPortEnvVarKey = "METRICS_PORT"
18+
HealthPortEnvVarKey = "HEALTH_PORT"
1819

1920
// Kafka Authorization
2021
KafkaBrokerEnvVarKey = "KAFKA_BROKERS"
@@ -34,6 +35,7 @@ const (
3435
MaxRetryTimeEnvVarKey = "MAX_RETRY_TIME"
3536

3637
// Default Values To Use If Not Available In Env Variables
38+
DefaultSyncPeriodMinutes = "240"
3739
DefaultKafkaOffsetCommitMessageCount = "100"
3840
DefaultKafkaOffsetCommitDurationMillis = "5000"
3941

@@ -79,8 +81,9 @@ const (
7981
type Environment struct {
8082

8183
// Knative-Kafka Configuration
82-
ServiceAccount string // Required
83-
MetricsPort int // Required
84+
SyncPeriodMinutes int // Optional
85+
ServiceAccount string // Required
86+
MetricsPort int // Required
8487

8588
// Kafka Configuration / Authorization
8689
KafkaProvider string // Required
@@ -123,6 +126,14 @@ func GetEnvironment(logger *zap.Logger) (*Environment, error) {
123126
// The ControllerConfig Reference
124127
environment := &Environment{}
125128

129+
// Get The Optional SyncPeriodMinutes Config Value - Default To 4 Hours If Not Specified
130+
syncPeriodMinutesString := getOptionalConfigValue(logger, SyncPeriodMinutesEnvVarKey, DefaultSyncPeriodMinutes)
131+
environment.SyncPeriodMinutes, err = strconv.Atoi(syncPeriodMinutesString)
132+
if err != nil {
133+
logger.Error("Invalid SyncPeriodMinutes (Non Integer)", zap.String("Value", syncPeriodMinutesString), zap.Error(err))
134+
return nil, fmt.Errorf("invalid (non-integer) value '%s' for environment variable '%s'", syncPeriodMinutesString, SyncPeriodMinutesEnvVarKey)
135+
}
136+
126137
// Get The Required K8S ServiceAccount Config Value
127138
environment.ServiceAccount, err = getRequiredConfigValue(logger, ServiceAccountEnvVarKey)
128139
if err != nil {
@@ -224,7 +235,7 @@ func GetEnvironment(logger *zap.Logger) (*Environment, error) {
224235
} else {
225236
environment.DispatcherReplicas, err = strconv.Atoi(dispatcherReplicasString)
226237
if err != nil {
227-
logger.Error("Invalid DispatcherRepli cas (Non Integer)", zap.String("Value", dispatcherReplicasString), zap.Error(err))
238+
logger.Error("Invalid DispatcherReplicas (Non Integer)", zap.String("Value", dispatcherReplicasString), zap.Error(err))
228239
return nil, fmt.Errorf("invalid (non-integer) value '%s' for environment variable '%s'", dispatcherReplicasString, DispatcherReplicasEnvVarKey)
229240
}
230241
}

components/controller/pkg/env/environment_test.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,10 @@ import (
1212

1313
// Test Constants
1414
const (
15-
serviceAccount = "TestServiceAccount"
16-
metricsPort = "9999"
17-
kafkaProvider = "confluent"
15+
syncPeriodMinutes = "11"
16+
serviceAccount = "TestServiceAccount"
17+
metricsPort = "9999"
18+
kafkaProvider = "confluent"
1819

1920
kafkaOffsetCommitMessageCount = "500"
2021
kafkaOffsetCommitDurationMillis = "2000"
@@ -45,6 +46,7 @@ const (
4546
// Define The TestCase Struct
4647
type TestCase struct {
4748
name string
49+
syncPeriodMinutes string
4850
serviceAccount string
4951
metricsPort string
5052
kafkaProvider string
@@ -83,6 +85,10 @@ func TestGetEnvironment(t *testing.T) {
8385
testCase := getValidTestCase("Valid Complete Config")
8486
testCases = append(testCases, testCase)
8587

88+
testCase = getValidTestCase("Missing Optional Config - SyncPeriodMinutes")
89+
testCase.syncPeriodMinutes = ""
90+
testCases = append(testCases, testCase)
91+
8692
testCase = getValidTestCase("Missing Required Config - ServiceAccount")
8793
testCase.serviceAccount = ""
8894
testCase.expectedError = getMissingRequiredEnvironmentVariableError(ServiceAccountEnvVarKey)
@@ -287,6 +293,7 @@ func TestGetEnvironment(t *testing.T) {
287293

288294
// (Re)Setup The Environment Variables From TestCase
289295
os.Clearenv()
296+
assert.Nil(t, os.Setenv(SyncPeriodMinutesEnvVarKey, testCase.syncPeriodMinutes))
290297
assert.Nil(t, os.Setenv(ServiceAccountEnvVarKey, testCase.serviceAccount))
291298
if len(testCase.metricsPort) > 0 {
292299
assert.Nil(t, os.Setenv(MetricsPortEnvVarKey, testCase.metricsPort))
@@ -332,6 +339,13 @@ func TestGetEnvironment(t *testing.T) {
332339

333340
assert.Nil(t, err)
334341
assert.NotNil(t, environment)
342+
343+
if len(testCase.syncPeriodMinutes) > 0 {
344+
assert.Equal(t, testCase.syncPeriodMinutes, strconv.Itoa(environment.SyncPeriodMinutes))
345+
} else {
346+
assert.Equal(t, DefaultSyncPeriodMinutes, strconv.Itoa(environment.SyncPeriodMinutes))
347+
}
348+
335349
assert.Equal(t, testCase.serviceAccount, environment.ServiceAccount)
336350
assert.Equal(t, testCase.metricsPort, strconv.Itoa(environment.MetricsPort))
337351
assert.Equal(t, testCase.channelImage, environment.ChannelImage)
@@ -390,6 +404,7 @@ func TestGetEnvironment(t *testing.T) {
390404
func getValidTestCase(name string) TestCase {
391405
return TestCase{
392406
name: name,
407+
syncPeriodMinutes: syncPeriodMinutes,
393408
serviceAccount: serviceAccount,
394409
metricsPort: metricsPort,
395410
kafkaProvider: kafkaProvider,

resources/knative-kafka/templates/deployment.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ spec:
3737
fieldPath: spec.serviceAccountName
3838
- name: METRICS_PORT
3939
value: {{ .Values.kafka.networking.container.metricsPort | quote }}
40+
- name: SYNC_PERIOD_MINUTES
41+
value: "240"
4042
- name: KAFKA_PROVIDER
4143
value: {{ .Values.environment.kafkaProvider | quote }}
4244
- name: KAFKA_OFFSET_COMMIT_MESSAGE_COUNT

0 commit comments

Comments
 (0)