diff --git a/ec2system/config.go b/ec2system/config.go index bc12c72..c325d1d 100644 --- a/ec2system/config.go +++ b/ec2system/config.go @@ -39,6 +39,7 @@ func init() { "", "the bootstrap bigmachine binary with which machines are launched") sshkeys := constr.String("sshkey", "", "comma-separated list of ssh keys to be installed") + constr.InstanceVar(&system.Eventer, "eventer", "eventer/cloudwatch", "the event logger used to log bigmachine events") constr.StringVar(&system.Username, "username", "", "user name for tagging purposes") var sess *session.Session constr.InstanceVar(&sess, "aws", "aws", "AWS configuration for all EC2 calls") diff --git a/ec2system/ec2machine.go b/ec2system/ec2machine.go index 05c84ee..afdc717 100644 --- a/ec2system/ec2machine.go +++ b/ec2system/ec2machine.go @@ -53,6 +53,7 @@ import ( "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ec2/ec2iface" "github.com/grailbio/base/errors" + "github.com/grailbio/base/eventlog" "github.com/grailbio/base/fatbin" "github.com/grailbio/base/limitbuf" "github.com/grailbio/base/log" @@ -221,6 +222,9 @@ type System struct { // AdditionalEC2Tags will be applied to this system's instances. AdditionalEC2Tags []*ec2.Tag + // Eventer is used to log semi-structured events in service of analytics. + Eventer eventlog.Eventer + privateKey *rsa.PrivateKey config instances.Type @@ -524,7 +528,11 @@ func (s *System) Start(ctx context.Context, count int) ([]*bigmachine.Machine, e } instanceIds := make([]string, n) for i := range instanceIds { - instanceIds[i] = aws.StringValue(describe.SpotInstanceRequests[i].InstanceId) + r := describe.SpotInstanceRequests[i] + instanceIds[i] = aws.StringValue(r.InstanceId) + s.Event("bigmachine:ec2:spotInstanceRequestFulfill", + "requestID", r.SpotInstanceRequestId, + "instanceID", r.InstanceId) } return instanceIds, nil } @@ -624,6 +632,10 @@ func (s *System) Start(ctx context.Context, count int) ([]*bigmachine.Machine, e if useInstanceIDSuffix { machines[i].Addr += aws.StringValue(instance.InstanceId) + "/" } + s.Event("bigmachine:ec2:machineStart", + "instanceType", s.InstanceType, + "addr", machines[i].Addr, + "instanceID", instance.InstanceId) machines[i].Maxprocs = int(s.config.VCPU) } return machines, nil @@ -908,6 +920,13 @@ func (s *System) Main() error { return http.ListenAndServe(":3333", nil) } +func (s *System) Event(typ string, fieldPairs ...interface{}) { + if s.Eventer == nil { + return + } + s.Eventer.Event(typ, fieldPairs...) +} + // ListenAndServe serves the provided handler on a HTTP server // configured for secure communications between ec2system // instances. diff --git a/go.mod b/go.mod index d888e75..09f8f1f 100644 --- a/go.mod +++ b/go.mod @@ -3,14 +3,13 @@ module github.com/grailbio/bigmachine go 1.12 require ( - github.com/aws/aws-sdk-go v1.25.13 - github.com/cespare/xxhash v1.1.0 // indirect + github.com/aws/aws-sdk-go v1.29.24 github.com/google/pprof v0.0.0-20190930153522-6ce02741cba3 - github.com/grailbio/base v0.0.6 + github.com/grailbio/base v0.0.7 github.com/grailbio/testutil v0.0.3 github.com/shirou/gopsutil v2.19.9+incompatible golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc - golang.org/x/net v0.0.0-20191007182048-72f939374954 + golang.org/x/net v0.0.0-20200202094626-16171245cfb2 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e golang.org/x/time v0.0.0-20190921001708-c4c64cad1fd0 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect diff --git a/go.sum b/go.sum index cbc8381..d8e0f91 100644 --- a/go.sum +++ b/go.sum @@ -25,6 +25,8 @@ github.com/aws/aws-sdk-go v1.25.10 h1:3epJfNmP6xWkOpLOdhIIj07+9UAJwvbzq8bBzyPigI github.com/aws/aws-sdk-go v1.25.10/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.25.13 h1:qc1PpYdVQXI4eH5Ou25LD3Mb68HAY+AUn7yG4cWlqj8= github.com/aws/aws-sdk-go v1.25.13/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= +github.com/aws/aws-sdk-go v1.29.24 h1:KOnds/LwADMDBaALL4UB98ZR+TUR1A1mYmAYbdLixLA= +github.com/aws/aws-sdk-go v1.29.24/go.mod h1:1KvfttTE3SPKMpo8g2c6jL3ZKfXtFvKscTgahTma5Xg= github.com/biogo/store v0.0.0-20190426020002-884f370e325d/go.mod h1:Iev9Q3MErcn+w3UOJD/DkEzllvugfdx7bGcMOFhvr/4= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= @@ -43,6 +45,7 @@ github.com/go-ole/go-ole v1.2.1/go.mod h1:7FAglXiTm7HKlQRDeOQ6ZNUHidzCWXuZWq/1dT github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI= github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= +github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-test/deep v1.0.3/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/go-test/deep v1.0.4/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= @@ -81,6 +84,8 @@ github.com/grailbio/base v0.0.1 h1:AUaPetRbOP7ytAVJAQjg74DgIrgwz29GxbyckQGrnVk= github.com/grailbio/base v0.0.1/go.mod h1:wVM2Cq2/HT0rt6WYGQhXJ3CCLkNnGjeAAOPHCZ2IsN0= github.com/grailbio/base v0.0.6 h1:Hr1OHe16+sQEf4CytYlTu2bR+Sq5VENKA/sZoDBGo+k= github.com/grailbio/base v0.0.6/go.mod h1:OFVz7zmqb1D+Jbew0B4DCIpl4ozzVFxf+JKQZBBIQzE= +github.com/grailbio/base v0.0.7 h1:f9qigc0yUuQRIMVr+EmWCLT9pTlVaLshrGTfrtQl3kE= +github.com/grailbio/base v0.0.7/go.mod h1:VL8MWdM8WxkFUs4ribgWoGYlfty6Xyrat+lNNWWVCfs= github.com/grailbio/v23/factories/grail v0.0.0-20190904050408-8a555d238e9a h1:kAl1x1ErQgs55bcm/WdoKCPny/kIF7COmC+UGQ9GKcM= github.com/grailbio/v23/factories/grail v0.0.0-20190904050408-8a555d238e9a/go.mod h1:2g5HI42KHw+BDBdjLP3zs+WvTHlDK3RoE8crjCl26y4= github.com/hanwen/go-fuse v1.0.0/go.mod h1:unqXarDXqzAk0rt98O2tVndEPIpUgLD9+rwFisZH3Ok= @@ -112,6 +117,7 @@ github.com/mattn/go-sqlite3 v1.11.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsO github.com/pborman/uuid v1.2.0 h1:J7Q5mO4ysT1dv8hyrUGHb9+ooztCXu1D8MY8DZYsu3g= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pquerna/cachecontrol v0.0.0-20180517163645-1555304b9b35/go.mod h1:prYjPmNq4d1NPVmpShWobRqXY3q7Vp+80DqgxxUrUIA= @@ -179,6 +185,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191007182048-72f939374954 h1:JGZucVF/L/TotR719NbujzadOZ2AgnYlqphQGHDCKaU= golang.org/x/net v0.0.0-20191007182048-72f939374954/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= diff --git a/local.go b/local.go index 89be64a..9cdc32f 100644 --- a/local.go +++ b/local.go @@ -14,6 +14,7 @@ import ( "net/http" "os" "os/exec" + "strings" "sync" "time" @@ -123,6 +124,17 @@ func (*localSystem) Main() error { panic("not reached") } +func (s *localSystem) Event(typ string, fieldPairs ...interface{}) { + fields := []string{fmt.Sprintf("eventType:%s", typ)} + for i := 0; i < len(fieldPairs); i++ { + name := fieldPairs[i].(string) + i++ + value := fieldPairs[i] + fields = append(fields, fmt.Sprintf("%s:%v", name, value)) + } + log.Debug.Print(strings.Join(fields, ", ")) +} + func (s *localSystem) ListenAndServe(addr string, handler http.Handler) error { if addr == "" { addr = os.Getenv("BIGMACHINE_ADDR") diff --git a/machine.go b/machine.go index 5d4a46b..f799cf9 100644 --- a/machine.go +++ b/machine.go @@ -146,6 +146,9 @@ type Machine struct { client *rpc.Client cancel func() + // event logs an event. See System.Event. + event func(typ string, fieldPairs ...interface{}) + mu sync.Mutex state int64 err error @@ -269,6 +272,10 @@ func (m *Machine) start(b *B) { if m.keepalivePeriod == 0 { m.keepalivePeriod, m.keepaliveTimeout, m.keepaliveRpcTimeout = b.System().KeepaliveConfig() } + m.event = func(_ string, _ ...interface{}) {} + if b != nil { + m.event = b.system.Event + } m.cancelers = make(map[canceler]struct{}) ctx := context.Background() ctx, m.cancel = context.WithCancel(ctx) @@ -288,6 +295,10 @@ func (m *Machine) setError(err error) { m.err = err m.mu.Unlock() m.setState(Stopped) + m.event("bigmachine:machineError", + "addr", m.Addr, + "error", err.Error(), + ) log.Error.Printf("%s: %v", m.Addr, err) } @@ -313,6 +324,7 @@ func (m *Machine) setState(s State) { c.Cancel() } m.cancelers = make(map[canceler]struct{}) + m.event("bigmachine:machineStop", "addr", m.Addr) } m.mu.Unlock() for _, c := range triggered { @@ -321,8 +333,13 @@ func (m *Machine) setState(s State) { } func (m *Machine) loop(ctx context.Context, system System) { + start := time.Now() m.setState(Starting) if m.owner { + m.event("bigmachine:machineAlive", + "addr", m.Addr, + "duration", time.Since(start).Nanoseconds()/1e6, + ) if system != nil { go func() { var err error @@ -387,11 +404,11 @@ func (m *Machine) loop(ctx context.Context, system System) { // (up or down) by maintaining a periodic ping. m.setState(Running) for { - start := time.Now() + callStart := time.Now() err := m.retryCall(ctx, m.keepaliveTimeout, m.keepaliveRpcTimeout, "Supervisor.Ping", 0, nil) if err != nil { m.errorf("ping failed after %s (timeout=%s, rpc timeout=%s): %v", - time.Since(start), m.keepaliveTimeout, m.keepaliveRpcTimeout, err) + time.Since(callStart), m.keepaliveTimeout, m.keepaliveRpcTimeout, err) return } time.Sleep(m.keepalivePeriod / 2) @@ -427,16 +444,20 @@ func (m *Machine) loop(ctx context.Context, system System) { const keepalive = 5 * time.Minute for { - start := time.Now() + callStart := time.Now() var reply keepaliveReply err := m.retryCall(ctx, m.keepaliveTimeout, m.keepaliveRpcTimeout, "Supervisor.Keepalive", keepalive, &reply) if err != nil { m.errorf("keepalive failed after %s (timeout=%s, rpc timeout=%s): %v", - time.Since(start), m.keepaliveTimeout, m.keepaliveRpcTimeout, err) + time.Since(callStart), m.keepaliveTimeout, m.keepaliveRpcTimeout, err) return } + m.event("bigmachine:machineAlive", + "addr", m.Addr, + "duration", time.Since(start).Nanoseconds()/1e6, + ) m.mu.Lock() - m.keepaliveReplyTimes[m.numKeepalive%len(m.keepaliveReplyTimes)] = time.Since(start) + m.keepaliveReplyTimes[m.numKeepalive%len(m.keepaliveReplyTimes)] = time.Since(callStart) m.numKeepalive++ m.nextKeepalive = time.Now().Add(reply.Next) m.mu.Unlock() diff --git a/system.go b/system.go index 8683e28..881db2d 100644 --- a/system.go +++ b/system.go @@ -33,6 +33,13 @@ type System interface { // take over the process; the bigmachine fails if main returns (and // if it does, it should always return with an error). Main() error + // Event logs an event of typ with (key, value) fields given in fieldPairs + // as k0, v0, k1, v1, ...kn, vn. For example: + // + // s.Event("bigmachine:machineStart", "addr", "https://m0") + // + // These semi-structured events are used for analytics. + Event(typ string, fieldPairs ...interface{}) // HTTPClient returns an HTTP client that can be used to communicate // from drivers to machines as well as between machines. HTTPClient() *http.Client diff --git a/testsystem/testsystem.go b/testsystem/testsystem.go index bb184e2..db987db 100644 --- a/testsystem/testsystem.go +++ b/testsystem/testsystem.go @@ -164,6 +164,10 @@ func (s *System) Main() error { panic("Main called on testsystem") } +// Event is a no-op for the test system, as we do not care about event logs in +// tests. +func (*System) Event(_ string, _ ...interface{}) {} + // HTTPClient returns an http.Client that can converse with // servers created by this test system. func (s *System) HTTPClient() *http.Client {