Skip to content

Commit

Permalink
Add eventlog logging (#33)
Browse files Browse the repository at this point in the history
Add logging of semi-structured events. These events can be used for (ad hoc) analytics, e.g. count how many machines were lost, compare number of spot instances, etc. Some of this information could be derived from a cloud provider, however this mechanism has some nice properties: it provides the perspective of bigmachine (e.g. a machine could still be running in EC2, but not available to bigmachine); it makes it easy to unify with other events (e.g. we can log keepalive RPC call behavior and compare it to RPC call behavior from other layers); it's lightweight to add new events.

For this initial addition, we log a few events (with immediate utility to GRAIL):
* `bigmachine:machineError`: we have an error reaching a machine and consider it stopped.
* `bigmachine:machineStop`: a machine is stopped for any reason.
* `bigmachine:machineAlive`: whenever we get a successful keepalive response.
* `bigmachine:ec2:machineStart`: an EC2 machine/instance was started
* `bigmachine:ec2:spotInstanceRequestFulfill`: a spot instance request was fulfilled.
  • Loading branch information
jcharum authored Apr 7, 2020
1 parent 9299430 commit bb1c07c
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 10 deletions.
1 change: 1 addition & 0 deletions ec2system/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func init() {
"",
"the bootstrap bigmachine binary with which machines are launched")
sshkeys := constr.String("sshkey", "", "comma-separated list of ssh keys to be installed")
constr.InstanceVar(&system.Eventer, "eventer", "eventer/cloudwatch", "the event logger used to log bigmachine events")
constr.StringVar(&system.Username, "username", "", "user name for tagging purposes")
var sess *session.Session
constr.InstanceVar(&sess, "aws", "aws", "AWS configuration for all EC2 calls")
Expand Down
21 changes: 20 additions & 1 deletion ec2system/ec2machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/grailbio/base/errors"
"github.com/grailbio/base/eventlog"
"github.com/grailbio/base/fatbin"
"github.com/grailbio/base/limitbuf"
"github.com/grailbio/base/log"
Expand Down Expand Up @@ -221,6 +222,9 @@ type System struct {
// AdditionalEC2Tags will be applied to this system's instances.
AdditionalEC2Tags []*ec2.Tag

// Eventer is used to log semi-structured events in service of analytics.
Eventer eventlog.Eventer

privateKey *rsa.PrivateKey

config instances.Type
Expand Down Expand Up @@ -524,7 +528,11 @@ func (s *System) Start(ctx context.Context, count int) ([]*bigmachine.Machine, e
}
instanceIds := make([]string, n)
for i := range instanceIds {
instanceIds[i] = aws.StringValue(describe.SpotInstanceRequests[i].InstanceId)
r := describe.SpotInstanceRequests[i]
instanceIds[i] = aws.StringValue(r.InstanceId)
s.Event("bigmachine:ec2:spotInstanceRequestFulfill",
"requestID", r.SpotInstanceRequestId,
"instanceID", r.InstanceId)
}
return instanceIds, nil
}
Expand Down Expand Up @@ -624,6 +632,10 @@ func (s *System) Start(ctx context.Context, count int) ([]*bigmachine.Machine, e
if useInstanceIDSuffix {
machines[i].Addr += aws.StringValue(instance.InstanceId) + "/"
}
s.Event("bigmachine:ec2:machineStart",
"instanceType", s.InstanceType,
"addr", machines[i].Addr,
"instanceID", instance.InstanceId)
machines[i].Maxprocs = int(s.config.VCPU)
}
return machines, nil
Expand Down Expand Up @@ -908,6 +920,13 @@ func (s *System) Main() error {
return http.ListenAndServe(":3333", nil)
}

func (s *System) Event(typ string, fieldPairs ...interface{}) {
if s.Eventer == nil {
return
}
s.Eventer.Event(typ, fieldPairs...)
}

// ListenAndServe serves the provided handler on a HTTP server
// configured for secure communications between ec2system
// instances.
Expand Down
7 changes: 3 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ module github.com/grailbio/bigmachine
go 1.12

require (
github.com/aws/aws-sdk-go v1.25.13
github.com/cespare/xxhash v1.1.0 // indirect
github.com/aws/aws-sdk-go v1.29.24
github.com/google/pprof v0.0.0-20190930153522-6ce02741cba3
github.com/grailbio/base v0.0.6
github.com/grailbio/base v0.0.7
github.com/grailbio/testutil v0.0.3
github.com/shirou/gopsutil v2.19.9+incompatible
golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc
golang.org/x/net v0.0.0-20191007182048-72f939374954
golang.org/x/net v0.0.0-20200202094626-16171245cfb2
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
golang.org/x/time v0.0.0-20190921001708-c4c64cad1fd0
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ github.com/aws/aws-sdk-go v1.25.10 h1:3epJfNmP6xWkOpLOdhIIj07+9UAJwvbzq8bBzyPigI
github.com/aws/aws-sdk-go v1.25.10/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.25.13 h1:qc1PpYdVQXI4eH5Ou25LD3Mb68HAY+AUn7yG4cWlqj8=
github.com/aws/aws-sdk-go v1.25.13/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.29.24 h1:KOnds/LwADMDBaALL4UB98ZR+TUR1A1mYmAYbdLixLA=
github.com/aws/aws-sdk-go v1.29.24/go.mod h1:1KvfttTE3SPKMpo8g2c6jL3ZKfXtFvKscTgahTma5Xg=
github.com/biogo/store v0.0.0-20190426020002-884f370e325d/go.mod h1:Iev9Q3MErcn+w3UOJD/DkEzllvugfdx7bGcMOFhvr/4=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
Expand All @@ -43,6 +45,7 @@ github.com/go-ole/go-ole v1.2.1/go.mod h1:7FAglXiTm7HKlQRDeOQ6ZNUHidzCWXuZWq/1dT
github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI=
github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-test/deep v1.0.3/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
github.com/go-test/deep v1.0.4/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8=
Expand Down Expand Up @@ -81,6 +84,8 @@ github.com/grailbio/base v0.0.1 h1:AUaPetRbOP7ytAVJAQjg74DgIrgwz29GxbyckQGrnVk=
github.com/grailbio/base v0.0.1/go.mod h1:wVM2Cq2/HT0rt6WYGQhXJ3CCLkNnGjeAAOPHCZ2IsN0=
github.com/grailbio/base v0.0.6 h1:Hr1OHe16+sQEf4CytYlTu2bR+Sq5VENKA/sZoDBGo+k=
github.com/grailbio/base v0.0.6/go.mod h1:OFVz7zmqb1D+Jbew0B4DCIpl4ozzVFxf+JKQZBBIQzE=
github.com/grailbio/base v0.0.7 h1:f9qigc0yUuQRIMVr+EmWCLT9pTlVaLshrGTfrtQl3kE=
github.com/grailbio/base v0.0.7/go.mod h1:VL8MWdM8WxkFUs4ribgWoGYlfty6Xyrat+lNNWWVCfs=
github.com/grailbio/v23/factories/grail v0.0.0-20190904050408-8a555d238e9a h1:kAl1x1ErQgs55bcm/WdoKCPny/kIF7COmC+UGQ9GKcM=
github.com/grailbio/v23/factories/grail v0.0.0-20190904050408-8a555d238e9a/go.mod h1:2g5HI42KHw+BDBdjLP3zs+WvTHlDK3RoE8crjCl26y4=
github.com/hanwen/go-fuse v1.0.0/go.mod h1:unqXarDXqzAk0rt98O2tVndEPIpUgLD9+rwFisZH3Ok=
Expand Down Expand Up @@ -112,6 +117,7 @@ github.com/mattn/go-sqlite3 v1.11.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsO
github.com/pborman/uuid v1.2.0 h1:J7Q5mO4ysT1dv8hyrUGHb9+ooztCXu1D8MY8DZYsu3g=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pquerna/cachecontrol v0.0.0-20180517163645-1555304b9b35/go.mod h1:prYjPmNq4d1NPVmpShWobRqXY3q7Vp+80DqgxxUrUIA=
Expand Down Expand Up @@ -179,6 +185,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191007182048-72f939374954 h1:JGZucVF/L/TotR719NbujzadOZ2AgnYlqphQGHDCKaU=
golang.org/x/net v0.0.0-20191007182048-72f939374954/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down
12 changes: 12 additions & 0 deletions local.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"net/http"
"os"
"os/exec"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -123,6 +124,17 @@ func (*localSystem) Main() error {
panic("not reached")
}

func (s *localSystem) Event(typ string, fieldPairs ...interface{}) {
fields := []string{fmt.Sprintf("eventType:%s", typ)}
for i := 0; i < len(fieldPairs); i++ {
name := fieldPairs[i].(string)
i++
value := fieldPairs[i]
fields = append(fields, fmt.Sprintf("%s:%v", name, value))
}
log.Debug.Print(strings.Join(fields, ", "))
}

func (s *localSystem) ListenAndServe(addr string, handler http.Handler) error {
if addr == "" {
addr = os.Getenv("BIGMACHINE_ADDR")
Expand Down
31 changes: 26 additions & 5 deletions machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ type Machine struct {
client *rpc.Client
cancel func()

// event logs an event. See System.Event.
event func(typ string, fieldPairs ...interface{})

mu sync.Mutex
state int64
err error
Expand Down Expand Up @@ -269,6 +272,10 @@ func (m *Machine) start(b *B) {
if m.keepalivePeriod == 0 {
m.keepalivePeriod, m.keepaliveTimeout, m.keepaliveRpcTimeout = b.System().KeepaliveConfig()
}
m.event = func(_ string, _ ...interface{}) {}
if b != nil {
m.event = b.system.Event
}
m.cancelers = make(map[canceler]struct{})
ctx := context.Background()
ctx, m.cancel = context.WithCancel(ctx)
Expand All @@ -288,6 +295,10 @@ func (m *Machine) setError(err error) {
m.err = err
m.mu.Unlock()
m.setState(Stopped)
m.event("bigmachine:machineError",
"addr", m.Addr,
"error", err.Error(),
)
log.Error.Printf("%s: %v", m.Addr, err)
}

Expand All @@ -313,6 +324,7 @@ func (m *Machine) setState(s State) {
c.Cancel()
}
m.cancelers = make(map[canceler]struct{})
m.event("bigmachine:machineStop", "addr", m.Addr)
}
m.mu.Unlock()
for _, c := range triggered {
Expand All @@ -321,8 +333,13 @@ func (m *Machine) setState(s State) {
}

func (m *Machine) loop(ctx context.Context, system System) {
start := time.Now()
m.setState(Starting)
if m.owner {
m.event("bigmachine:machineAlive",
"addr", m.Addr,
"duration", time.Since(start).Nanoseconds()/1e6,
)
if system != nil {
go func() {
var err error
Expand Down Expand Up @@ -387,11 +404,11 @@ func (m *Machine) loop(ctx context.Context, system System) {
// (up or down) by maintaining a periodic ping.
m.setState(Running)
for {
start := time.Now()
callStart := time.Now()
err := m.retryCall(ctx, m.keepaliveTimeout, m.keepaliveRpcTimeout, "Supervisor.Ping", 0, nil)
if err != nil {
m.errorf("ping failed after %s (timeout=%s, rpc timeout=%s): %v",
time.Since(start), m.keepaliveTimeout, m.keepaliveRpcTimeout, err)
time.Since(callStart), m.keepaliveTimeout, m.keepaliveRpcTimeout, err)
return
}
time.Sleep(m.keepalivePeriod / 2)
Expand Down Expand Up @@ -427,16 +444,20 @@ func (m *Machine) loop(ctx context.Context, system System) {

const keepalive = 5 * time.Minute
for {
start := time.Now()
callStart := time.Now()
var reply keepaliveReply
err := m.retryCall(ctx, m.keepaliveTimeout, m.keepaliveRpcTimeout, "Supervisor.Keepalive", keepalive, &reply)
if err != nil {
m.errorf("keepalive failed after %s (timeout=%s, rpc timeout=%s): %v",
time.Since(start), m.keepaliveTimeout, m.keepaliveRpcTimeout, err)
time.Since(callStart), m.keepaliveTimeout, m.keepaliveRpcTimeout, err)
return
}
m.event("bigmachine:machineAlive",
"addr", m.Addr,
"duration", time.Since(start).Nanoseconds()/1e6,
)
m.mu.Lock()
m.keepaliveReplyTimes[m.numKeepalive%len(m.keepaliveReplyTimes)] = time.Since(start)
m.keepaliveReplyTimes[m.numKeepalive%len(m.keepaliveReplyTimes)] = time.Since(callStart)
m.numKeepalive++
m.nextKeepalive = time.Now().Add(reply.Next)
m.mu.Unlock()
Expand Down
7 changes: 7 additions & 0 deletions system.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ type System interface {
// take over the process; the bigmachine fails if main returns (and
// if it does, it should always return with an error).
Main() error
// Event logs an event of typ with (key, value) fields given in fieldPairs
// as k0, v0, k1, v1, ...kn, vn. For example:
//
// s.Event("bigmachine:machineStart", "addr", "https://m0")
//
// These semi-structured events are used for analytics.
Event(typ string, fieldPairs ...interface{})
// HTTPClient returns an HTTP client that can be used to communicate
// from drivers to machines as well as between machines.
HTTPClient() *http.Client
Expand Down
4 changes: 4 additions & 0 deletions testsystem/testsystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ func (s *System) Main() error {
panic("Main called on testsystem")
}

// Event is a no-op for the test system, as we do not care about event logs in
// tests.
func (*System) Event(_ string, _ ...interface{}) {}

// HTTPClient returns an http.Client that can converse with
// servers created by this test system.
func (s *System) HTTPClient() *http.Client {
Expand Down

0 comments on commit bb1c07c

Please sign in to comment.