Skip to content

Commit

Permalink
added container tests
Browse files Browse the repository at this point in the history
  • Loading branch information
vijeyash1 committed Jun 10, 2024
1 parent 893d38d commit de1d46f
Show file tree
Hide file tree
Showing 8 changed files with 569 additions and 205 deletions.
76 changes: 76 additions & 0 deletions agent/container/pkg/clients/mock_nats_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions agent/container/pkg/clients/nats_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ import (
"github.com/nats-io/nats.go"
)

type NATSClientInterface interface {
Close()
CreateStream() (nats.JetStreamContext, error)
Publish(event []byte, repo string) error
}

// constant variables to use with nats stream and
// nats publishing
const (
Expand Down
165 changes: 164 additions & 1 deletion agent/container/pkg/handler/api_handler.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,26 @@
package handler

import (
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"strings"

"github.com/gin-gonic/gin"
"github.com/intelops/kubviz/agent/container/api"
"github.com/intelops/kubviz/agent/container/pkg/clients"
"github.com/intelops/kubviz/model"
"github.com/intelops/kubviz/pkg/opentelemetry"
"go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
)

type APIHandler struct {
conn *clients.NATSContext
conn clients.NATSClientInterface
}

const (
Expand Down Expand Up @@ -56,6 +64,7 @@ func (ah *APIHandler) BindRequest(r *gin.Engine) {
// This endpoint can be used by tools like Swagger UI to provide interactive documentation for the API.
func (ah *APIHandler) GetApiDocs(c *gin.Context) {
swagger, err := api.GetSwagger()
fmt.Println(swagger)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
Expand All @@ -73,3 +82,157 @@ func (ah *APIHandler) GetStatus(c *gin.Context) {
c.Header(contentType, appJSONContentType)
c.Status(http.StatusOK)
}

var ErrInvalidPayload = errors.New("invalid or malformed Azure Container Registry webhook payload")

// PostEventAzureContainer listens for Azure Container Registry image push events.
// When a new image is pushed, this endpoint receives the event payload, validates it,
// and then publishes it to a NATS messaging system. This allows client of the
// application to subscribe to these events and respond to changes in the container registry.
// If the payload is invalid or the publishing process fails, an error response is returned.
func (ah *APIHandler) PostEventAzureContainer(c *gin.Context) {

tracer := otel.Tracer("azure-container")
_, span := tracer.Start(c.Request.Context(), "PostEventAzureContainer")
span.SetAttributes(attribute.String("http.method", "POST"))
defer span.End()

defer func() {
_, _ = io.Copy(io.Discard, c.Request.Body)
_ = c.Request.Body.Close()
}()
payload, err := io.ReadAll(c.Request.Body)
if err != nil || len(payload) == 0 {
log.Printf("%v: %v", ErrReadingBody, err)
c.Status(http.StatusBadRequest)
return
}

var pushEvent model.AzureContainerPushEventPayload
err = json.Unmarshal(payload, &pushEvent)
if err != nil {
log.Printf("%v: %v", ErrInvalidPayload, err)
c.JSON(http.StatusBadRequest, gin.H{"error": "Bad Request"})
return
}

log.Printf("Received event from Azure Container Registry: %v", pushEvent)

err = ah.conn.Publish(payload, "Azure_Container_Registry")
if err != nil {
log.Printf("%v: %v", ErrPublishToNats, err)
c.Status(http.StatusInternalServerError)
return
}
c.Status(http.StatusOK)
}

// parse errors
var (
ErrReadingBody = errors.New("error reading the request body")
ErrPublishToNats = errors.New("error while publishing to nats")
)

func (ah *APIHandler) PostEventDockerHub(c *gin.Context) {

tracer := otel.Tracer("dockerhub-container")
_, span := tracer.Start(c.Request.Context(), "PostEventDockerHub")
span.SetAttributes(attribute.String("http.method", "POST"))
defer span.End()

payload, err := io.ReadAll(c.Request.Body)
if err != nil {
log.Printf("%v: %v", ErrReadingBody, err)
c.Status(http.StatusBadRequest)
return
}
if len(payload) == 0 || strings.TrimSpace(string(payload)) == "" {
log.Printf("%v: %v", ErrReadingBody, "empty body")
c.Status(http.StatusBadRequest)
return
}
log.Printf("Received event from docker artifactory: %v", string(payload))
err = ah.conn.Publish(payload, "Dockerhub_Registry")
if err != nil {
log.Printf("%v: %v", ErrPublishToNats, err)
c.AbortWithStatus(http.StatusInternalServerError) // Use AbortWithStatus
return
}
c.Status(http.StatusOK)
}

var ErrInvalidPayloads = errors.New("invalid or malformed jfrog Container Registry webhook payload")

func (ah *APIHandler) PostEventJfrogContainer(c *gin.Context) {

tracer := otel.Tracer("jfrog-container")
_, span := tracer.Start(c.Request.Context(), "PostEventJfrogContainer")
span.SetAttributes(attribute.String("http.method", "POST"))
defer span.End()

payload, err := io.ReadAll(c.Request.Body)
if err != nil {
log.Printf("%v: %v", ErrReadingBody, err)
c.Status(http.StatusBadRequest)
return
}
if len(payload) == 0 || strings.TrimSpace(string(payload)) == "" {
log.Printf("%v: %v", ErrReadingBody, "empty body")
c.Status(http.StatusBadRequest)
return
}

var pushEvent model.JfrogContainerPushEventPayload
err = json.Unmarshal(payload, &pushEvent)
if err != nil {
log.Printf("%v: %v", ErrInvalidPayloads, err)
c.JSON(http.StatusBadRequest, gin.H{"error": "Bad Request"})
return
}

log.Printf("Received event from jfrog Container Registry: %v", pushEvent)

err = ah.conn.Publish(payload, "Jfrog_Container_Registry")
if err != nil {
log.Printf("%v: %v", ErrPublishToNats, err)
c.AbortWithStatus(http.StatusInternalServerError) // Use AbortWithStatus
return
}
c.Status(http.StatusOK)
}

func (ah *APIHandler) PostEventQuayContainer(c *gin.Context) {

tracer := otel.Tracer("quay-container")
_, span := tracer.Start(c.Request.Context(), "PostEventQuayContainer")
span.SetAttributes(attribute.String("http.method", "POST"))
defer span.End()

payload, err := io.ReadAll(c.Request.Body)
if err != nil {
log.Printf("%v: %v", ErrReadingBody, err)
c.Status(http.StatusBadRequest)
return
}
if len(payload) == 0 || strings.TrimSpace(string(payload)) == "" {
log.Printf("%v: %v", ErrReadingBody, "empty body")
c.Status(http.StatusBadRequest)
return
}
var pushEvent model.QuayImagePushPayload
err = json.Unmarshal(payload, &pushEvent)
if err != nil {
log.Printf("%v: %v", "invalid or malformed Quay Container Registry webhook payload", err)
c.JSON(http.StatusBadRequest, gin.H{"error": "Bad Request"})
return
}
log.Printf("Received event from Quay Container Registry: %v", pushEvent)

err = ah.conn.Publish(payload, "Quay_Container_Registry")
if err != nil {
log.Printf("%v: %v", ErrPublishToNats, err)
c.AbortWithStatus(http.StatusInternalServerError) // Use AbortWithStatus
return
}
c.Status(http.StatusOK)
}
Loading

0 comments on commit de1d46f

Please sign in to comment.