From bb1c07c3f35c7dda471bfb57de293cf84a2254d1 Mon Sep 17 00:00:00 2001 From: Jaran Charumilind Date: Tue, 7 Apr 2020 09:55:28 -0700 Subject: [PATCH] Add eventlog logging (#33) Add logging of semi-structured events. These events can be used for (ad hoc) analytics, e.g. count how many machines were lost, compare number of spot instances, etc. Some of this information could be derived from a cloud provider, however this mechanism has some nice properties: it provides the perspective of bigmachine (e.g. a machine could still be running in EC2, but not available to bigmachine); it makes it easy to unify with other events (e.g. we can log keepalive RPC call behavior and compare it to RPC call behavior from other layers); it's lightweight to add new events. For this initial addition, we log a few events (with immediate utility to GRAIL): * `bigmachine:machineError`: we have an error reaching a machine and consider it stopped. * `bigmachine:machineStop`: a machine is stopped for any reason. * `bigmachine:machineAlive`: whenever we get a successful keepalive response. * `bigmachine:ec2:machineStart`: an EC2 machine/instance was started * `bigmachine:ec2:spotInstanceRequestFulfill`: a spot instance request was fulfilled. --- ec2system/config.go | 1 + ec2system/ec2machine.go | 21 ++++++++++++++++++++- go.mod | 7 +++---- go.sum | 8 ++++++++ local.go | 12 ++++++++++++ machine.go | 31 ++++++++++++++++++++++++++----- system.go | 7 +++++++ testsystem/testsystem.go | 4 ++++ 8 files changed, 81 insertions(+), 10 deletions(-) diff --git a/ec2system/config.go b/ec2system/config.go index bc12c72..c325d1d 100644 --- a/ec2system/config.go +++ b/ec2system/config.go @@ -39,6 +39,7 @@ func init() { "", "the bootstrap bigmachine binary with which machines are launched") sshkeys := constr.String("sshkey", "", "comma-separated list of ssh keys to be installed") + constr.InstanceVar(&system.Eventer, "eventer", "eventer/cloudwatch", "the event logger used to log bigmachine events") constr.StringVar(&system.Username, "username", "", "user name for tagging purposes") var sess *session.Session constr.InstanceVar(&sess, "aws", "aws", "AWS configuration for all EC2 calls") diff --git a/ec2system/ec2machine.go b/ec2system/ec2machine.go index 05c84ee..afdc717 100644 --- a/ec2system/ec2machine.go +++ b/ec2system/ec2machine.go @@ -53,6 +53,7 @@ import ( "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ec2/ec2iface" "github.com/grailbio/base/errors" + "github.com/grailbio/base/eventlog" "github.com/grailbio/base/fatbin" "github.com/grailbio/base/limitbuf" "github.com/grailbio/base/log" @@ -221,6 +222,9 @@ type System struct { // AdditionalEC2Tags will be applied to this system's instances. AdditionalEC2Tags []*ec2.Tag + // Eventer is used to log semi-structured events in service of analytics. + Eventer eventlog.Eventer + privateKey *rsa.PrivateKey config instances.Type @@ -524,7 +528,11 @@ func (s *System) Start(ctx context.Context, count int) ([]*bigmachine.Machine, e } instanceIds := make([]string, n) for i := range instanceIds { - instanceIds[i] = aws.StringValue(describe.SpotInstanceRequests[i].InstanceId) + r := describe.SpotInstanceRequests[i] + instanceIds[i] = aws.StringValue(r.InstanceId) + s.Event("bigmachine:ec2:spotInstanceRequestFulfill", + "requestID", r.SpotInstanceRequestId, + "instanceID", r.InstanceId) } return instanceIds, nil } @@ -624,6 +632,10 @@ func (s *System) Start(ctx context.Context, count int) ([]*bigmachine.Machine, e if useInstanceIDSuffix { machines[i].Addr += aws.StringValue(instance.InstanceId) + "/" } + s.Event("bigmachine:ec2:machineStart", + "instanceType", s.InstanceType, + "addr", machines[i].Addr, + "instanceID", instance.InstanceId) machines[i].Maxprocs = int(s.config.VCPU) } return machines, nil @@ -908,6 +920,13 @@ func (s *System) Main() error { return http.ListenAndServe(":3333", nil) } +func (s *System) Event(typ string, fieldPairs ...interface{}) { + if s.Eventer == nil { + return + } + s.Eventer.Event(typ, fieldPairs...) +} + // ListenAndServe serves the provided handler on a HTTP server // configured for secure communications between ec2system // instances. diff --git a/go.mod b/go.mod index d888e75..09f8f1f 100644 --- a/go.mod +++ b/go.mod @@ -3,14 +3,13 @@ module github.com/grailbio/bigmachine go 1.12 require ( - github.com/aws/aws-sdk-go v1.25.13 - github.com/cespare/xxhash v1.1.0 // indirect + github.com/aws/aws-sdk-go v1.29.24 github.com/google/pprof v0.0.0-20190930153522-6ce02741cba3 - github.com/grailbio/base v0.0.6 + github.com/grailbio/base v0.0.7 github.com/grailbio/testutil v0.0.3 github.com/shirou/gopsutil v2.19.9+incompatible golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc - golang.org/x/net v0.0.0-20191007182048-72f939374954 + golang.org/x/net v0.0.0-20200202094626-16171245cfb2 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e golang.org/x/time v0.0.0-20190921001708-c4c64cad1fd0 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect diff --git a/go.sum b/go.sum index cbc8381..d8e0f91 100644 --- a/go.sum +++ b/go.sum @@ -25,6 +25,8 @@ github.com/aws/aws-sdk-go v1.25.10 h1:3epJfNmP6xWkOpLOdhIIj07+9UAJwvbzq8bBzyPigI github.com/aws/aws-sdk-go v1.25.10/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.25.13 h1:qc1PpYdVQXI4eH5Ou25LD3Mb68HAY+AUn7yG4cWlqj8= github.com/aws/aws-sdk-go v1.25.13/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= +github.com/aws/aws-sdk-go v1.29.24 h1:KOnds/LwADMDBaALL4UB98ZR+TUR1A1mYmAYbdLixLA= +github.com/aws/aws-sdk-go v1.29.24/go.mod h1:1KvfttTE3SPKMpo8g2c6jL3ZKfXtFvKscTgahTma5Xg= github.com/biogo/store v0.0.0-20190426020002-884f370e325d/go.mod h1:Iev9Q3MErcn+w3UOJD/DkEzllvugfdx7bGcMOFhvr/4= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= @@ -43,6 +45,7 @@ github.com/go-ole/go-ole v1.2.1/go.mod h1:7FAglXiTm7HKlQRDeOQ6ZNUHidzCWXuZWq/1dT github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI= github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= +github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-test/deep v1.0.3/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/go-test/deep v1.0.4/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= @@ -81,6 +84,8 @@ github.com/grailbio/base v0.0.1 h1:AUaPetRbOP7ytAVJAQjg74DgIrgwz29GxbyckQGrnVk= github.com/grailbio/base v0.0.1/go.mod h1:wVM2Cq2/HT0rt6WYGQhXJ3CCLkNnGjeAAOPHCZ2IsN0= github.com/grailbio/base v0.0.6 h1:Hr1OHe16+sQEf4CytYlTu2bR+Sq5VENKA/sZoDBGo+k= github.com/grailbio/base v0.0.6/go.mod h1:OFVz7zmqb1D+Jbew0B4DCIpl4ozzVFxf+JKQZBBIQzE= +github.com/grailbio/base v0.0.7 h1:f9qigc0yUuQRIMVr+EmWCLT9pTlVaLshrGTfrtQl3kE= +github.com/grailbio/base v0.0.7/go.mod h1:VL8MWdM8WxkFUs4ribgWoGYlfty6Xyrat+lNNWWVCfs= github.com/grailbio/v23/factories/grail v0.0.0-20190904050408-8a555d238e9a h1:kAl1x1ErQgs55bcm/WdoKCPny/kIF7COmC+UGQ9GKcM= github.com/grailbio/v23/factories/grail v0.0.0-20190904050408-8a555d238e9a/go.mod h1:2g5HI42KHw+BDBdjLP3zs+WvTHlDK3RoE8crjCl26y4= github.com/hanwen/go-fuse v1.0.0/go.mod h1:unqXarDXqzAk0rt98O2tVndEPIpUgLD9+rwFisZH3Ok= @@ -112,6 +117,7 @@ github.com/mattn/go-sqlite3 v1.11.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsO github.com/pborman/uuid v1.2.0 h1:J7Q5mO4ysT1dv8hyrUGHb9+ooztCXu1D8MY8DZYsu3g= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pquerna/cachecontrol v0.0.0-20180517163645-1555304b9b35/go.mod h1:prYjPmNq4d1NPVmpShWobRqXY3q7Vp+80DqgxxUrUIA= @@ -179,6 +185,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191007182048-72f939374954 h1:JGZucVF/L/TotR719NbujzadOZ2AgnYlqphQGHDCKaU= golang.org/x/net v0.0.0-20191007182048-72f939374954/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= diff --git a/local.go b/local.go index 89be64a..9cdc32f 100644 --- a/local.go +++ b/local.go @@ -14,6 +14,7 @@ import ( "net/http" "os" "os/exec" + "strings" "sync" "time" @@ -123,6 +124,17 @@ func (*localSystem) Main() error { panic("not reached") } +func (s *localSystem) Event(typ string, fieldPairs ...interface{}) { + fields := []string{fmt.Sprintf("eventType:%s", typ)} + for i := 0; i < len(fieldPairs); i++ { + name := fieldPairs[i].(string) + i++ + value := fieldPairs[i] + fields = append(fields, fmt.Sprintf("%s:%v", name, value)) + } + log.Debug.Print(strings.Join(fields, ", ")) +} + func (s *localSystem) ListenAndServe(addr string, handler http.Handler) error { if addr == "" { addr = os.Getenv("BIGMACHINE_ADDR") diff --git a/machine.go b/machine.go index 5d4a46b..f799cf9 100644 --- a/machine.go +++ b/machine.go @@ -146,6 +146,9 @@ type Machine struct { client *rpc.Client cancel func() + // event logs an event. See System.Event. + event func(typ string, fieldPairs ...interface{}) + mu sync.Mutex state int64 err error @@ -269,6 +272,10 @@ func (m *Machine) start(b *B) { if m.keepalivePeriod == 0 { m.keepalivePeriod, m.keepaliveTimeout, m.keepaliveRpcTimeout = b.System().KeepaliveConfig() } + m.event = func(_ string, _ ...interface{}) {} + if b != nil { + m.event = b.system.Event + } m.cancelers = make(map[canceler]struct{}) ctx := context.Background() ctx, m.cancel = context.WithCancel(ctx) @@ -288,6 +295,10 @@ func (m *Machine) setError(err error) { m.err = err m.mu.Unlock() m.setState(Stopped) + m.event("bigmachine:machineError", + "addr", m.Addr, + "error", err.Error(), + ) log.Error.Printf("%s: %v", m.Addr, err) } @@ -313,6 +324,7 @@ func (m *Machine) setState(s State) { c.Cancel() } m.cancelers = make(map[canceler]struct{}) + m.event("bigmachine:machineStop", "addr", m.Addr) } m.mu.Unlock() for _, c := range triggered { @@ -321,8 +333,13 @@ func (m *Machine) setState(s State) { } func (m *Machine) loop(ctx context.Context, system System) { + start := time.Now() m.setState(Starting) if m.owner { + m.event("bigmachine:machineAlive", + "addr", m.Addr, + "duration", time.Since(start).Nanoseconds()/1e6, + ) if system != nil { go func() { var err error @@ -387,11 +404,11 @@ func (m *Machine) loop(ctx context.Context, system System) { // (up or down) by maintaining a periodic ping. m.setState(Running) for { - start := time.Now() + callStart := time.Now() err := m.retryCall(ctx, m.keepaliveTimeout, m.keepaliveRpcTimeout, "Supervisor.Ping", 0, nil) if err != nil { m.errorf("ping failed after %s (timeout=%s, rpc timeout=%s): %v", - time.Since(start), m.keepaliveTimeout, m.keepaliveRpcTimeout, err) + time.Since(callStart), m.keepaliveTimeout, m.keepaliveRpcTimeout, err) return } time.Sleep(m.keepalivePeriod / 2) @@ -427,16 +444,20 @@ func (m *Machine) loop(ctx context.Context, system System) { const keepalive = 5 * time.Minute for { - start := time.Now() + callStart := time.Now() var reply keepaliveReply err := m.retryCall(ctx, m.keepaliveTimeout, m.keepaliveRpcTimeout, "Supervisor.Keepalive", keepalive, &reply) if err != nil { m.errorf("keepalive failed after %s (timeout=%s, rpc timeout=%s): %v", - time.Since(start), m.keepaliveTimeout, m.keepaliveRpcTimeout, err) + time.Since(callStart), m.keepaliveTimeout, m.keepaliveRpcTimeout, err) return } + m.event("bigmachine:machineAlive", + "addr", m.Addr, + "duration", time.Since(start).Nanoseconds()/1e6, + ) m.mu.Lock() - m.keepaliveReplyTimes[m.numKeepalive%len(m.keepaliveReplyTimes)] = time.Since(start) + m.keepaliveReplyTimes[m.numKeepalive%len(m.keepaliveReplyTimes)] = time.Since(callStart) m.numKeepalive++ m.nextKeepalive = time.Now().Add(reply.Next) m.mu.Unlock() diff --git a/system.go b/system.go index 8683e28..881db2d 100644 --- a/system.go +++ b/system.go @@ -33,6 +33,13 @@ type System interface { // take over the process; the bigmachine fails if main returns (and // if it does, it should always return with an error). Main() error + // Event logs an event of typ with (key, value) fields given in fieldPairs + // as k0, v0, k1, v1, ...kn, vn. For example: + // + // s.Event("bigmachine:machineStart", "addr", "https://m0") + // + // These semi-structured events are used for analytics. + Event(typ string, fieldPairs ...interface{}) // HTTPClient returns an HTTP client that can be used to communicate // from drivers to machines as well as between machines. HTTPClient() *http.Client diff --git a/testsystem/testsystem.go b/testsystem/testsystem.go index bb184e2..db987db 100644 --- a/testsystem/testsystem.go +++ b/testsystem/testsystem.go @@ -164,6 +164,10 @@ func (s *System) Main() error { panic("Main called on testsystem") } +// Event is a no-op for the test system, as we do not care about event logs in +// tests. +func (*System) Event(_ string, _ ...interface{}) {} + // HTTPClient returns an http.Client that can converse with // servers created by this test system. func (s *System) HTTPClient() *http.Client {