Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream release command app logs #4020

Merged
merged 3 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 116 additions & 30 deletions internal/command/deploy/machines_releasecommand.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package deploy

import (
"container/ring"
"context"
"errors"
"fmt"
Expand All @@ -9,16 +10,21 @@ import (
"strings"
"time"

"github.com/logrusorgru/aurora"
"github.com/samber/lo"
fly "github.com/superfly/fly-go"
"github.com/superfly/fly-go/flaps"
"github.com/superfly/flyctl/helpers"
"github.com/superfly/flyctl/internal/appconfig"
"github.com/superfly/flyctl/internal/config"
"github.com/superfly/flyctl/internal/flag"
"github.com/superfly/flyctl/internal/format"
"github.com/superfly/flyctl/internal/machine"
"github.com/superfly/flyctl/internal/statuslogger"
"github.com/superfly/flyctl/internal/tracing"
"github.com/superfly/flyctl/logs"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
)

func (md *machineDeployment) runReleaseCommand(ctx context.Context) (err error) {
Expand Down Expand Up @@ -47,12 +53,74 @@ func (md *machineDeployment) runReleaseCommand(ctx context.Context) (err error)
loggerCleanup(false)
}()

err = md.createOrUpdateReleaseCmdMachine(ctx)
if err != nil {
tracing.RecordError(span, err, "failed to create release cmd machine")
return fmt.Errorf("error running release_command machine: %w", err)
logOpts := &logs.LogOptions{
AppName: appconfig.NameFromContext(ctx),
RegionCode: config.FromContext(ctx).Region,
NoTail: false,
}
var stream logs.LogStream

eg, groupCtx := errgroup.WithContext(ctx)
eg.Go(func() error {
err := md.createOrUpdateReleaseCmdMachine(groupCtx)
if err != nil {
tracing.RecordError(span, err, "failed to create release cmd machine")
return fmt.Errorf("error running release_command machine: %w", err)
}
return nil
})
eg.Go(func() error {
stream, err = logs.NewNatsStream(ctx, md.apiClient, logOpts)
if err != nil {
// Silently fallback to app logs polling if NATS streaming client is unavailable.
stream = logs.NewPollingStream(md.apiClient)
}
return nil
})
if err = eg.Wait(); err != nil {
return err
}

releaseCmdMachine := md.releaseCommandMachine.GetMachines()[0]

logOpts.VMID = releaseCmdMachine.Machine().ID
logsCtx, cancelLogs := context.WithCancel(ctx)
defer cancelLogs()
var buf *ring.Ring
if !flag.GetBool(ctx, "verbose") {
buf = ring.New(100)
}
go func() {
defer cancelLogs()
if stream == nil {
return
}
for entry := range stream.Stream(logsCtx, logOpts) {
var ts time.Time
if ts, err = time.Parse(time.RFC3339Nano, entry.Timestamp); err != nil {
err = fmt.Errorf("failed parsing timestamp %q: %w", entry.Timestamp, err)
return
}
msg := fmt.Sprintf("%s %s", aurora.Faint(format.Time(ts)), entry.Message)
if buf != nil {
buf.Value = msg
buf = buf.Next()
} else {
fmt.Fprintln(md.io.ErrOut)
}
if strings.Contains(entry.Message, "Main child exited normally") {
return
}
}
}()

fmt.Fprintln(md.io.ErrOut, "Starting machine")

if err = releaseCmdMachine.Start(ctx); err != nil {
fmt.Fprintf(md.io.ErrOut, "error starting release_command machine: %v\n", err)
return
}

// FIXME: consolidate this wait stuff with deploy waits? Especially once we improve the outpu
err = md.waitForReleaseCommandToFinish(ctx, releaseCmdMachine)
if err != nil {
Expand All @@ -69,35 +137,29 @@ func (md *machineDeployment) runReleaseCommand(ctx context.Context) (err error)
return fmt.Errorf("error get release_command machine %s exit code: %w", releaseCmdMachine.Machine().ID, err)
}

if exitCode != 0 || flag.GetBool(ctx, "verbose") {
if exitCode != 0 {
statuslogger.LogStatus(ctx, statuslogger.StatusFailure, "release_command failed")
}
if flag.GetBool(ctx, "verbose") {
waitForLogs(md, logsCtx, stream, releaseCmdMachine.Machine().ID)
}

if exitCode != 0 {
statuslogger.LogStatus(ctx, statuslogger.StatusFailure, "release_command failed")

// Preemptive cleanup of the logger so that the logs have a clean place to write to
loggerCleanup(false)

time.Sleep(2 * time.Second) // Wait 2 secs to be sure logs have reached OpenSearch
if exitCode != 0 {
fmt.Fprintf(md.io.ErrOut, "Error release_command failed running on machine %s with exit code %s.\n",
md.colorize.Bold(releaseCmdMachine.Machine().ID), md.colorize.Red(strconv.Itoa(exitCode)))
}
fmt.Fprintf(md.io.ErrOut, "Check its logs: here's the last 100 lines below, or run 'fly logs -i %s':\n",
releaseCmdMachine.Machine().ID)
releaseCmdLogs, _, err := md.apiClient.GetAppLogs(ctx, md.app.Name, "", md.appConfig.PrimaryRegion, releaseCmdMachine.Machine().ID)
if fly.IsNotAuthenticatedError(err) {
fmt.Fprintf(md.io.ErrOut, "Warn: not authorized to retrieve app logs (this can happen when using deploy tokens), so we can't show you what failed. Use `fly logs -i %s` or open the monitoring dashboard to see them: https://fly.io/apps/%s/monitoring?region=&instance=%s\n", releaseCmdMachine.Machine().ID, md.appConfig.AppName, releaseCmdMachine.Machine().ID)
} else {
if err != nil {
return fmt.Errorf("error getting release_command logs: %w", err)
}
for _, l := range releaseCmdLogs {
fmt.Fprintf(md.io.ErrOut, " %s\n", l.Message)
}
}
if exitCode != 0 {
return fmt.Errorf("error release_command machine %s exited with non-zero status of %d", releaseCmdMachine.Machine().ID, exitCode)
fmt.Fprintf(md.io.ErrOut, "Error release_command failed running on machine %s with exit code %s.\n",
md.colorize.Bold(releaseCmdMachine.Machine().ID), md.colorize.Red(strconv.Itoa(exitCode)))

if !flag.GetBool(ctx, "verbose") {
fmt.Fprintf(md.io.ErrOut, "Checking logs: fetching the last 100 lines below:\n")
waitForLogs(md, logsCtx, stream, releaseCmdMachine.Machine().ID)
buf.Do(func(str any) {
if str != nil {
fmt.Fprintln(md.io.ErrOut, str)
}
})
}
return fmt.Errorf("machine %s exited with non-zero status of %d", releaseCmdMachine.Machine().ID, exitCode)
}
statuslogger.LogfStatus(ctx,
statuslogger.StatusSuccess,
Expand All @@ -107,6 +169,22 @@ func (md *machineDeployment) runReleaseCommand(ctx context.Context) (err error)
return nil
}

// Wait up to 20 secs to be sure logs have been fully ingested, and log any errors.
func waitForLogs(md *machineDeployment, ctx context.Context, stream logs.LogStream, id string) {
timer := time.NewTimer(20 * time.Second)
defer timer.Stop()
select {
case <-ctx.Done():
if fly.IsNotAuthenticatedError(stream.Err()) {
fmt.Fprintf(md.io.ErrOut, "Warn: not authorized to retrieve app logs (this can happen when using deploy tokens). Use `fly logs -i %s` or open the monitoring dashboard to see them: https://fly.io/apps/%s/monitoring?region=&instance=%s\n", id, md.appConfig.AppName, id)
} else if stream.Err() != nil && !errors.Is(stream.Err(), context.Canceled) {
fmt.Fprintf(md.io.ErrOut, "error getting release command logs: %v\n", stream.Err())
}
case <-timer.C:
fmt.Fprintf(md.io.ErrOut, "timeout waiting for release command logs\n")
}
}

// dedicatedHostIdMismatch checks if the dedicatedHostID on a machine is the same as the one set in the fly.toml
// a mismatch will result in a delete+recreate op
func dedicatedHostIdMismatch(m *fly.Machine, ac *appconfig.Config) bool {
Expand Down Expand Up @@ -147,6 +225,13 @@ func (md *machineDeployment) createReleaseCommandMachine(ctx context.Context) er

statuslogger.Logf(ctx, "Created release_command machine %s", md.colorize.Bold(releaseCmdMachine.ID))
md.releaseCommandMachine = machine.NewMachineSet(md.flapsClient, md.io, []*fly.Machine{releaseCmdMachine}, true)

lm := md.releaseCommandMachine.GetMachines()[0]
if err := lm.WaitForState(ctx, fly.MachineStateStopped, md.waitTimeout, false); err != nil {
err = suggestChangeWaitTimeout(err, "wait-timeout")
return err
}

return nil
}

Expand Down Expand Up @@ -194,8 +279,9 @@ func (md *machineDeployment) launchInputForReleaseCommand(origMachineRaw *fly.Ma
}

return &fly.LaunchMachineInput{
Config: mConfig,
Region: origMachineRaw.Region,
Config: mConfig,
Region: origMachineRaw.Region,
SkipLaunch: true,
}
}

Expand Down
2 changes: 2 additions & 0 deletions internal/command/deploy/machines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ func Test_resolveUpdatedMachineConfig_ReleaseCommand(t *testing.T) {
},
Guest: fly.MachinePresets["shared-cpu-2x"],
},
SkipLaunch: true,
}, got)

// Update existing release command machine
Expand Down Expand Up @@ -227,6 +228,7 @@ func Test_resolveUpdatedMachineConfig_ReleaseCommand(t *testing.T) {
},
Guest: fly.MachinePresets["shared-cpu-2x"],
},
SkipLaunch: true,
}, got)
}

Expand Down
4 changes: 2 additions & 2 deletions logs/polling.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ type pollingStream struct {
apiClient flyutil.Client
}

func NewPollingStream(client flyutil.Client, opts *LogOptions) (LogStream, error) {
return &pollingStream{apiClient: client}, nil
func NewPollingStream(client flyutil.Client) LogStream {
return &pollingStream{apiClient: client}
}

func (s *pollingStream) Stream(ctx context.Context, opts *LogOptions) <-chan LogEntry {
Expand Down
Loading