Skip to content

Commit

Permalink
stream release command app logs (#4020)
Browse files Browse the repository at this point in the history
* stream release command app logs
stream logs to the console with the `--verbose` flag,
or display the last 100 lines after a command failure.
  • Loading branch information
wjordan authored Nov 1, 2024
1 parent 87d6db8 commit 37a4189
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 32 deletions.
146 changes: 116 additions & 30 deletions internal/command/deploy/machines_releasecommand.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package deploy

import (
"container/ring"
"context"
"errors"
"fmt"
Expand All @@ -9,16 +10,21 @@ import (
"strings"
"time"

"github.com/logrusorgru/aurora"
"github.com/samber/lo"
fly "github.com/superfly/fly-go"
"github.com/superfly/fly-go/flaps"
"github.com/superfly/flyctl/helpers"
"github.com/superfly/flyctl/internal/appconfig"
"github.com/superfly/flyctl/internal/config"
"github.com/superfly/flyctl/internal/flag"
"github.com/superfly/flyctl/internal/format"
"github.com/superfly/flyctl/internal/machine"
"github.com/superfly/flyctl/internal/statuslogger"
"github.com/superfly/flyctl/internal/tracing"
"github.com/superfly/flyctl/logs"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
)

func (md *machineDeployment) runReleaseCommand(ctx context.Context) (err error) {
Expand Down Expand Up @@ -47,12 +53,74 @@ func (md *machineDeployment) runReleaseCommand(ctx context.Context) (err error)
loggerCleanup(false)
}()

err = md.createOrUpdateReleaseCmdMachine(ctx)
if err != nil {
tracing.RecordError(span, err, "failed to create release cmd machine")
return fmt.Errorf("error running release_command machine: %w", err)
logOpts := &logs.LogOptions{
AppName: appconfig.NameFromContext(ctx),
RegionCode: config.FromContext(ctx).Region,
NoTail: false,
}
var stream logs.LogStream

eg, groupCtx := errgroup.WithContext(ctx)
eg.Go(func() error {
err := md.createOrUpdateReleaseCmdMachine(groupCtx)
if err != nil {
tracing.RecordError(span, err, "failed to create release cmd machine")
return fmt.Errorf("error running release_command machine: %w", err)
}
return nil
})
eg.Go(func() error {
stream, err = logs.NewNatsStream(ctx, md.apiClient, logOpts)
if err != nil {
// Silently fallback to app logs polling if NATS streaming client is unavailable.
stream = logs.NewPollingStream(md.apiClient)
}
return nil
})
if err = eg.Wait(); err != nil {
return err
}

releaseCmdMachine := md.releaseCommandMachine.GetMachines()[0]

logOpts.VMID = releaseCmdMachine.Machine().ID
logsCtx, cancelLogs := context.WithCancel(ctx)
defer cancelLogs()
var buf *ring.Ring
if !flag.GetBool(ctx, "verbose") {
buf = ring.New(100)
}
go func() {
defer cancelLogs()
if stream == nil {
return
}
for entry := range stream.Stream(logsCtx, logOpts) {
var ts time.Time
if ts, err = time.Parse(time.RFC3339Nano, entry.Timestamp); err != nil {
err = fmt.Errorf("failed parsing timestamp %q: %w", entry.Timestamp, err)
return
}
msg := fmt.Sprintf("%s %s", aurora.Faint(format.Time(ts)), entry.Message)
if buf != nil {
buf.Value = msg
buf = buf.Next()
} else {
fmt.Fprintln(md.io.ErrOut)
}
if strings.Contains(entry.Message, "Main child exited normally") {
return
}
}
}()

fmt.Fprintln(md.io.ErrOut, "Starting machine")

if err = releaseCmdMachine.Start(ctx); err != nil {
fmt.Fprintf(md.io.ErrOut, "error starting release_command machine: %v\n", err)
return
}

// FIXME: consolidate this wait stuff with deploy waits? Especially once we improve the output
err = md.waitForReleaseCommandToFinish(ctx, releaseCmdMachine)
if err != nil {
Expand All @@ -69,35 +137,29 @@ func (md *machineDeployment) runReleaseCommand(ctx context.Context) (err error)
return fmt.Errorf("error get release_command machine %s exit code: %w", releaseCmdMachine.Machine().ID, err)
}

if exitCode != 0 || flag.GetBool(ctx, "verbose") {
if exitCode != 0 {
statuslogger.LogStatus(ctx, statuslogger.StatusFailure, "release_command failed")
}
if flag.GetBool(ctx, "verbose") {
waitForLogs(md, logsCtx, stream, releaseCmdMachine.Machine().ID)
}

if exitCode != 0 {
statuslogger.LogStatus(ctx, statuslogger.StatusFailure, "release_command failed")

// Preemptive cleanup of the logger so that the logs have a clean place to write to
loggerCleanup(false)

time.Sleep(2 * time.Second) // Wait 2 secs to be sure logs have reached OpenSearch
if exitCode != 0 {
fmt.Fprintf(md.io.ErrOut, "Error release_command failed running on machine %s with exit code %s.\n",
md.colorize.Bold(releaseCmdMachine.Machine().ID), md.colorize.Red(strconv.Itoa(exitCode)))
}
fmt.Fprintf(md.io.ErrOut, "Check its logs: here's the last 100 lines below, or run 'fly logs -i %s':\n",
releaseCmdMachine.Machine().ID)
releaseCmdLogs, _, err := md.apiClient.GetAppLogs(ctx, md.app.Name, "", md.appConfig.PrimaryRegion, releaseCmdMachine.Machine().ID)
if fly.IsNotAuthenticatedError(err) {
fmt.Fprintf(md.io.ErrOut, "Warn: not authorized to retrieve app logs (this can happen when using deploy tokens), so we can't show you what failed. Use `fly logs -i %s` or open the monitoring dashboard to see them: https://fly.io/apps/%s/monitoring?region=&instance=%s\n", releaseCmdMachine.Machine().ID, md.appConfig.AppName, releaseCmdMachine.Machine().ID)
} else {
if err != nil {
return fmt.Errorf("error getting release_command logs: %w", err)
}
for _, l := range releaseCmdLogs {
fmt.Fprintf(md.io.ErrOut, " %s\n", l.Message)
}
}
if exitCode != 0 {
return fmt.Errorf("error release_command machine %s exited with non-zero status of %d", releaseCmdMachine.Machine().ID, exitCode)
fmt.Fprintf(md.io.ErrOut, "Error release_command failed running on machine %s with exit code %s.\n",
md.colorize.Bold(releaseCmdMachine.Machine().ID), md.colorize.Red(strconv.Itoa(exitCode)))

if !flag.GetBool(ctx, "verbose") {
fmt.Fprintf(md.io.ErrOut, "Checking logs: fetching the last 100 lines below:\n")
waitForLogs(md, logsCtx, stream, releaseCmdMachine.Machine().ID)
buf.Do(func(str any) {
if str != nil {
fmt.Fprintln(md.io.ErrOut, str)
}
})
}
return fmt.Errorf("machine %s exited with non-zero status of %d", releaseCmdMachine.Machine().ID, exitCode)
}
statuslogger.LogfStatus(ctx,
statuslogger.StatusSuccess,
Expand All @@ -107,6 +169,22 @@ func (md *machineDeployment) runReleaseCommand(ctx context.Context) (err error)
return nil
}

// waitForLogs blocks until ctx is done or 20 seconds have elapsed, whichever
// comes first, to give the release command's logs time to be fully ingested.
// Callers pass the context that is cancelled once log streaming has finished.
//
// When ctx is done, the error reported by stream (if any) is written to
// md.io.ErrOut: a not-authenticated error prints a hint pointing at
// `fly logs -i <id>` and the monitoring dashboard, context.Canceled is
// ignored, and any other error is printed as-is. If the timer fires first, a
// timeout notice is printed instead. A nil stream is treated as having no
// error to report.
//
// id is the machine ID used in the printed messages.
func waitForLogs(md *machineDeployment, ctx context.Context, stream logs.LogStream, id string) {
	timer := time.NewTimer(20 * time.Second)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		if stream == nil {
			// Nothing was streamed, so there is no stream error to inspect.
			return
		}
		// Read the stream error once so every check below sees the same value.
		streamErr := stream.Err()
		switch {
		case fly.IsNotAuthenticatedError(streamErr):
			fmt.Fprintf(md.io.ErrOut, "Warn: not authorized to retrieve app logs (this can happen when using deploy tokens). Use `fly logs -i %s` or open the monitoring dashboard to see them: https://fly.io/apps/%s/monitoring?region=&instance=%s\n", id, md.appConfig.AppName, id)
		case streamErr != nil && !errors.Is(streamErr, context.Canceled):
			fmt.Fprintf(md.io.ErrOut, "error getting release command logs: %v\n", streamErr)
		}
	case <-timer.C:
		fmt.Fprintf(md.io.ErrOut, "timeout waiting for release command logs\n")
	}
}

// dedicatedHostIdMismatch checks if the dedicatedHostID on a machine is the same as the one set in the fly.toml
// a mismatch will result in a delete+recreate op
func dedicatedHostIdMismatch(m *fly.Machine, ac *appconfig.Config) bool {
Expand Down Expand Up @@ -147,6 +225,13 @@ func (md *machineDeployment) createReleaseCommandMachine(ctx context.Context) er

statuslogger.Logf(ctx, "Created release_command machine %s", md.colorize.Bold(releaseCmdMachine.ID))
md.releaseCommandMachine = machine.NewMachineSet(md.flapsClient, md.io, []*fly.Machine{releaseCmdMachine}, true)

lm := md.releaseCommandMachine.GetMachines()[0]
if err := lm.WaitForState(ctx, fly.MachineStateStopped, md.waitTimeout, false); err != nil {
err = suggestChangeWaitTimeout(err, "wait-timeout")
return err
}

return nil
}

Expand Down Expand Up @@ -194,8 +279,9 @@ func (md *machineDeployment) launchInputForReleaseCommand(origMachineRaw *fly.Ma
}

return &fly.LaunchMachineInput{
Config: mConfig,
Region: origMachineRaw.Region,
Config: mConfig,
Region: origMachineRaw.Region,
SkipLaunch: true,
}
}

Expand Down
2 changes: 2 additions & 0 deletions internal/command/deploy/machines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ func Test_resolveUpdatedMachineConfig_ReleaseCommand(t *testing.T) {
},
Guest: fly.MachinePresets["shared-cpu-2x"],
},
SkipLaunch: true,
}, got)

// Update existing release command machine
Expand Down Expand Up @@ -227,6 +228,7 @@ func Test_resolveUpdatedMachineConfig_ReleaseCommand(t *testing.T) {
},
Guest: fly.MachinePresets["shared-cpu-2x"],
},
SkipLaunch: true,
}, got)
}

Expand Down
4 changes: 2 additions & 2 deletions logs/polling.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ type pollingStream struct {
apiClient flyutil.Client
}

func NewPollingStream(client flyutil.Client, opts *LogOptions) (LogStream, error) {
return &pollingStream{apiClient: client}, nil
// NewPollingStream returns a LogStream backed by a pollingStream that uses
// the given API client.
func NewPollingStream(client flyutil.Client) LogStream {
	stream := pollingStream{apiClient: client}
	return &stream
}

func (s *pollingStream) Stream(ctx context.Context, opts *LogOptions) <-chan LogEntry {
Expand Down

0 comments on commit 37a4189

Please sign in to comment.