Skip to content

Commit

Permalink
Fix status update post-teardown (#194)
Browse files Browse the repository at this point in the history
  • Loading branch information
lrao100 authored Apr 23, 2020
1 parent 9fe7fc1 commit 6f6a2eb
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 10 deletions.
19 changes: 9 additions & 10 deletions pkg/controller/flinkapplication/flink_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,13 @@ func getCancelFlag(app *v1beta1.FlinkApplication) bool {
// DeploymentMode is set to BlueGreen
func (s *FlinkStateMachine) handleDualRunning(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) {
if application.Spec.TearDownVersionHash != "" {
return s.teardownApplicationVersion(ctx, application)
versionHashToTeardown := application.Spec.TearDownVersionHash
_, _, err := s.flinkController.GetVersionAndJobIDForHash(ctx, application, versionHashToTeardown)
if err != nil {
logger.Warnf(ctx, "Cannot find flink application with tearDownVersionhash %s. The hash may be obsolete; Ignoring hash", versionHashToTeardown)
} else {
return s.teardownApplicationVersion(ctx, application)
}
}

// Update status of the cluster
Expand All @@ -969,14 +975,7 @@ func (s *FlinkStateMachine) handleDualRunning(ctx context.Context, application *

func (s *FlinkStateMachine) teardownApplicationVersion(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) {
versionHashToTeardown := application.Spec.TearDownVersionHash
versionToTeardown, jobID, err := s.flinkController.GetVersionAndJobIDForHash(ctx, application, versionHashToTeardown)

if err != nil {
s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, "TeardownFailed",
fmt.Sprintf("Failed to find application version %v",
versionHashToTeardown))
return statusUnchanged, nil
}
versionToTeardown, jobID, _ := s.flinkController.GetVersionAndJobIDForHash(ctx, application, versionHashToTeardown)

s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, "TeardownInitated",
fmt.Sprintf("Tearing down application with hash %s and version %v", versionHashToTeardown,
Expand All @@ -986,7 +985,7 @@ func (s *FlinkStateMachine) teardownApplicationVersion(ctx context.Context, appl
fmt.Sprintf("Force-canceling application with version %v and hash %s",
versionToTeardown, versionHashToTeardown))

err = s.flinkController.ForceCancel(ctx, application, versionHashToTeardown, jobID)
err := s.flinkController.ForceCancel(ctx, application, versionHashToTeardown, jobID)
if err != nil {
s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, "TeardownFailed",
fmt.Sprintf("Failed to force-cancel application version %v and hash %s; will attempt to tear down cluster immediately: %s",
Expand Down
19 changes: 19 additions & 0 deletions pkg/controller/flinkapplication/flink_state_machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1667,6 +1667,7 @@ func TestRunningToDualRunning(t *testing.T) {
mockFlinkController.StartFlinkJobFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool, savepointPath string) (s string, err error) {
return "jobID2", nil
}

err = stateMachineForTest.Handle(context.Background(), &app)
assert.Nil(t, err)
assert.Equal(t, "jobID2", app.Status.VersionStatuses[1].JobStatus.JobID)
Expand All @@ -1684,6 +1685,24 @@ func TestRunningToDualRunning(t *testing.T) {
assert.Equal(t, v1beta1.GreenFlinkApplication, app.Status.DeployVersion)
assert.Equal(t, v1beta1.BlueFlinkApplication, app.Status.UpdatingVersion)

// Set an obsolete tearDownVersionHash and ensure that the application continues to run in DualRunning
// And updates all status fields
mockFlinkController.GetVersionAndJobIDForHashFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (version string, s string, err error) {
return "", "", errors.New("no version found")
}
mockFlinkController.CompareAndUpdateJobStatusFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error) {
assert.Equal(t, deployHash, application.Status.VersionStatuses[0].VersionHash)
assert.Equal(t, updatingHash, application.Status.VersionStatuses[1].VersionHash)
return true, nil
}

app.Spec.TearDownVersionHash = "obsoleteHash"
err = stateMachineForTest.Handle(context.Background(), &app)
assert.Nil(t, err)
assert.Equal(t, v1beta1.FlinkApplicationDualRunning, app.Status.Phase)
assert.Equal(t, "jobId", app.Status.VersionStatuses[0].JobStatus.JobID)
assert.Equal(t, "jobID2", app.Status.VersionStatuses[1].JobStatus.JobID)

}

func TestDualRunningToRunning(t *testing.T) {
Expand Down

0 comments on commit 6f6a2eb

Please sign in to comment.