Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: add tracking apis #7

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
244 changes: 233 additions & 11 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"io/ioutil"
"net/http"
"net/url"
"regexp"
"strconv"
"sync/atomic"
"time"
Expand All @@ -38,14 +39,19 @@ const (
)

const (
deploymentsEndpoint = "/configurations"
blobEndpointPath = "/blobs"
blobEndpoint = blobEndpointPath + "/{blobId}"
deploymentsEndpoint = "/configurations"
blobEndpointPath = "/blobs"
blobEndpoint = blobEndpointPath + "/{blobId}"
configStatusEndpoint = "/configurations/status"
heartbeatEndpoint = "/heartbeat/{uuid}"
registerEndpoint = "/register/{uuid}"
)

const (
API_ERR_BAD_BLOCK = iota + 1
API_ERR_INTERNAL
API_ERR_INVALID_PARAMETERS
API_ERR_FROM_TRACKER
)

const (
Expand Down Expand Up @@ -102,14 +108,18 @@ type apiManagerInterface interface {
}

type apiManager struct {
dbMan dbManagerInterface
deploymentsEndpoint string
blobEndpoint string
eTag int64
deploymentsChanged chan interface{}
addSubscriber chan chan deploymentsResult
removeSubscriber chan chan deploymentsResult
apiInitialized bool
dbMan dbManagerInterface
trackerCl trackerClientInterface
deploymentsEndpoint string
blobEndpoint string
configStatusEndpoint string
heartbeatEndpoint string
registerEndpoint string
eTag int64
deploymentsChanged chan interface{}
addSubscriber chan chan deploymentsResult
removeSubscriber chan chan deploymentsResult
apiInitialized bool
}

func (a *apiManager) InitAPI() {
Expand All @@ -118,6 +128,9 @@ func (a *apiManager) InitAPI() {
}
services.API().HandleFunc(a.deploymentsEndpoint, a.apiGetCurrentDeployments).Methods("GET")
services.API().HandleFunc(a.blobEndpoint, a.apiReturnBlobData).Methods("GET")
services.API().HandleFunc(a.configStatusEndpoint, a.apiPutConfigStatus).Methods("PUT")
services.API().HandleFunc(a.heartbeatEndpoint, a.apiPutHeartbeat).Methods("PUT")
services.API().HandleFunc(a.registerEndpoint, a.apiPutRegister).Methods("PUT")
a.apiInitialized = true
log.Debug("API endpoints initialized")
}
Expand Down Expand Up @@ -346,6 +359,137 @@ func (a *apiManager) sendDeployments(w http.ResponseWriter, dataDeps []DataDeplo
w.Write(b)
}

func (a *apiManager) apiPutRegister(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
uuid := vars["uuid"]

// parse & validate body
body := r.Body
defer body.Close()
bodyBytes, err := ioutil.ReadAll(body)
if err != nil {
log.Errorf("apiPutRegister error: %v", err)
a.writeError(w, http.StatusInternalServerError, API_ERR_INTERNAL, "Failed to read request body.")
return
}

reqBody := &registerBody{}
err = json.Unmarshal(bodyBytes, reqBody)
if err != nil {
log.Debugf("apiPutRegister error: %v", err)
a.writeError(w, http.StatusBadRequest, API_ERR_INVALID_PARAMETERS, "Failed to read request body json: "+err.Error())
return
}

isValid, reason := reqBody.validateBody(uuid)
if !isValid {
a.writeError(w, http.StatusBadRequest, API_ERR_INVALID_PARAMETERS, reason)
return
}

// connect to tracker
trackerResp := a.trackerCl.putRegister(uuid, reqBody)

// write response
switch trackerResp.code {
case http.StatusOK:
a.writePutRegisterResp(w, trackerResp)
default:
log.Debugf("apiPutRegister code: %v Reason: %v", trackerResp.code, string(trackerResp.body))
a.writeError(w, trackerResp.code, API_ERR_FROM_TRACKER, string(trackerResp.body))
}

}

func (a *apiManager) apiPutConfigStatus(w http.ResponseWriter, r *http.Request) {

// parse & validate body
body := r.Body
defer body.Close()
bodyBytes, err := ioutil.ReadAll(body)
if err != nil {
log.Errorf("apiPutConfigStatus error: %v", err)
a.writeError(w, http.StatusInternalServerError, API_ERR_INTERNAL, "Failed to read request body.")
return
}

reqBody := &configStatusBody{}
err = json.Unmarshal(bodyBytes, reqBody)
if err != nil {
log.Debugf("apiPutConfigStatus error: %v", err)
a.writeError(w, http.StatusBadRequest, API_ERR_INVALID_PARAMETERS, "Failed to read request body json: "+err.Error())
return
}

isValid, reason := reqBody.validateBody()
if !isValid {
a.writeError(w, http.StatusBadRequest, API_ERR_INVALID_PARAMETERS, reason)
return
}

// connect to tracker
trackerResp := a.trackerCl.putConfigStatus(reqBody)

// write response
switch trackerResp.code {
case http.StatusOK:
a.writeConfigStatusResp(w, trackerResp)
default:
log.Infof("apiPutConfigStatus code: %v Reason: %v", trackerResp.code, string(trackerResp.body))
a.writeError(w, trackerResp.code, API_ERR_FROM_TRACKER, string(trackerResp.body))
}
}

func (a *apiManager) apiPutHeartbeat(w http.ResponseWriter, r *http.Request) {
// parse & validate
vars := mux.Vars(r)
uuid := vars["uuid"]
if !isValidUuid(uuid) {
a.writeError(w, http.StatusBadRequest, API_ERR_INVALID_PARAMETERS, "Bad/Missing gateway UUID")
return
}
reported := r.Header.Get("reportedTime")
if reported == "" || !isIso8601(reported) {
a.writeError(w, http.StatusBadRequest, API_ERR_INVALID_PARAMETERS, "Bad/Missing reportedTime")
return
}

// connect to tracker
trackerResp := a.trackerCl.putHeartbeat(uuid, reported)

// write response
switch trackerResp.code {
case http.StatusOK:
a.writePutHeartbeatResp(w, trackerResp)
default:
log.Infof("apiPutHeartbeat code: %v Reason: %v", trackerResp.code, string(trackerResp.body))
a.writeError(w, trackerResp.code, API_ERR_FROM_TRACKER, string(trackerResp.body))
}
}

func (a *apiManager) writeConfigStatusResp(w http.ResponseWriter, tr *trackerResponse) {
a.writeSimpleResp(w, tr)
}

func (a *apiManager) writePutRegisterResp(w http.ResponseWriter, tr *trackerResponse) {
a.writeSimpleResp(w, tr)
}

func (a *apiManager) writePutHeartbeatResp(w http.ResponseWriter, tr *trackerResponse) {
a.writeSimpleResp(w, tr)
}

func (a *apiManager) writeSimpleResp(w http.ResponseWriter, tr *trackerResponse) {
if tr.contentType != "" {
w.Header().Add("Content-type", tr.contentType)
}
_, err := w.Write(tr.body)
if err != nil {
log.Errorf("failed to write response: %v", err)
a.writeError(w, http.StatusInternalServerError, API_ERR_INTERNAL, err.Error())
}
}

// call whenever the list of deployments changes
func (a *apiManager) incrementETag() string {
e := atomic.AddInt64(&a.eTag, 1)
Expand Down Expand Up @@ -394,3 +538,81 @@ func getHttpHost() string {
proto = proto + "://" + config.GetString(configAPIListen)
return proto
}

func isValidUuid(uuid string) bool {
r := regexp.MustCompile("^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-4[a-fA-F0-9]{3}-[8|9|aA|bB][a-fA-F0-9]{3}-[a-fA-F0-9]{12}$")
return r.MatchString(uuid)
}

func isIso8601(t string) bool {

if _, err := time.Parse(iso8601, t); err == nil {
return true
}
if _, err := time.Parse(time.RFC3339, t); err == nil {
return true
}

return false
}

type registerBody struct {
Uuid string `json:"uuid"`
Pod string `json:"pod"`
PodType string `json:"podType"`
ReportedTime string `json:"reportedTime"`
Name string `json:"name"`
Type string `json:"type"`
}

func (body *registerBody) validateBody(uuid string) (bool, string) {
switch {
case uuid != body.Uuid:
return false, "UUID in path mismatch UUID in body"
case !isValidUuid(body.Uuid):
return false, "Bad/Missing gateway UUID"
case body.ReportedTime == "" || !isIso8601(body.ReportedTime):
return false, "Bad/Missing gateway reportedTime"
}
return true, ""
}

type configStatusBody struct {
StatusDetails []statusDetailsJson `json:"statusDetails"`
ServiceId string `json:"serviceId"`
ReportedTime string `json:"reportedTime"`
}

func (body *configStatusBody) validateBody() (bool, string) {
switch {
case !isValidUuid(body.ServiceId):
return false, "Bad/Missing gateway ServiceId"
case body.ReportedTime == "" || !isIso8601(body.ReportedTime):
return false, "Bad/Missing gateway reportedTime"
}

for _, s := range body.StatusDetails {
isValid, reason := s.validateBody()
if !isValid {
return false, reason
}
}
return true, ""
}

type statusDetailsJson struct {
Status string `json:"status"`
ConfigurationId string `json:"configurationId"`
ErrorCode string `json:"errorCode"`
Message string `json:"message"`
}

func (s *statusDetailsJson) validateBody() (bool, string) {
switch {
case s.Status == "":
return false, "Bad/Missing configuration Status"
case s.ConfigurationId == "":
return false, "Bad/Missing configuration ConfigurationId"
}
return true, ""
}
Loading