Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: [TKC-2561] do not merge #5925

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/kubectl-testkube/commands/testworkflows/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,11 @@ func NewRunTestWorkflowCmd() *cobra.Command {
if outputPretty {
ui.NL()
if watchEnabled {
fmt.Println("uiWatch")
exitCode = uiWatch(execution, client)
ui.NL()
if downloadArtifactsEnabled {
fmt.Println("downlaod artifact")
tests.DownloadTestWorkflowArtifacts(execution.Id, downloadDir, format, masks, client, outputPretty)
}
} else {
Expand Down Expand Up @@ -130,7 +132,9 @@ func NewRunTestWorkflowCmd() *cobra.Command {
}

func uiWatch(execution testkube.TestWorkflowExecution, client apiclientv1.Client) int {
fmt.Println("watch logs")
result, err := watchTestWorkflowLogs(execution.Id, execution.Signature, client)
fmt.Println("watch logs done", err, result)
ui.ExitOnError("reading test workflow execution logs", err)

// Apply the result in the execution
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/cloudevents/sdk-go/v2 v2.15.2
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/creasty/defaults v1.7.0
github.com/davecgh/go-spew v1.1.1
github.com/denisbrodbeck/machineid v1.0.1
github.com/dustin/go-humanize v1.0.1
github.com/dustinkirkland/golang-petname v0.0.0-20191129215211-8e5a1ed0cff0
Expand Down Expand Up @@ -97,7 +98,6 @@ require (
github.com/containerd/console v1.0.3 // indirect
github.com/containerd/stargz-snapshotter/estargz v0.14.3 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dlclark/regexp2 v1.4.0 // indirect
github.com/docker/cli v24.0.0+incompatible // indirect
github.com/docker/distribution v2.8.2+incompatible // indirect
Expand Down
7 changes: 6 additions & 1 deletion internal/app/api/v1/testworkflowexecutions.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,20 +456,25 @@ func (s *TestkubeAPI) GetTestWorkflowNotificationsStream(ctx context.Context, ex
// Check for the logs
ctrl, err := testworkflowcontroller.New(ctx, s.Clientset, execution.GetNamespace(s.Namespace), execution.Id, execution.ScheduledAt)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to create test workflow controller")
}

fmt.Println("GetTestWorkflowNotificationsStream", executionID)
// Stream the notifications
ch := make(chan testkube.TestWorkflowExecutionNotification)
go func() {
for n := range ctrl.Watch(ctx) {
if n.Error == nil {
ch <- n.Value.ToInternal()
} else {
s.Log.Errorw("failed to watch logs", "error", n.Error)
}
}
ctrl.StopController()
close(ch)
}()
fmt.Println("GetTestWorkflowNotificationsStream done", executionID)

return ch, nil
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/agent/testworkflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math"
"time"

"github.com/davecgh/go-spew/spew"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
Expand Down Expand Up @@ -114,6 +115,7 @@ func (ag *Agent) executeWorkflowNotificationsRequest(ctx context.Context, req *c
}
}
if err != nil {
ag.logger.Errorf("error executing workflow notifications request: %s", err.Error())
message := fmt.Sprintf("cannot get pod logs: %s", err.Error())
ag.testWorkflowNotificationsResponseBuffer <- &cloud.TestWorkflowNotificationsResponse{
StreamId: req.StreamId,
Expand All @@ -139,6 +141,9 @@ func (ag *Agent) executeWorkflowNotificationsRequest(ctx context.Context, req *c
Ref: n.Ref,
Type: t,
}

fmt.Println("Notification", n)
spew.Dump(n)
if n.Result != nil {
m, _ := json.Marshal(n.Result)
msg.Message = string(m)
Expand All @@ -153,9 +158,11 @@ func (ag *Agent) executeWorkflowNotificationsRequest(ctx context.Context, req *c
select {
case ag.testWorkflowNotificationsResponseBuffer <- msg:
case <-ctx.Done():
fmt.Println("Agent Context error ", ctx.Err(), context.Cause(ctx))
return ctx.Err()
}
case <-ctx.Done():
fmt.Println("Agent Context error2", ctx.Err(), context.Cause(ctx))
return ctx.Err()
}
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/api/v1/client/direct_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ func (t DirectClient[A]) GetLogsV2(uri string, logs chan events.Log) error {

// GetTestWorkflowExecutionNotifications returns logs stream from job pods, based on job pods logs
func (t DirectClient[A]) GetTestWorkflowExecutionNotifications(uri string, notifications chan testkube.TestWorkflowExecutionNotification) error {

fmt.Println("get", uri)
req, err := http.NewRequest(http.MethodGet, uri, nil)
if err != nil {
return err
Expand All @@ -234,6 +236,7 @@ func (t DirectClient[A]) GetTestWorkflowExecutionNotifications(uri string, notif
defer close(notifications)
defer resp.Body.Close()

fmt.Println("StreamToTestWorkflowExecutionNotificationsChannel")
StreamToTestWorkflowExecutionNotificationsChannel(resp.Body, notifications)
}()

Expand Down
1 change: 1 addition & 0 deletions pkg/api/v1/client/testworkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func (c TestWorkflowClient) ExecuteTestWorkflow(name string, request testkube.Te
func (c TestWorkflowClient) GetTestWorkflowExecutionNotifications(id string) (notifications chan testkube.TestWorkflowExecutionNotification, err error) {
notifications = make(chan testkube.TestWorkflowExecutionNotification)
uri := c.testWorkflowTransport.GetURI("/test-workflow-executions/%s/notifications", id)
fmt.Println("uri", uri)
err = c.testWorkflowTransport.GetTestWorkflowExecutionNotifications(uri, notifications)
return notifications, err
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/testworkflows/testworkflowcontroller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package testworkflowcontroller

import (
"context"
"fmt"
"io"
"time"

Expand Down Expand Up @@ -170,6 +171,8 @@ func (c *controller) Watch(parentCtx context.Context) <-chan ChannelMessage[Noti
ch, err := WatchInstrumentedPod(parentCtx, c.clientSet, c.signature, c.scheduledAt, c.watcher, WatchInstrumentedPodOptions{})
if err != nil {
v := newChannel[Notification](context.Background(), 1)

fmt.Println("Error in WatchInstrumentedPod", err)
v.Error(err)
v.Close()
return v.Channel()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,21 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf

// Stop immediately after the operation is canceled
if ctx.Err() != nil {
fmt.Println("Context error 1", ctx.Err(), context.Cause(ctx))
return
}

// Handle the case when it has been complete without pod start
if !watcher.State().PodStarted() && (watcher.State().Completed() || opts.DisableFollow) {
notifier.Align(watcher.State())
fmt.Println("Pod has not been started")
return
}

// Load the pod information
if watcher.State().EstimatedPodStartTimestamp().IsZero() {
notifier.Error(fmt.Errorf("cannot estimate Pod start"))
fmt.Println("cannot estimate Pod start")
return
}

Expand All @@ -95,6 +98,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf
actions, err := watcher.State().ActionGroups()
if err != nil {
notifier.Error(fmt.Errorf("cannot read execution instructions: %v", err))
fmt.Println("cannot read execution instructions", err)
return
}
refs, endRefs := ExtractRefsFromActionGroup(actions)
Expand Down Expand Up @@ -148,6 +152,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf

// Stop immediately after the operation is canceled
if ctx.Err() != nil {
fmt.Println("Context error 2", ctx.Err(), context.Cause(ctx))
return
}

Expand All @@ -164,6 +169,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf
for v := range WatchContainerLogs(ctx, clientSet, watcher.State().Namespace(), watcher.State().PodName(), container, 10, isDone, isLastHint) {
if v.Error != nil {
notifier.Error(v.Error)
fmt.Println("watch container logs", v.Error)
continue
}

Expand All @@ -185,19 +191,22 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf

// Stop immediately after the operation is canceled
if ctx.Err() != nil {
fmt.Println("Context error 3", ctx.Err(), context.Cause(ctx))
return
}

// Wait until the Container is terminated
for ok := true; ok; _, ok = <-updatesCh {
// Determine if the container should be already stopped
if watcher.State().ContainerFinished(container) || watcher.State().Completed() || opts.DisableFollow {
fmt.Println("Container finished", container)
break
}
}

// Stop immediately after the operation is canceled
if ctx.Err() != nil {
fmt.Println("Context error 4", ctx.Err(), context.Cause(ctx))
return
}

Expand Down Expand Up @@ -240,6 +249,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf

// Stop immediately after the operation is canceled
if ctx.Err() != nil {
fmt.Println("Context error 5", ctx.Err(), context.Cause(ctx))
return
}

Expand Down
Loading