Skip to content

Commit 373cb21

Browse files
[Backward Incompatible] Add env variables for Flink managers and fix for JM HA (#45)
* Add env variables for Flink managers and fix for JM HA * Inject customized env from the operator
1 parent 9053e4f commit 373cb21

14 files changed

+205
-54
lines changed

examples/wordcount/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ ENV PATH=$FLINK_HOME/bin:$HADOOP_HOME/bin:$MAVEN_HOME/bin:$PATH
99
COPY . /code
1010

1111
# Configure Flink version
12-
ENV FLINK_VERSION=1.8.0 \
12+
ENV FLINK_VERSION=1.8.1 \
1313
HADOOP_SCALA_VARIANT=scala_2.12
1414

1515
# Install dependencies

examples/wordcount/docker-entrypoint.sh

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,13 @@ drop_privs_cmd() {
1313
fi
1414
}
1515

16-
envsubst < /usr/local/flink-conf.yaml > $FLINK_HOME/conf/flink-conf.yaml
17-
18-
# As the taskmanager pods are accessible only by (cluster) ip address,
19-
# we must manually configure this based on the podIp kubernetes
20-
# variable, which is assigned to TASKMANAGER_HOSTNAME env var by the
21-
# operator.
22-
if [ -n "$TASKMANAGER_HOSTNAME" ]; then
23-
echo "taskmanager.host: $TASKMANAGER_HOSTNAME" >> "$FLINK_HOME/conf/flink-conf.yaml"
24-
fi
25-
2616
# Add in extra configs set by the operator
2717
if [ -n "$OPERATOR_FLINK_CONFIG" ]; then
28-
echo "$OPERATOR_FLINK_CONFIG" >> "$FLINK_HOME/conf/flink-conf.yaml"
18+
echo "$OPERATOR_FLINK_CONFIG" >> "/usr/local/flink-conf.yaml"
2919
fi
3020

21+
envsubst < /usr/local/flink-conf.yaml > $FLINK_HOME/conf/flink-conf.yaml
22+
3123
COMMAND=$@
3224

3325
if [ $# -lt 1 ]; then
@@ -37,11 +29,11 @@ fi
3729
if [ "$COMMAND" = "help" ]; then
3830
echo "Usage: $(basename "$0") (jobmanager|taskmanager|local|help)"
3931
exit 0
40-
elif [ "$COMMAND" = "jobmanager" ]; then
32+
elif [ "$FLINK_DEPLOYMENT_TYPE" = "jobmanager" ]; then
4133
echo "Starting Job Manager"
4234
echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
4335
exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground
44-
elif [ "$COMMAND" = "taskmanager" ]; then
36+
elif [ "$FLINK_DEPLOYMENT_TYPE" = "taskmanager" ]; then
4537
echo "Starting Task Manager"
4638
echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
4739
exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground

integ/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ By default the tests create, use, and clean up the namespace
2323
These tests use a sample Flink job [operator-test-app](/integ/operator-test-app/). The
2424
tests currently use two images built from here:
2525

26-
* `lyft/operator-test-app:6c45caca225489895cb1353dae25069b5d43746f.1`
27-
* `lyft/operator-test-app:6c45caca225489895cb1353dae25069b5d43746f.2`
26+
* `lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.1`
27+
* `lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.2`
2828

2929
Those images are available on our private Dockerhub registry, and you
3030
will either need to pull them locally or give Kubernetes access to the

integ/operator-test-app/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ ENV PATH=$FLINK_HOME/bin:$HADOOP_HOME/bin:$MAVEN_HOME/bin:$PATH
99
COPY . /code
1010

1111
# Configure Flink version
12-
ENV FLINK_VERSION=1.8.0 \
12+
ENV FLINK_VERSION=1.8.1 \
1313
HADOOP_SCALA_VARIANT=scala_2.12
1414

1515
# Install dependencies

integ/operator-test-app/docker-entrypoint.sh

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,13 @@ drop_privs_cmd() {
1313
fi
1414
}
1515

16-
envsubst < /usr/local/flink-conf.yaml > $FLINK_HOME/conf/flink-conf.yaml
17-
18-
# As the taskmanager pods are accessible only by (cluster) ip address,
19-
# we must manually configure this based on the podIp kubernetes
20-
# variable, which is assigned to TASKMANAGER_HOSTNAME env var by the
21-
# operator.
22-
if [ -n "$TASKMANAGER_HOSTNAME" ]; then
23-
echo "taskmanager.host: $TASKMANAGER_HOSTNAME" >> "$FLINK_HOME/conf/flink-conf.yaml"
24-
fi
25-
2616
# Add in extra configs set by the operator
2717
if [ -n "$OPERATOR_FLINK_CONFIG" ]; then
28-
echo "$OPERATOR_FLINK_CONFIG" >> "$FLINK_HOME/conf/flink-conf.yaml"
18+
echo "$OPERATOR_FLINK_CONFIG" >> "/usr/local/flink-conf.yaml"
2919
fi
3020

21+
envsubst < /usr/local/flink-conf.yaml > $FLINK_HOME/conf/flink-conf.yaml
22+
3123
COMMAND=$@
3224

3325
if [ $# -lt 1 ]; then
@@ -37,11 +29,11 @@ fi
3729
if [ "$COMMAND" = "help" ]; then
3830
echo "Usage: $(basename "$0") (jobmanager|taskmanager|local|help)"
3931
exit 0
40-
elif [ "$COMMAND" = "jobmanager" ]; then
32+
elif [ "$FLINK_DEPLOYMENT_TYPE" = "jobmanager" ]; then
4133
echo "Starting Job Manager"
4234
echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
4335
exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground
44-
elif [ "$COMMAND" = "taskmanager" ]; then
36+
elif [ "$FLINK_DEPLOYMENT_TYPE" = "taskmanager" ]; then
4537
echo "Starting Task Manager"
4638
echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
4739
exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground

integ/simple_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1616
)
1717

18-
const NewImage = "lyft/operator-test-app:6c45caca225489895cb1353dae25069b5d43746f.2"
18+
const NewImage = "lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.2"
1919

2020
func updateAndValidate(c *C, s *IntegSuite, name string, updateFn func(app *v1alpha1.FlinkApplication), failurePhase v1alpha1.FlinkApplicationPhase) *v1alpha1.FlinkApplication {
2121
app, err := s.Util.GetFlinkApplication(name)

integ/test_app.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ metadata:
66
labels:
77
environment: development
88
spec:
9-
image: lyft/operator-test-app:6c45caca225489895cb1353dae25069b5d43746f.1
9+
image: lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.1
1010
imagePullSecrets:
1111
- name: dockerhub
1212
flinkConfig:

pkg/controller/flink/config.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package flink
22

33
import (
4+
"strings"
5+
46
"github.com/lyft/flinkk8soperator/pkg/apis/app/v1alpha1"
57
"gopkg.in/yaml.v2"
68
)
@@ -14,6 +16,7 @@ const (
1416
UIDefaultPort = 8081
1517
MetricsQueryDefaultPort = 50101
1618
OffHeapMemoryDefaultFraction = 0.5
19+
HighAvailabilityKey = "high-availability"
1720
)
1821

1922
func firstNonNil(x *int32, y int32) int32 {
@@ -118,3 +121,13 @@ func renderFlinkConfig(app *v1alpha1.FlinkApplication) (string, error) {
118121
}
119122
return string(b), nil
120123
}
124+
125+
func isHAEnabled(flinkConfig v1alpha1.FlinkConfig) bool {
126+
if val, ok := flinkConfig[HighAvailabilityKey]; ok {
127+
value := val.(string)
128+
if strings.ToLower(strings.TrimSpace(value)) != "none" {
129+
return true
130+
}
131+
}
132+
return false
133+
}

pkg/controller/flink/container_utils.go

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ const (
2424
AwsMetadataServiceTimeout = "5"
2525
AwsMetadataServiceNumAttempts = "20"
2626
OperatorFlinkConfig = "OPERATOR_FLINK_CONFIG"
27+
HostName = "HOST_NAME"
28+
HostIP = "HOST_IP"
29+
FlinkDeploymentTypeEnv = "FLINK_DEPLOYMENT_TYPE"
2730
FlinkDeploymentType = "flink-deployment-type"
2831
FlinkDeploymentTypeJobmanager = "jobmanager"
2932
FlinkDeploymentTypeTaskmanager = "taskmanager"
@@ -87,6 +90,22 @@ func getFlinkEnv(app *v1alpha1.FlinkApplication) ([]v1.EnvVar, error) {
8790
Name: OperatorFlinkConfig,
8891
Value: flinkConfig,
8992
},
93+
{
94+
Name: HostName,
95+
ValueFrom: &v1.EnvVarSource{
96+
FieldRef: &v1.ObjectFieldSelector{
97+
FieldPath: "metadata.name",
98+
},
99+
},
100+
},
101+
{
102+
Name: HostIP,
103+
ValueFrom: &v1.EnvVarSource{
104+
FieldRef: &v1.ObjectFieldSelector{
105+
FieldPath: "status.podIP",
106+
},
107+
},
108+
},
90109
}...)
91110
return env, nil
92111
}
@@ -155,14 +174,23 @@ func HashForApplication(app *v1alpha1.FlinkApplication) string {
155174
return fmt.Sprintf("%08x", hasher.Sum32())
156175
}
157176

158-
func InjectHashesIntoConfig(deployment *appsv1.Deployment, app *v1alpha1.FlinkApplication, hash string) {
177+
func InjectOperatorCustomizedConfig(deployment *appsv1.Deployment, app *v1alpha1.FlinkApplication, hash string, deploymentType string) {
159178
var newContainers []v1.Container
160179
for _, container := range deployment.Spec.Template.Spec.Containers {
161180
var newEnv []v1.EnvVar
162181
for _, env := range container.Env {
163182
if env.Name == OperatorFlinkConfig {
164-
env.Value = fmt.Sprintf("%s\nhigh-availability.cluster-id: %s-%s\n", env.Value, app.Name, hash)
165-
env.Value = fmt.Sprintf("%sjobmanager.rpc.address: %s\n", env.Value, VersionedJobManagerServiceName(app, hash))
183+
if isHAEnabled(app.Spec.FlinkConfig) {
184+
env.Value = fmt.Sprintf("%s\nhigh-availability.cluster-id: %s-%s\n", env.Value, app.Name, hash)
185+
if deploymentType == FlinkDeploymentTypeJobmanager {
186+
env.Value = fmt.Sprintf("%sjobmanager.rpc.address: $HOST_IP\n", env.Value)
187+
}
188+
} else {
189+
env.Value = fmt.Sprintf("%s\njobmanager.rpc.address: %s\n", env.Value, VersionedJobManagerServiceName(app, hash))
190+
}
191+
if deploymentType == FlinkDeploymentTypeTaskmanager {
192+
env.Value = fmt.Sprintf("%staskmanager.host: $HOST_IP\n", env.Value)
193+
}
166194
}
167195
newEnv = append(newEnv, env)
168196
}

pkg/controller/flink/flink_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import (
2929
const testImage = "123.xyz.com/xx:11ae1218924428faabd9b64423fa0c332efba6b2"
3030

3131
// Note: if you find yourself changing this to fix a test, that should be treated as a breaking API change
32-
const testAppHash = "cb56c9a1"
32+
const testAppHash = "de844839"
3333
const testAppName = "app-name"
3434
const testNamespace = "ns"
3535
const testJobID = "j1"

0 commit comments

Comments
 (0)