Skip to content

Commit

Permalink
Merge pull request #14 from safesoftware/get-job-by-id
Browse files Browse the repository at this point in the history
Get job by id
  • Loading branch information
garnold54 authored Jan 3, 2023
2 parents 49a0195 + ee8901c commit 65d7995
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 40 deletions.
8 changes: 4 additions & 4 deletions cmd/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func buildFmeServerRequest(endpoint string, method string, body io.Reader) (http
}

// since the JSON for published parameters has subtypes, we need to implement this ourselves
func (f *Job) UnmarshalJSON(b []byte) error {
type job Job
func (f *JobRequest) UnmarshalJSON(b []byte) error {
type job JobRequest
err := json.Unmarshal(b, (*job)(f))
if err != nil {
return err
Expand Down Expand Up @@ -80,9 +80,9 @@ func (f *Job) UnmarshalJSON(b []byte) error {
return nil
}

func (f *Job) MarshalJSON() ([]byte, error) {
func (f *JobRequest) MarshalJSON() ([]byte, error) {

type job Job
type job JobRequest
if f.PublishedParameters != nil {
for _, v := range f.PublishedParameters {
b, err := json.Marshal(v)
Expand Down
101 changes: 67 additions & 34 deletions cmd/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,41 +6,44 @@ import (
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"

"github.com/jedib0t/go-pretty/v6/table"
"github.com/spf13/cobra"
)

type JobStatus struct {
Request JobRequest `json:"request"`
TimeDelivered time.Time `json:"timeDelivered"`
Workspace string `json:"workspace"`
NumErrors int `json:"numErrors"`
NumLines int `json:"numLines"`
EngineHost string `json:"engineHost"`
TimeQueued time.Time `json:"timeQueued"`
CPUPct float64 `json:"cpuPct"`
Description string `json:"description"`
TimeStarted time.Time `json:"timeStarted"`
Repository string `json:"repository"`
UserName string `json:"userName"`
Result JobResult `json:"result"`
CPUTime int `json:"cpuTime"`
ID int `json:"id"`
TimeFinished time.Time `json:"timeFinished"`
EngineName string `json:"engineName"`
NumWarnings int `json:"numWarnings"`
TimeSubmitted time.Time `json:"timeSubmitted"`
ElapsedTime int `json:"elapsedTime"`
PeakMemUsage int `json:"peakMemUsage"`
Status string `json:"status"`
}

type Jobs struct {
Offset int `json:"offset"`
Limit int `json:"limit"`
TotalCount int `json:"totalCount"`
Items []struct {
Request Job `json:"request"`
TimeDelivered time.Time `json:"timeDelivered"`
Workspace string `json:"workspace"`
NumErrors int `json:"numErrors"`
NumLines int `json:"numLines"`
EngineHost string `json:"engineHost"`
TimeQueued time.Time `json:"timeQueued"`
CPUPct float64 `json:"cpuPct"`
Description string `json:"description"`
TimeStarted time.Time `json:"timeStarted"`
Repository string `json:"repository"`
UserName string `json:"userName"`
Result JobResult `json:"result"`
CPUTime int `json:"cpuTime"`
ID int `json:"id"`
TimeFinished time.Time `json:"timeFinished"`
EngineName string `json:"engineName"`
NumWarnings int `json:"numWarnings"`
TimeSubmitted time.Time `json:"timeSubmitted"`
ElapsedTime int `json:"elapsedTime"`
PeakMemUsage int `json:"peakMemUsage"`
Status string `json:"status"`
} `json:"items"`
Offset int `json:"offset"`
Limit int `json:"limit"`
TotalCount int `json:"totalCount"`
Items []JobStatus `json:"items"`
}

type jobsFlags struct {
Expand All @@ -56,6 +59,7 @@ type jobsFlags struct {
jobsWorkspace string
jobsSourceID string
jobsSourceType string
jobId int
}

func newJobsCmd() *cobra.Command {
Expand Down Expand Up @@ -103,9 +107,20 @@ func newJobsCmd() *cobra.Command {
cmd.Flags().StringVar(&f.jobsSourceID, "source-id", "", "If specified along with source type, only jobs from the specified type with the specified id will be returned. For Automations, the source id is the automation id. For WorkspaceSubscriber, the source id is the id of the subscription. For Scheduler, the source id is the category and name of the schedule separated by '/'. For example, 'Category/Name'.")
cmd.Flags().StringVar(&f.jobsSourceType, "source-type", "", "If specified, only jobs run by this source type will be returned.")
cmd.Flags().StringVarP(&f.outputType, "output", "o", "table", "Specify the output type. Should be one of table, json, or custom-columns")
cmd.Flags().IntVar(&f.jobId, "id", -1, "Specify the job id to display")
cmd.Flags().BoolVar(&f.noHeaders, "no-headers", false, "Don't print column headers")
cmd.MarkFlagsMutuallyExclusive("queued", "active")
cmd.MarkFlagsMutuallyExclusive("running", "active")
cmd.MarkFlagsMutuallyExclusive("id", "running")
cmd.MarkFlagsMutuallyExclusive("id", "completed")
cmd.MarkFlagsMutuallyExclusive("id", "queued")
cmd.MarkFlagsMutuallyExclusive("id", "all")
cmd.MarkFlagsMutuallyExclusive("id", "active")
cmd.MarkFlagsMutuallyExclusive("id", "repository")
cmd.MarkFlagsMutuallyExclusive("id", "workspace")
cmd.MarkFlagsMutuallyExclusive("id", "user-name")
cmd.MarkFlagsMutuallyExclusive("id", "source-id")
cmd.MarkFlagsMutuallyExclusive("id", "source-type")
return cmd

}
Expand All @@ -116,7 +131,7 @@ func jobsRun(f *jobsFlags) func(cmd *cobra.Command, args []string) error {
if jsonOutput {
f.outputType = "json"
}
if !f.jobsActive && !f.jobsCompleted && !f.jobsQueued && !f.jobsRunning && !f.jobsAll {
if !f.jobsActive && !f.jobsCompleted && !f.jobsQueued && !f.jobsRunning && !f.jobsAll && f.jobId == -1 {
// if no filter is passed in, show all jobs
f.jobsAll = true
}
Expand Down Expand Up @@ -149,6 +164,13 @@ func jobsRun(f *jobsFlags) func(cmd *cobra.Command, args []string) error {
}
}

if f.jobId != -1 {
err := getJobs("/fmerest/v3/transformations/jobs/id/"+strconv.Itoa(f.jobId), &allJobs, f)
if err != nil {
return err
}
}

if f.outputType == "table" {
// output all values returned by the JSON in a table
t := table.NewWriter()
Expand Down Expand Up @@ -257,13 +279,24 @@ func getJobs(endpoint string, allJobs *Jobs, f *jobsFlags) error {
return errors.New(response.Status)
}

var result Jobs
if err := json.Unmarshal(responseData, &result); err != nil {
return err
if f.jobId == -1 {
var result Jobs
if err := json.Unmarshal(responseData, &result); err != nil {
return err
} else {
// merge with existing jobs
allJobs.TotalCount += result.TotalCount
allJobs.Items = append(allJobs.Items, result.Items...)
}
} else {
// merge with existing jobs
allJobs.TotalCount += result.TotalCount
allJobs.Items = append(allJobs.Items, result.Items...)
var result JobStatus
if err := json.Unmarshal(responseData, &result); err != nil {
return err
} else {
allJobs.TotalCount += 1
allJobs.Items = append(allJobs.Items, result)
}
}

return nil
}
115 changes: 115 additions & 0 deletions cmd/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,101 @@ func TestJobs(t *testing.T) {
]
}`

responseV3SingleJob := `{
"request": {
"TMDirectives": {
"rtc": false,
"ttc": -1,
"tag": "Default",
"ttl": -1
},
"NMDirectives": {}
},
"timeDelivered": "2022-12-07T21:23:12Z",
"workspace": "none2none.fmw",
"numErrors": 0,
"numLines": 0,
"engineHost": "145929514b24",
"timeQueued": "2022-12-07T21:22:48Z",
"cpuPct": 0,
"description": "",
"timeStarted": "2022-12-07T21:22:48Z",
"repository": "test",
"userName": "admin",
"result": {
"timeRequested": "2022-12-07T21:22:48Z",
"requesterResultPort": -1,
"numFeaturesOutput": 0,
"requesterHost": "172.19.0.5",
"timeStarted": "2022-12-07T21:22:48Z",
"id": 1,
"timeFinished": "2022-12-07T21:23:12Z",
"priority": -1,
"statusMessage": "Job cancelled. ",
"status": "ABORTED"
},
"cpuTime": 0,
"id": 1,
"timeFinished": "2022-12-07T21:23:12Z",
"engineName": "145929514b24",
"numWarnings": 0,
"timeSubmitted": "2022-12-07T21:22:48Z",
"elapsedTime": 0,
"peakMemUsage": 0,
"status": "ABORTED"
}`

responseV3SingleJobOutput := `{
"offset": 0,
"limit": 0,
"totalCount": 1,
"items": [
{
"request": {
"TMDirectives": {
"rtc": false,
"ttc": -1,
"tag": "Default",
"ttl": -1
},
"NMDirectives": {}
},
"timeDelivered": "2022-12-07T21:23:12Z",
"workspace": "none2none.fmw",
"numErrors": 0,
"numLines": 0,
"engineHost": "145929514b24",
"timeQueued": "2022-12-07T21:22:48Z",
"cpuPct": 0,
"description": "",
"timeStarted": "2022-12-07T21:22:48Z",
"repository": "test",
"userName": "admin",
"result": {
"timeRequested": "2022-12-07T21:22:48Z",
"requesterResultPort": -1,
"numFeaturesOutput": 0,
"requesterHost": "172.19.0.5",
"timeStarted": "2022-12-07T21:22:48Z",
"id": 1,
"timeFinished": "2022-12-07T21:23:12Z",
"priority": -1,
"statusMessage": "Job cancelled. ",
"status": "ABORTED"
},
"cpuTime": 0,
"id": 1,
"timeFinished": "2022-12-07T21:23:12Z",
"engineName": "145929514b24",
"numWarnings": 0,
"timeSubmitted": "2022-12-07T21:22:48Z",
"elapsedTime": 0,
"peakMemUsage": 0,
"status": "ABORTED"
}
]
}`

customHttpServerHandler := func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
if strings.Contains(r.URL.Path, "active") {
Expand Down Expand Up @@ -508,6 +603,26 @@ func TestJobs(t *testing.T) {
args: []string{"jobs", "--completed", "--output", "custom-columns=CPU:.cpuTime,FEATURES OUTPUT:.result.numFeaturesOutput"},
wantOutputRegex: "^[\\s]*CPU[\\s]*FEATURES OUTPUT[\\s]*994[\\s]*49[\\s]*697[\\s]*0[\\s]*$",
},
{
name: "get single job",
statusCode: http.StatusOK,
args: []string{"jobs", "--id", "1"},
body: responseV3SingleJob,
wantOutputRegex: "^[\\s]*JOB ID[\\s]*ENGINE NAME[\\s]*WORKSPACE[\\s]*STATUS[\\s]*1[\\s]*145929514b24[\\s]*none2none.fmw[\\s]*ABORTED[\\s]*$",
},
{
name: "get single job json",
statusCode: http.StatusOK,
args: []string{"jobs", "--id", "1", "--json"},
body: responseV3SingleJob,
wantOutputJson: responseV3SingleJobOutput,
},
{
name: "get single job does not exist",
statusCode: http.StatusNotFound,
args: []string{"jobs", "--id", "243"},
wantErrText: "404 Not Found",
},
}

runTests(cases, t)
Expand Down
4 changes: 2 additions & 2 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type JobId struct {
Id int `json:"id"`
}

type Job struct {
type JobRequest struct {
PublishedParameters []interface{} `json:"-"`
RawPublishedParameters []json.RawMessage `json:"publishedParameters,omitempty"`
TMDirectives struct {
Expand Down Expand Up @@ -158,7 +158,7 @@ func runRun(f *runFlags) func(cmd *cobra.Command, args []string) error {
var responseData []byte

if f.runSourceData == "" {
job := &Job{}
job := &JobRequest{}

// get published parameters
for _, parameter := range f.runPublishedParameter {
Expand Down

0 comments on commit 65d7995

Please sign in to comment.