Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 3 additions & 174 deletions tools/storm/e2e/scenario/monitoring.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,14 @@
package scenario

import (
"bufio"
"container/ring"
"context"
"errors"
"fmt"
"io"
"strings"
"sync"
"time"

"github.com/digitalocean/go-libvirt"
stormutils "github.com/microsoft/storm/pkg/storm/utils"
"github.com/sirupsen/logrus"

"tridenttools/pkg/ref"
ioutils "tridenttools/storm/utils/io"
"tridenttools/storm/utils/libvirtutils"
)

// spawnVMSerialMonitor starts the VM serial monitor for the test host IF it is a
Expand Down Expand Up @@ -65,7 +57,7 @@ func (s *TridentE2EScenario) spawnVMSerialMonitor(ctx context.Context, output io
}()
defer output.Close()

err := waitForVmSerialLogLoginLibvirt(ctx, vmInfo.Lv(), vmInfo.LvDomain(), output)
err := libvirtutils.WaitForVmSerialLogLoginLibvirt(ctx, vmInfo.Lv(), vmInfo.LvDomain(), output)
if err != nil {
errStr := fmt.Sprintf("VM serial log monitor ended with error: %v", err)
logrus.Error(errStr)
Expand All @@ -81,167 +73,4 @@ func (s *TridentE2EScenario) spawnVMSerialMonitor(ctx context.Context, output io
}()

return doneChannel, nil
}

func waitForVmSerialLogLoginLibvirt(ctx context.Context, lv *libvirt.Libvirt, domain libvirt.Domain, out io.Writer) error {
pipeReader, pipeWriter := io.Pipe()
consoleCtx, consoleCancel := context.WithCancel(ctx)
var wg sync.WaitGroup

// Context to track whether the console is open. This is used to cancel the
// reader loop if the console goroutine exits, because this means there will
// never be any more data to read, so we should exit the reader loop as
// well.
consoleIsOpenCtx, consoleIsOpenCancel := context.WithCancel(ctx)
defer consoleIsOpenCancel()

// Spawn DomainOpenConsole in a goroutine because it's a blocking call.
errCh := make(chan error, 1)
wg.Add(1)
go func() {
defer wg.Done()
defer consoleIsOpenCancel()
defer pipeWriter.Close()
pipeNotifyWriter := ioutils.NewNotifyWriter(pipeWriter)
for {
// If context is cancelled, exit the goroutine.
if consoleCtx.Err() != nil {
return
}

// Try to open the console. This is a blocking call that only
// returns when the console is closed or an error occurs. It writes
// to the provided writer in the background.
err := lv.DomainOpenConsole(domain, nil, pipeNotifyWriter, uint32(libvirt.DomainConsoleForce))
if err == nil && pipeNotifyWriter.Active() {
// DomainOpenConsole returned without error and data was
// written, this is an expected outcome when the console closed
// naturally.
return
}

if consoleCtx.Err() != nil {
// Context was cancelled while the console was open/opening,
// exit the goroutine.
return
}

if !pipeNotifyWriter.Active() {
// No data has been written yet, so this is likely a
// transient error such as the domain not being fully
// started yet. Retry silently.
time.Sleep(100 * time.Millisecond)
continue
}

// If we get here, there was an error after some data was
// successfully written. Log the error and exit the goroutine. This
// may happen naturally when the pipe is closed. But if that happens
// naturally, the readerLoop will have exited first, so this error
// won't matter.
logrus.Errorf("DomainOpenConsole error after data written: %v", err)
errCh <- err
return
}
}()

// Call inner loop
loopErr := readerLoop(consoleIsOpenCtx, pipeReader, errCh, out, 30)
// Regardless of whether readerLoop returned an error, cancel the console
// context and close the pipe to stop the DomainOpenConsole goroutine.
consoleCancel()
pipeReader.Close()
pipeWriter.Close()

// Even after we close all of this, the DomainOpenConsole goroutine may
// still be running because it doesn't take in a context. We force it to
// close by opening a new console with the DomainConsoleForce flag, and a
// nil stream, which will signal the existing DomainOpenConsole to exit, and
// make this new one exit immediately.
err := lv.DomainOpenConsole(domain, nil, nil, uint32(libvirt.DomainConsoleForce))
if err != nil {
logrus.Warnf("failed to force close DomainOpenConsole: %v", err)
}

// Wait for DomainOpenConsole goroutine to exit
wg.Wait()

return loopErr
}

func readerLoop(ctx context.Context, in io.Reader, errCh <-chan error, out io.Writer, ringSize int) error {
// Create a ring buffer to hold the last N lines of the serial log
ring := ring.New(ringSize)

reader := bufio.NewReader(in)
var lineBuffBuilder strings.Builder
for {
// Check for context cancellation
if ctx.Err() != nil {
// Print the last `ringSize` lines of the serial log before timing out
logrus.Errorf("VM serial monitor was cancelled. Last %d lines of VM serial log before timeout:\n%s", ringSize, func() string {
var sb strings.Builder
ring.Do(func(p interface{}) {
if p != nil {
sb.WriteString(p.(string) + "\n")
}
})
return sb.String()
}())

return ctx.Err()
}

// Check if the console stream has ended
select {
case err := <-errCh:
if err != nil {
err = fmt.Errorf("libvirt console stream ended with error: %w", err)
}
logrus.Infof("Libvirt console stream ended")
return err
default:
// Continue reading
}

// Check if the current line contains the login prompt, and return if it
// does. The log-in prompt is expected to contain the string "login:"
// but we need to block the false positive caused by the installer OS
// login prompt. The installer OS hostname includes the string "mos" so
// we can use that to filter out installer login prompts.
if strings.Contains(lineBuffBuilder.String(), "login:") &&
!strings.Contains(lineBuffBuilder.String(), "mos") {
logrus.Infof("Login prompt found in VM serial log")
return nil
}

// Read a rune from reader, if EOF is encountered, retry until either a new
// character is read or the timeout is reached
readRune, _, err := reader.ReadRune()
if errors.Is(err, io.EOF) {
// Wait for new serial output
time.Sleep(100 * time.Millisecond)
continue
} else if err != nil {
return fmt.Errorf("failed to read from serial log file: %w", err)
}

if readRune == '\n' {
// Store the line in the ring buffer
ring.Value = lineBuffBuilder.String()
ring = ring.Next()

// Output the line to the provided writer
_, err := out.Write([]byte(stormutils.RemoveAllANSI(lineBuffBuilder.String()) + "\n"))
if err != nil {
return fmt.Errorf("failed to write serial log output: %w", err)
}

// New line, reset line buffer
lineBuffBuilder.Reset()
} else {
// Append rune to line buffer, this operation always succeeds.
lineBuffBuilder.WriteRune(readRune)
}
}
}
}
67 changes: 48 additions & 19 deletions tools/storm/helpers/direct_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ package helpers

