diff --git a/tools/storm/e2e/scenario/monitoring.go b/tools/storm/e2e/scenario/monitoring.go index 357864997..24320fc6a 100644 --- a/tools/storm/e2e/scenario/monitoring.go +++ b/tools/storm/e2e/scenario/monitoring.go @@ -1,22 +1,14 @@ package scenario import ( - "bufio" - "container/ring" "context" - "errors" "fmt" "io" - "strings" - "sync" - "time" - "github.com/digitalocean/go-libvirt" - stormutils "github.com/microsoft/storm/pkg/storm/utils" "github.com/sirupsen/logrus" "tridenttools/pkg/ref" - ioutils "tridenttools/storm/utils/io" + "tridenttools/storm/utils/libvirtutils" ) // spawnVMSerialMonitor starts the VM serial monitor for the test host IF it is a @@ -65,7 +57,7 @@ func (s *TridentE2EScenario) spawnVMSerialMonitor(ctx context.Context, output io }() defer output.Close() - err := waitForVmSerialLogLoginLibvirt(ctx, vmInfo.Lv(), vmInfo.LvDomain(), output) + err := libvirtutils.WaitForVmSerialLogLoginLibvirt(ctx, vmInfo.Lv(), vmInfo.LvDomain(), output) if err != nil { errStr := fmt.Sprintf("VM serial log monitor ended with error: %v", err) logrus.Error(errStr) @@ -81,167 +73,4 @@ func (s *TridentE2EScenario) spawnVMSerialMonitor(ctx context.Context, output io }() return doneChannel, nil -} - -func waitForVmSerialLogLoginLibvirt(ctx context.Context, lv *libvirt.Libvirt, domain libvirt.Domain, out io.Writer) error { - pipeReader, pipeWriter := io.Pipe() - consoleCtx, consoleCancel := context.WithCancel(ctx) - var wg sync.WaitGroup - - // Context to track whether the console is open. This is used to cancel the - // reader loop if the console goroutine exits, because this means there will - // never be any more data to read, so we should exit the reader loop as - // well. - consoleIsOpenCtx, consoleIsOpenCancel := context.WithCancel(ctx) - defer consoleIsOpenCancel() - - // Spawn DomainOpenConsole in a goroutine because it's a blocking call. 
- errCh := make(chan error, 1) - wg.Add(1) - go func() { - defer wg.Done() - defer consoleIsOpenCancel() - defer pipeWriter.Close() - pipeNotifyWriter := ioutils.NewNotifyWriter(pipeWriter) - for { - // If context is cancelled, exit the goroutine. - if consoleCtx.Err() != nil { - return - } - - // Try to open the console. This is a blocking call that only - // returns when the console is closed or an error occurs. It writes - // to the provided writer in the background. - err := lv.DomainOpenConsole(domain, nil, pipeNotifyWriter, uint32(libvirt.DomainConsoleForce)) - if err == nil && pipeNotifyWriter.Active() { - // DomainOpenConsole returned without error and data was - // written, this is an expected outcome when the console closed - // naturally. - return - } - - if consoleCtx.Err() != nil { - // Context was cancelled while the console was open/opening, - // exit the goroutine. - return - } - - if !pipeNotifyWriter.Active() { - // No data has been written yet, so this is likely a - // transient error such as the domain not being fully - // started yet. Retry silently. - time.Sleep(100 * time.Millisecond) - continue - } - - // If we get here, there was an error after some data was - // successfully written. Log the error and exit the goroutine. This - // may happen naturally when the pipe is closed. But if that happens - // naturally, the readerLoop will have exited first, so this error - // won't matter. - logrus.Errorf("DomainOpenConsole error after data written: %v", err) - errCh <- err - return - } - }() - - // Call inner loop - loopErr := readerLoop(consoleIsOpenCtx, pipeReader, errCh, out, 30) - // Regardless of whether readerLoop returned an error, cancel the console - // context and close the pipe to stop the DomainOpenConsole goroutine. - consoleCancel() - pipeReader.Close() - pipeWriter.Close() - - // Even after we close all of this, the DomainOpenConsole goroutine may - // still be running because it doesn't take in a context. 
We force it to - // close by opening a new console with the DomainConsoleForce flag, and a - // nil stream, which will signal the existing DomainOpenConsole to exit, and - // make this new one exit immediately. - err := lv.DomainOpenConsole(domain, nil, nil, uint32(libvirt.DomainConsoleForce)) - if err != nil { - logrus.Warnf("failed to force close DomainOpenConsole: %v", err) - } - - // Wait for DomainOpenConsole goroutine to exit - wg.Wait() - - return loopErr -} - -func readerLoop(ctx context.Context, in io.Reader, errCh <-chan error, out io.Writer, ringSize int) error { - // Create a ring buffer to hold the last N lines of the serial log - ring := ring.New(ringSize) - - reader := bufio.NewReader(in) - var lineBuffBuilder strings.Builder - for { - // Check for context cancellation - if ctx.Err() != nil { - // Print the last `ringSize` lines of the serial log before timing out - logrus.Errorf("VM serial monitor was cancelled. Last %d lines of VM serial log before timeout:\n%s", ringSize, func() string { - var sb strings.Builder - ring.Do(func(p interface{}) { - if p != nil { - sb.WriteString(p.(string) + "\n") - } - }) - return sb.String() - }()) - - return ctx.Err() - } - - // Check if the console stream has ended - select { - case err := <-errCh: - if err != nil { - err = fmt.Errorf("libvirt console stream ended with error: %w", err) - } - logrus.Infof("Libvirt console stream ended") - return err - default: - // Continue reading - } - - // Check if the current line contains the login prompt, and return if it - // does. The log-in prompt is expected to contain the string "login:" - // but we need to block the false positive caused by the installer OS - // login prompt. The installer OS hostname includes the string "mos" so - // we can use that to filter out installer login prompts. 
- if strings.Contains(lineBuffBuilder.String(), "login:") && - !strings.Contains(lineBuffBuilder.String(), "mos") { - logrus.Infof("Login prompt found in VM serial log") - return nil - } - - // Read a rune from reader, if EOF is encountered, retry until either a new - // character is read or the timeout is reached - readRune, _, err := reader.ReadRune() - if errors.Is(err, io.EOF) { - // Wait for new serial output - time.Sleep(100 * time.Millisecond) - continue - } else if err != nil { - return fmt.Errorf("failed to read from serial log file: %w", err) - } - - if readRune == '\n' { - // Store the line in the ring buffer - ring.Value = lineBuffBuilder.String() - ring = ring.Next() - - // Output the line to the provided writer - _, err := out.Write([]byte(stormutils.RemoveAllANSI(lineBuffBuilder.String()) + "\n")) - if err != nil { - return fmt.Errorf("failed to write serial log output: %w", err) - } - - // New line, reset line buffer - lineBuffBuilder.Reset() - } else { - // Append rune to line buffer, this operation always succeeds. 
- lineBuffBuilder.WriteRune(readRune) - } - } -} +} \ No newline at end of file diff --git a/tools/storm/helpers/direct_streaming.go b/tools/storm/helpers/direct_streaming.go index 46b938f1c..fa400c44e 100644 --- a/tools/storm/helpers/direct_streaming.go +++ b/tools/storm/helpers/direct_streaming.go @@ -2,14 +2,16 @@ package helpers import ( "context" + "errors" "fmt" + "io" "os" "os/exec" "path/filepath" "strings" "time" "tridenttools/pkg/netlaunch" - stormutils "tridenttools/storm/utils" + "tridenttools/storm/utils/libvirtutils" "github.com/microsoft/storm" "github.com/sirupsen/logrus" @@ -44,8 +46,6 @@ func (h *DirectStreamingHelper) RegisterTestCases(r storm.TestRegistrar) error { } func (h *DirectStreamingHelper) directStreaming(tc storm.TestCase) error { - startTime := time.Now() - // Create Host Configuration with image information for direct streaming test hostConfig, err := h.createTempHostConfig() if err != nil { @@ -79,32 +79,61 @@ func (h *DirectStreamingHelper) directStreaming(tc storm.TestCase) error { } defer tc.ArtifactBroker().PublishLogFile("vm-serial.log", vmSerialLog) - // Start netlaunch in background because the VM will not connect back to - // netlaunch and we need the file server to continue running until the image - // has been pulled and deployed. Netlaunch is intended to run until the test - // ends. - netlaunchContext := context.Background() + bootCtx, bootCtxCancel := context.WithTimeout(tc.Context(), time.Duration(h.args.TimeoutInSeconds)*time.Second) + defer bootCtxCancel() + + // Run netlaunch in a separate goroutine since it is a blocking call. It + // will not exit regularly from a success/failure signal from Trident, but + // it needs to keep running to ensure the file server is up for Trident to + // pull the image from. It will be forcefully stopped when the test finishes. 
go func() { logrus.Info("Starting netlaunch...") - netlaunchErr := netlaunch.RunNetlaunch(netlaunchContext, netlaunchConfig) - logrus.Info("netlaunch stopped.") - if netlaunchErr != nil { - tc.FailFromError(netlaunchErr) + netlaunchErr := netlaunch.RunNetlaunch(tc.Context(), netlaunchConfig) + if netlaunchErr != nil && !errors.Is(netlaunchErr, context.Canceled) { + // If we got here, netlaunch failed from an internal error, not from + // the test finishing and canceling the context. + logrus.Errorf("netlaunch returned an error: %v", netlaunchErr) + + // Cancel the boot context to signal the main test goroutine to stop + // monitoring the serial log, since netlaunch has stopped abnormally + // and the file server is no longer available. + bootCtxCancel() } }() time.Sleep(10 * time.Second) // Give netlaunch some time to start - // Wait for login message in serial log - remainingTimeout := (time.Duration(h.args.TimeoutInSeconds) * time.Second) - time.Since(startTime) - err = stormutils.WaitForLoginMessageInSerialLog(vmSerialLog, true, 1, "/tmp/serial.log", remainingTimeout) - tc.ArtifactBroker().PublishLogFile("serial.log", "/tmp/serial.log") + lv, err := libvirtutils.Connect() if err != nil { - logrus.Errorf("Failed to find login message in VM serial log: %v", err) - tc.FailFromError(err) + return fmt.Errorf("failed to connect to libvirt: %w", err) + } + defer lv.Disconnect() + + domain, err := lv.DomainLookupByName(h.args.VmName) + if err != nil { + return fmt.Errorf("failed to find domain by name '%s': %w", h.args.VmName, err) + } + + logFile, err := os.CreateTemp("", "console-*.log") + if err != nil { + return fmt.Errorf("failed to create temp file for console log: %w", err) + } + defer os.Remove(logFile.Name()) + defer func() { + tc.ArtifactBroker().PublishLogFile("vm-serial.log", logFile.Name()) + }() + defer logFile.Close() + + err = libvirtutils.WaitForVmSerialLogLoginLibvirt(bootCtx, lv, domain, io.MultiWriter(logFile, os.Stdout)) + if err != nil { + if 
errors.Is(err, context.DeadlineExceeded) { + tc.Fail("Login prompt not found within timeout") + } + + return fmt.Errorf("error while monitoring VM serial log: %w", err) } - logrus.Info("Direct streaming test completed successfully") + logrus.Info("Successfully found login message in VM serial log") return nil } diff --git a/tools/storm/utils/libvirtutils/console.go b/tools/storm/utils/libvirtutils/console.go new file mode 100644 index 000000000..988fc640d --- /dev/null +++ b/tools/storm/utils/libvirtutils/console.go @@ -0,0 +1,182 @@ +package libvirtutils + +import ( + "bufio" + "container/ring" + "context" + "errors" + "fmt" + "io" + "strings" + "sync" + "time" + + "github.com/digitalocean/go-libvirt" + stormutils "github.com/microsoft/storm/pkg/storm/utils" + "github.com/sirupsen/logrus" + + ioutils "tridenttools/storm/utils/io" +) + +func WaitForVmSerialLogLoginLibvirt(ctx context.Context, lv *libvirt.Libvirt, domain libvirt.Domain, out io.Writer) error { + pipeReader, pipeWriter := io.Pipe() + consoleCtx, consoleCancel := context.WithCancel(ctx) + var wg sync.WaitGroup + + // Context to track whether the console is open. This is used to cancel the + // reader loop if the console goroutine exits, because this means there will + // never be any more data to read, so we should exit the reader loop as + // well. + consoleIsOpenCtx, consoleIsOpenCancel := context.WithCancel(ctx) + defer consoleIsOpenCancel() + + // Spawn DomainOpenConsole in a goroutine because it's a blocking call. + errCh := make(chan error, 1) + wg.Add(1) + go func() { + defer wg.Done() + defer consoleIsOpenCancel() + defer pipeWriter.Close() + pipeNotifyWriter := ioutils.NewNotifyWriter(pipeWriter) + for { + // If context is cancelled, exit the goroutine. + if consoleCtx.Err() != nil { + return + } + + // Try to open the console. This is a blocking call that only + // returns when the console is closed or an error occurs. It writes + // to the provided writer in the background. 
+ err := lv.DomainOpenConsole(domain, nil, pipeNotifyWriter, uint32(libvirt.DomainConsoleForce)) + if err == nil && pipeNotifyWriter.Active() { + // DomainOpenConsole returned without error and data was + // written, this is an expected outcome when the console closed + // naturally. + return + } + + if consoleCtx.Err() != nil { + // Context was cancelled while the console was open/opening, + // exit the goroutine. + return + } + + if !pipeNotifyWriter.Active() { + // No data has been written yet, so this is likely a + // transient error such as the domain not being fully + // started yet. Retry silently. + time.Sleep(100 * time.Millisecond) + continue + } + + // If we get here, there was an error after some data was + // successfully written. Log the error and exit the goroutine. This + // may happen naturally when the pipe is closed. But if that happens + // naturally, the readerLoop will have exited first, so this error + // won't matter. + logrus.Errorf("DomainOpenConsole error after data written: %v", err) + errCh <- err + return + } + }() + + // Call inner loop + loopErr := readerLoop(consoleIsOpenCtx, pipeReader, errCh, out, 30) + // Regardless of whether readerLoop returned an error, cancel the console + // context and close the pipe to stop the DomainOpenConsole goroutine. + consoleCancel() + pipeReader.Close() + pipeWriter.Close() + + // Even after we close all of this, the DomainOpenConsole goroutine may + // still be running because it doesn't take in a context. We force it to + // close by opening a new console with the DomainConsoleForce flag, and a + // nil stream, which will signal the existing DomainOpenConsole to exit, and + // make this new one exit immediately. 
+ err := lv.DomainOpenConsole(domain, nil, nil, uint32(libvirt.DomainConsoleForce)) + if err != nil { + logrus.Warnf("failed to force close DomainOpenConsole: %v", err) + } + + // Wait for DomainOpenConsole goroutine to exit + wg.Wait() + + return loopErr +} + +func readerLoop(ctx context.Context, in io.Reader, errCh <-chan error, out io.Writer, ringSize int) error { + // Create a ring buffer to hold the last N lines of the serial log + ring := ring.New(ringSize) + + reader := bufio.NewReader(in) + var lineBuffBuilder strings.Builder + for { + // Check for context cancellation + if ctx.Err() != nil { + // Print the last `ringSize` lines of the serial log before timing out + logrus.Errorf("VM serial monitor was cancelled. Last %d lines of VM serial log before timeout:\n%s", ringSize, func() string { + var sb strings.Builder + ring.Do(func(p interface{}) { + if p != nil { + sb.WriteString(p.(string) + "\n") + } + }) + return sb.String() + }()) + + return ctx.Err() + } + + // Check if the console stream has ended + select { + case err := <-errCh: + if err != nil { + err = fmt.Errorf("libvirt console stream ended with error: %w", err) + } + logrus.Infof("Libvirt console stream ended") + return err + default: + // Continue reading + } + + // Check if the current line contains the login prompt, and return if it + // does. The log-in prompt is expected to contain the string "login:" + // but we need to block the false positive caused by the installer OS + // login prompt. The installer OS hostname includes the string "mos" so + // we can use that to filter out installer login prompts. 
+ if strings.Contains(lineBuffBuilder.String(), "login:") && + !strings.Contains(lineBuffBuilder.String(), "mos") { + logrus.Infof("Login prompt found in VM serial log") + return nil + } + + // Read a rune from reader, if EOF is encountered, retry until either a new + // character is read or the timeout is reached + readRune, _, err := reader.ReadRune() + if errors.Is(err, io.EOF) { + // Wait for new serial output + time.Sleep(100 * time.Millisecond) + continue + } else if err != nil { + return fmt.Errorf("failed to read from serial log file: %w", err) + } + + if readRune == '\n' { + // Store the line in the ring buffer + ring.Value = lineBuffBuilder.String() + ring = ring.Next() + + // Output the line to the provided writer + _, err := out.Write([]byte(stormutils.RemoveAllANSI(lineBuffBuilder.String()) + "\n")) + if err != nil { + return fmt.Errorf("failed to write serial log output: %w", err) + } + + // New line, reset line buffer + lineBuffBuilder.Reset() + } else { + // Append rune to line buffer, this operation always succeeds. + lineBuffBuilder.WriteRune(readRune) + } + } +} diff --git a/tools/storm/utils/libvirtutils/open.go b/tools/storm/utils/libvirtutils/open.go new file mode 100644 index 000000000..f412268ea --- /dev/null +++ b/tools/storm/utils/libvirtutils/open.go @@ -0,0 +1,27 @@ +package libvirtutils + +import ( + "fmt" + "net/url" + + "github.com/digitalocean/go-libvirt" + "github.com/sirupsen/logrus" +) + +// Connect establishes a connection to the local libvirt daemon. It +// returns the libvirt connection instance or an error if the connection could +// not be established. 
+func Connect() (*libvirt.Libvirt, error) {
+	uri, parseErr := url.Parse("qemu:///system")
+	if parseErr != nil {
+		return nil, fmt.Errorf("failed to parse libvirt URI: %w", parseErr)
+	}
+	target := uri.String() // rendered once; reused by the debug log and the error message
+	logrus.Debugf("Connecting to libvirt at '%s'", target)
+	// A connection failure is wrapped together with a hint about libvirt group membership.
+	conn, connErr := libvirt.ConnectToURI(uri)
+	if connErr != nil {
+		return nil, fmt.Errorf("failed to connect to libvirt hypervisor '%s', Is your user in the libvirt group?: %w", target, connErr)
+	}
+	return conn, nil
+}