Skip to content

Commit 0e488b5

Browse files
committed
Completed cmd service implementation - not tested
1 parent a331069 commit 0e488b5

File tree

9 files changed

+253
-49
lines changed

9 files changed

+253
-49
lines changed

README.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
11
### Clarum executor agent
22

3-
WIP
3+
### Lifecycle
4+
At this point we always start the agent, initiate mocks, run test actions and after all that we shut it down.
5+
Maybe in the future we will keep it alive with the mocks running to improve performance.
6+
7+
Keeping it alive would require:
8+
- to manage the agent lifecycle
9+
- add config flag to keep the agent alive
10+
- introduce concept of TestRun
11+
- split config in agent startup configuration & TestRun configuration

api/cmd/clarum-cmd.proto

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,22 @@ syntax = "proto3";
33
option go_package = "github.com/go-clarum/agent/api/cmd";
44

55
service CmdService {
6-
rpc CreateCommandEndpoint(CommandEndpoint) returns (CommandCreateResponse) {}
6+
rpc InitEndpoint(InitCmdEndpoint) returns (InitCmdEndpointResponse) {}
7+
rpc ShutdownEndpoint(ShutdownCmdEndpoint) returns (ShutdownCmdEndpointResponse) {}
78
}
89

9-
message CommandEndpoint {
10+
message InitCmdEndpoint {
1011
string name = 1;
12+
int32 warmup_seconds = 2;
13+
repeated string cmd_components = 3;
1114
}
12-
message CommandCreateResponse {
13-
bool success = 1;
15+
message InitCmdEndpointResponse {
16+
string error = 1;
17+
}
18+
19+
message ShutdownCmdEndpoint {
20+
string name = 1;
21+
}
22+
message ShutdownCmdEndpointResponse {
23+
string error = 1;
1424
}

config/config.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,8 @@ package config
22

33
import (
44
"github.com/go-clarum/agent/files"
5-
"github.com/go-clarum/agent/logging"
65
"gopkg.in/yaml.v3"
7-
"log/slog"
6+
"log"
87
"os"
98
"path"
109
"time"
@@ -25,12 +24,14 @@ type config struct {
2524
}
2625
}
2726

28-
func LoadConfig() {
27+
func init() {
28+
initLogger := log.New(os.Stdout, "", log.LstdFlags|log.Lmicroseconds)
29+
2930
configFilePath := path.Join(*baseDir, *configFile)
3031
conf, err := files.ReadYamlFileToStruct[config](configFilePath)
3132
if err != nil {
3233
dir, _ := os.Getwd()
33-
logging.Infof("No config file found in [%s] - default values will be used instead", dir)
34+
initLogger.Printf("No config file found in [%s] - default values will be used instead", dir)
3435
conf = &config{}
3536
}
3637

@@ -39,7 +40,7 @@ func LoadConfig() {
3940
c = conf
4041

4142
configYaml, _ := yaml.Marshal(conf)
42-
logging.Infof("Using the following config:\n[\n%s]", configYaml)
43+
initLogger.Printf("Using the following config:\n[\n%s]", configYaml)
4344
}
4445

4546
func Version() string {
@@ -50,8 +51,8 @@ func BaseDir() string {
5051
return *baseDir
5152
}
5253

53-
func LoggingLevel() slog.Level {
54-
return logging.ParseLevel(c.Logging.Level)
54+
func LoggingLevel() string {
55+
return c.Logging.Level
5556
}
5657

5758
func ActionTimeout() time.Duration {

files/files.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,26 @@ package files
22

33
import (
44
"errors"
5-
"github.com/go-clarum/agent/logging"
5+
"fmt"
66
"github.com/go-clarum/agent/validators/strings"
77
"gopkg.in/yaml.v3"
88
"os"
99
)
1010

1111
func ReadYamlFileToStruct[S any](filePath string) (*S, error) {
1212
if strings.IsBlank(filePath) {
13-
logging.Error("Unable to read file. File path is empty")
14-
return nil, errors.New("file path is empty")
13+
return nil, errors.New("unable to read file - file path is empty")
1514
}
1615

1716
buf, err := os.ReadFile(filePath)
1817
if err != nil {
19-
logging.Errorf("Failed to %s", err.Error())
20-
return nil, err
18+
return nil, errors.New(fmt.Sprintf("unable to read file - %s", err))
2119
}
2220

2321
out := new(S)
2422

2523
if err := yaml.Unmarshal(buf, out); err != nil {
26-
logging.Errorf("Failed to unmarshal yaml file %s: %s", filePath, err.Error())
27-
return nil, err
24+
return nil, errors.New(fmt.Sprintf("failed to unmarshal yaml file %s: %s", filePath, err))
2825
}
2926

3027
return out, err

logging/logging.go

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,74 +2,81 @@ package logging
22

33
import (
44
"fmt"
5+
"github.com/go-clarum/agent/config"
56
"log"
67
"log/slog"
78
"os"
89
"strings"
910
)
1011

11-
var internalLogger = log.New(os.Stdout, "", log.LstdFlags|log.Lmicroseconds)
12-
var defaultLogger = NewLogger(slog.LevelInfo, "")
12+
var activeLogLevel slog.Level
13+
var internalLogger *log.Logger
14+
var defaultLogger *Logger
15+
16+
func init() {
17+
activeLogLevel = parseLevel(config.LoggingLevel())
18+
internalLogger = log.New(os.Stdout, "", log.LstdFlags|log.Lmicroseconds)
19+
defaultLogger = NewLogger("")
20+
}
1321

1422
type Logger struct {
15-
level slog.Level
16-
prefix string
23+
name string
1724
}
1825

19-
func NewLogger(level slog.Level, prefix string) *Logger {
20-
return &Logger{level, prefix}
26+
func NewLogger(name string) *Logger {
27+
return &Logger{name}
2128
}
2229

2330
func (logger *Logger) Info(message string) {
24-
if logger.level <= slog.LevelInfo {
25-
internalLogger.Println("INFO " + logger.prefix + message)
31+
if activeLogLevel <= slog.LevelInfo {
32+
internalLogger.Println("INFO " + logger.name + " " + message)
2633
}
2734
}
2835

2936
func (logger *Logger) Infof(format string, a ...any) {
30-
if logger.level <= slog.LevelInfo {
31-
internalLogger.Println("INFO " + logger.prefix + fmt.Sprintf(format, a...))
37+
if activeLogLevel <= slog.LevelInfo {
38+
internalLogger.Println("INFO " + logger.name + " " + fmt.Sprintf(format, a...))
3239
}
3340
}
3441

3542
func (logger *Logger) Debug(message string) {
36-
if logger.level <= slog.LevelDebug {
37-
internalLogger.Println("DEBUG " + logger.prefix + message)
43+
if activeLogLevel <= slog.LevelDebug {
44+
internalLogger.Println("DEBUG " + logger.name + " " + message)
3845
}
3946
}
4047

4148
func (logger *Logger) Debugf(format string, a ...any) {
42-
if logger.level <= slog.LevelDebug {
43-
internalLogger.Println("DEBUG " + logger.prefix + fmt.Sprintf(format, a...))
49+
if activeLogLevel <= slog.LevelDebug {
50+
internalLogger.Println("DEBUG " + logger.name + " " + fmt.Sprintf(format, a...))
4451
}
4552
}
4653

4754
func (logger *Logger) Warn(message string) {
48-
if logger.level <= slog.LevelWarn {
49-
internalLogger.Println("WARN " + logger.prefix + message)
55+
if activeLogLevel <= slog.LevelWarn {
56+
internalLogger.Println("WARN " + logger.name + " " + message)
5057
}
5158
}
5259

5360
func (logger *Logger) Warnf(format string, a ...any) {
54-
if logger.level <= slog.LevelWarn {
55-
internalLogger.Println("WARN " + logger.prefix + fmt.Sprintf(format, a...))
61+
if activeLogLevel <= slog.LevelWarn {
62+
internalLogger.Println("WARN " + logger.name + " " + fmt.Sprintf(format, a...))
5663
}
5764
}
5865

5966
func (logger *Logger) Error(message string) {
60-
if logger.level <= slog.LevelError {
61-
internalLogger.Println("ERROR " + logger.prefix + message)
67+
if activeLogLevel <= slog.LevelError {
68+
internalLogger.Println("ERROR " + logger.name + " " + message)
6269
}
6370
}
6471

6572
func (logger *Logger) Errorf(format string, a ...any) {
66-
if logger.level <= slog.LevelError {
67-
internalLogger.Println("ERROR " + logger.prefix + fmt.Sprintf(format, a...))
73+
if activeLogLevel <= slog.LevelError {
74+
internalLogger.Println("ERROR " + logger.name + " " + fmt.Sprintf(format, a...))
6875
}
6976
}
7077

71-
func (logger *Logger) Prefix() string {
72-
return logger.prefix
78+
func (logger *Logger) Name() string {
79+
return logger.name
7380
}
7481

7582
// calls on the default logger
@@ -106,7 +113,7 @@ func Errorf(format string, a ...any) {
106113
defaultLogger.Errorf(format, a...)
107114
}
108115

109-
func ParseLevel(level string) slog.Level {
116+
func parseLevel(level string) slog.Level {
110117
lcLevel := strings.ToLower(level)
111118
var result slog.Level
112119

main.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ func main() {
1616
logging.Infof("Starting clarum agent v%s", config.Version())
1717
control.ShutdownHook.Add(1)
1818

19-
config.LoadConfig()
2019
initAndRunGrpcServer()
2120

2221
control.ShutdownHook.Wait()

services/cmd/cmd.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,34 @@ package cmd
22

33
import (
44
"context"
5+
"fmt"
56
. "github.com/go-clarum/agent/api/cmd"
67
"github.com/go-clarum/agent/logging"
8+
"github.com/go-clarum/agent/services/cmd/internal/service"
79
"google.golang.org/grpc"
810
)
911

10-
type service struct {
12+
type cmdService struct {
1113
UnimplementedCmdServiceServer
1214
}
1315

1416
func RegisterCmdService(server *grpc.Server) {
1517
logging.Infof("Registering CommandService")
16-
RegisterCmdServiceServer(server, &service{})
18+
RegisterCmdServiceServer(server, &cmdService{})
1719
}
1820

19-
func (s *service) CreateCommandEndpoint(ctx context.Context, ce *CommandEndpoint) (*CommandCreateResponse, error) {
20-
return &CommandCreateResponse{}, nil
21+
func (s *cmdService) InitEndpoint(ctx context.Context, req *InitCmdEndpoint) (*InitCmdEndpointResponse, error) {
22+
err := service.InitializeEndpoint(req.Name, req.CmdComponents, req.WarmupSeconds)
23+
24+
return &InitCmdEndpointResponse{
25+
Error: fmt.Sprintf("%s", err),
26+
}, nil
27+
}
28+
29+
func (s *cmdService) ShutdownEndpoint(ctx context.Context, req *ShutdownCmdEndpoint) (*ShutdownCmdEndpointResponse, error) {
30+
err := service.ShutdownEndpoint(req.Name)
31+
32+
return &ShutdownCmdEndpointResponse{
33+
Error: fmt.Sprintf("%s", err),
34+
}, nil
2135
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package service
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"github.com/go-clarum/agent/control"
8+
"github.com/go-clarum/agent/durations"
9+
"github.com/go-clarum/agent/logging"
10+
clarumstrings "github.com/go-clarum/agent/validators/strings"
11+
"os/exec"
12+
"time"
13+
)
14+
15+
type endpoint struct {
16+
name string
17+
cmdComponents []string
18+
warmup time.Duration
19+
cmd *exec.Cmd
20+
cmdCancel context.CancelFunc
21+
logger *logging.Logger
22+
}
23+
24+
func newCommandEndpoint(name string, cmdComponents []string, warmupSeconds int32) (*endpoint, error) {
25+
if clarumstrings.IsBlank(name) {
26+
return nil, errors.New("cannot create command endpoint - name is empty")
27+
}
28+
29+
if len(cmdComponents) == 0 || clarumstrings.IsBlank(cmdComponents[0]) {
30+
return nil, errors.New(fmt.Sprintf("cannot create command endpoint [%s] - cmd is empty", name))
31+
}
32+
33+
warmupDuration := time.Duration(warmupSeconds)
34+
35+
return &endpoint{
36+
name: name,
37+
cmdComponents: cmdComponents,
38+
warmup: durations.GetDurationWithDefault(warmupDuration, 1*time.Second),
39+
logger: logging.NewLogger(loggerName(name)),
40+
}, nil
41+
}
42+
43+
// Start the process from the given command & arguments.
44+
// The process will be started into a cancelable context so that we can
45+
// cancel it later in the post-integration test phase.
46+
func (endpoint *endpoint) start() error {
47+
endpoint.logger.Infof("running cmd [%s]", endpoint.cmdComponents)
48+
ctx, cancel := context.WithCancel(context.Background())
49+
50+
endpoint.cmd = exec.CommandContext(ctx, endpoint.cmdComponents[0], endpoint.cmdComponents[1:]...)
51+
endpoint.cmdCancel = cancel
52+
53+
endpoint.logger.Debug("starting command")
54+
if err := endpoint.cmd.Start(); err != nil {
55+
return err
56+
} else {
57+
endpoint.logger.Debug("cmd start successful")
58+
}
59+
60+
time.Sleep(endpoint.warmup)
61+
endpoint.logger.Debug("warmup ended")
62+
63+
return nil
64+
}
65+
66+
// shutdown the running process. Since the process was created with a context, we will attempt to
67+
// call ctx.Cancel(). If it returns an error, the process will be killed just in case.
68+
// We also wait for the action here, so that the post-integration test phase ends successfully.
69+
// TODO: check this code again, some parts are redundant
70+
func (endpoint *endpoint) shutdown() error {
71+
control.RunningActions.Add(1)
72+
defer control.RunningActions.Done()
73+
74+
endpoint.logger.Infof("stopping cmd [%s]", endpoint.cmdComponents)
75+
76+
if endpoint.cmdCancel != nil {
77+
endpoint.logger.Debug("cancelling cmd")
78+
endpoint.cmdCancel()
79+
80+
if _, err := endpoint.cmd.Process.Wait(); err != nil {
81+
endpoint.logger.Errorf("cmd.Wait() returned error - [%s]", err)
82+
endpoint.killProcess()
83+
return err
84+
} else {
85+
endpoint.logger.Debug("context cancel finished successfully")
86+
}
87+
} else {
88+
if err := endpoint.cmd.Process.Release(); err != nil {
89+
endpoint.logger.Errorf("cmd.Release() returned error - [%s]", err)
90+
endpoint.killProcess()
91+
return err
92+
} else {
93+
endpoint.logger.Debug("cmd kill successful")
94+
}
95+
}
96+
97+
return nil
98+
}
99+
100+
func (endpoint *endpoint) killProcess() {
101+
endpoint.logger.Info("killing process")
102+
103+
if err := endpoint.cmd.Process.Kill(); err != nil {
104+
endpoint.logger.Errorf("cmd.Kill() returned error - [%s]", err)
105+
return
106+
}
107+
}
108+
109+
func loggerName(cmdName string) string {
110+
return fmt.Sprintf("Command %s", cmdName)
111+
}

0 commit comments

Comments
 (0)