import (
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
"tridenttools/pkg/netlaunch"
stormutils "tridenttools/storm/utils"
"tridenttools/storm/utils/libvirtutils"

"github.com/microsoft/storm"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -44,8 +46,6 @@ func (h *DirectStreamingHelper) RegisterTestCases(r storm.TestRegistrar) error {
}

func (h *DirectStreamingHelper) directStreaming(tc storm.TestCase) error {
startTime := time.Now()

// Create Host Configuration with image information for direct streaming test
hostConfig, err := h.createTempHostConfig()
if err != nil {
Expand Down Expand Up @@ -79,32 +79,61 @@ func (h *DirectStreamingHelper) directStreaming(tc storm.TestCase) error {
}
defer tc.ArtifactBroker().PublishLogFile("vm-serial.log", vmSerialLog)
Copy link

Copilot AI Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are conflicting deferred calls to publish "vm-serial.log": this line publishes vmSerialLog (the VM's serial log file path from libvirt domain XML), while lines 122-124 publish logFile.Name() (a newly created temp file). Since the new libvirt console approach writes to a new temp file instead of using the existing VM serial log file, this defer statement should be removed. The VM serial log file at vmSerialLog path is no longer being used or written to in the new implementation.

Suggested change
defer tc.ArtifactBroker().PublishLogFile("vm-serial.log", vmSerialLog)

Copilot uses AI. Check for mistakes.

// Start netlaunch in background because the VM will not connect back to
// netlaunch and we need the file server to continue running until the image
// has been pulled and deployed. Netlaunch is intended to run until the test
// ends.
netlaunchContext := context.Background()
bootCtx, bootCtxCancel := context.WithTimeout(tc.Context(), time.Duration(h.args.TimeoutInSeconds)*time.Second)
defer bootCtxCancel()

// Run netlaunch in a separate goroutine since it is a blocking call. It
// will not exit regularly from a success/failure signal from Trident, but
// it needs to keep running to ensure the file server is up for Trident to
// pull the image from. It will be forcefully stopped when the test finishes.
go func() {
logrus.Info("Starting netlaunch...")
netlaunchErr := netlaunch.RunNetlaunch(netlaunchContext, netlaunchConfig)
logrus.Info("netlaunch stopped.")
if netlaunchErr != nil {
tc.FailFromError(netlaunchErr)
netlaunchErr := netlaunch.RunNetlaunch(tc.Context(), netlaunchConfig)
if netlaunchErr != nil && !errors.Is(netlaunchErr, context.Canceled) {
// If we got here, netlaunch failed from an internal error, not from
// the test finishing and canceling the context.
logrus.Errorf("netlaunch returned an error: %v", netlaunchErr)

// Cancel the boot context to signal the main test goroutine to stop
// monitoring the serial log, since netlaunch has stopped abnormally
// and the file server is no longer available.
bootCtxCancel()
}
}()

time.Sleep(10 * time.Second) // Give netlaunch some time to start

// Wait for login message in serial log
remainingTimeout := (time.Duration(h.args.TimeoutInSeconds) * time.Second) - time.Since(startTime)
err = stormutils.WaitForLoginMessageInSerialLog(vmSerialLog, true, 1, "/tmp/serial.log", remainingTimeout)
tc.ArtifactBroker().PublishLogFile("serial.log", "/tmp/serial.log")
lv, err := libvirtutils.Connect()
if err != nil {
logrus.Errorf("Failed to find login message in VM serial log: %v", err)
tc.FailFromError(err)
return fmt.Errorf("failed to connect to libvirt: %w", err)
}
defer lv.Disconnect()

domain, err := lv.DomainLookupByName(h.args.VmName)
if err != nil {
return fmt.Errorf("failed to find domain by name '%s': %w", h.args.VmName, err)
}

logFile, err := os.CreateTemp("", "console-*.log")
if err != nil {
return fmt.Errorf("failed to create temp file for console log: %w", err)
}
defer os.Remove(logFile.Name())
defer func() {
tc.ArtifactBroker().PublishLogFile("vm-serial.log", logFile.Name())
}()
defer logFile.Close()

err = libvirtutils.WaitForVmSerialLogLoginLibvirt(bootCtx, lv, domain, io.MultiWriter(logFile, os.Stdout))
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
tc.Fail("Login prompt not found within timeout")
}

return fmt.Errorf("error while monitoring VM serial log: %w", err)
}

logrus.Info("Direct streaming test completed successfully")
logrus.Info("Successfully found login message in VM serial log")
return nil
}

Expand Down
Loading
Loading