Skip to content

Commit

Permalink
Handle process termination race
Browse files Browse the repository at this point in the history
The process might get terminated before EOF, so add a check for that specific error. This does a bit of refactoring too to decouple line-reading from line-parsing.
  • Loading branch information
jefferbrecht committed Nov 21, 2024
1 parent 19a24c5 commit 801db3a
Showing 1 changed file with 36 additions and 45 deletions.
81 changes: 36 additions & 45 deletions transformation_test/transformation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
Expand Down Expand Up @@ -443,60 +444,50 @@ func (transformationConfig transformationTest) runOTelTestInner(t *testing.T, na
t.Fatal("Failed to start command:", err)
}

var errors []any
var errorList []any

// Read from stderr until EOF and put any errors in `errors`.
eg.Go(func() error {
consumingCount := 0
r := bufio.NewReader(stderr)
d := json.NewDecoder(r)
for {
s := bufio.NewScanner(stderr)
for s.Scan() {
line := s.Text()
log := map[string]any{}
if err := d.Decode(&log); err == io.EOF {
return nil
} else if err != nil {
// Not valid JSON, just print the raw stderr.
// This happens when the config is invalid.
buf, err2 := io.ReadAll(d.Buffered())
if err2 != nil {
return err
if err := json.Unmarshal([]byte(line), &log); err == nil {
t.Logf("collector log output: %s", line)
delete(log, "ts")
level, _ := log["level"].(string)
if level != "info" && level != "debug" && level != "None" {
errorList = append(errorList, log)
}
buf2, err2 := io.ReadAll(r)
if err2 != nil {
return err
}
stderr := fmt.Sprintf("%s%s", string(buf), string(buf2))
t.Logf("collector stderr:\n%s", stderr)
stderr = sanitizeStacktrace(t, stderr)
errors = append(errors, map[string]any{"stderr": stderr})
return nil
}
b, err := json.Marshal(log)
if err != nil {
t.Errorf("failed to marshal otel log: %v", err)
} else {
t.Logf("collector log output: %s", b)
}
delete(log, "ts")
level, _ := log["level"].(string)
if level != "info" && level != "debug" && level != "None" {
errors = append(errors, log)
}
msg, _ := log["msg"].(string)
if strings.HasPrefix(msg, "Consuming files") {
consumingCount += 1
if consumingCount == 2 {
// We've processed the entire input file. Signal the collector to stop.
if err := cmd.Process.Signal(os.Interrupt); err != nil {
t.Errorf("failed to signal process: %v", err)
msg, _ := log["msg"].(string)
if strings.HasPrefix(msg, "Consuming files") {
consumingCount += 1
if consumingCount == 2 {
// We've processed the entire input file. Signal the collector to stop.
if err := cmd.Process.Signal(os.Interrupt); err != nil {
t.Errorf("failed to signal process: %v", err)
}
}
}
stacktrace, ok := log["stacktrace"].(string)
if ok {
log["stacktrace"] = sanitizeStacktrace(t, stacktrace)
}
} else {
t.Logf("collector stderr:\n%s", line)
line = sanitizeStacktrace(t, line)
errorList = append(errorList, map[string]any{"stderr": line})
}
stacktrace, ok := log["stacktrace"].(string)
if ok {
log["stacktrace"] = sanitizeStacktrace(t, stacktrace)
}
if err := s.Err(); err != nil {
if errors.Is(err, os.ErrClosed) {
t.Logf("encountered non-fatal scanner error: %s", err)
} else {
return err
}
}
return nil
})
// Read and sanitize requests.
eg.Go(func() error {
Expand Down Expand Up @@ -529,8 +520,8 @@ func (transformationConfig transformationTest) runOTelTestInner(t *testing.T, na
if exitErr != nil {
got = append(got, map[string]any{"exit_error": exitErr.Error()})
}
if len(errors) != 0 {
got = append(got, map[string]any{"collector_errors": errors})
if len(errorList) != 0 {
got = append(got, map[string]any{"collector_errors": errorList})
}
return got
}
Expand Down

0 comments on commit 801db3a

Please sign in to comment.