Skip to content

Commit

Permalink
Improved observability into operations (#36)
Browse files Browse the repository at this point in the history
* Add support for OP_UPDATE, OP_INSERT, OP_DELETE, OP_KILL_CURSORS

* Add support for parsing commands/collections from operations

* Make operations implement stringer interface for simpler debugging

* Move commands to separate file

* Add command and collection to error logs and metrics

* Add request debugging

* Pass tags down

* Fix tests

* Linting

* Fix no response in unacknowledged queries

* Fix var naming

* Re-add response_op_code tag

* Formatting
  • Loading branch information
mdehoog authored Oct 4, 2021
1 parent 12c318e commit f50da50
Show file tree
Hide file tree
Showing 7 changed files with 521 additions and 53 deletions.
79 changes: 79 additions & 0 deletions mongo/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package mongo

import (
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
)

type Command string

const (
Unknown Command = "unknown"
AbortTransaction Command = "abortTransaction"
Aggregate Command = "aggregate"
CommitTransaction Command = "commandTransaction"
Count Command = "count"
CreateIndexes Command = "createIndexes"
Delete Command = "delete"
Distinct Command = "distinct"
Drop Command = "drop"
DropDatabase Command = "dropDatabase"
DropIndexes Command = "dropIndexes"
EndSessions Command = "endSessions"
Find Command = "find"
FindAndModify Command = "findAndModify"
GetMore Command = "getMore"
Insert Command = "insert"
IsMaster Command = "isMaster"
Ismaster Command = "ismaster"
ListCollections Command = "listCollections"
ListIndexes Command = "listIndexes"
ListDatabases Command = "listDatabases"
MapReduce Command = "mapReduce"
Update Command = "update"
)

var collectionCommands = []Command{Aggregate, Count, CreateIndexes, Delete, Distinct, Drop, DropIndexes, Find, FindAndModify, Insert, ListIndexes, MapReduce, Update}
var int32Commands = []Command{AbortTransaction, Aggregate, CommitTransaction, DropDatabase, IsMaster, Ismaster, ListCollections, ListDatabases}
var int64Commands = []Command{GetMore}
var arrayCommands = []Command{EndSessions}

func IsWrite(command Command) bool {
switch command {
case CommitTransaction, CreateIndexes, Delete, Drop, DropIndexes, DropDatabase, FindAndModify, Insert, Update:
return true
}
return false
}

func CommandAndCollection(msg bsoncore.Document) (Command, string) {
for _, s := range collectionCommands {
if coll, ok := msg.Lookup(string(s)).StringValueOK(); ok {
return s, coll
}
}
for _, s := range int32Commands {
value := msg.Lookup(string(s))
if value.Data != nil {
return s, ""
}
}
for _, s := range int64Commands {
value := msg.Lookup(string(s))
if value.Data != nil {
return s, ""
}
}
for _, s := range arrayCommands {
value := msg.Lookup(string(s))
if value.Data != nil {
return s, ""
}
}
return Unknown, ""
}

func IsIsMasterDoc(doc bsoncore.Document) bool {
isMaster, _ := doc.Lookup(string(IsMaster)).Int32OK()
ismaster, _ := doc.Lookup(string(Ismaster)).Int32OK()
return ismaster+isMaster > 0
}
24 changes: 14 additions & 10 deletions mongo/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,20 +152,23 @@ func (m *Mongo) Close() {
}
}

func (m *Mongo) RoundTrip(msg *Message) (_ *Message, err error) {
func (m *Mongo) RoundTrip(msg *Message, tags []string) (_ *Message, err error) {
m.mu.RLock()
defer m.mu.RUnlock()

var addr address.Address
defer func() {
if err != nil {
cursorID, _ := msg.Op.CursorID()
command, collection := msg.Op.CommandAndCollection()
m.log.Error(
"Round trip error",
zap.Error(err),
zap.Int64("cursor_id", cursorID),
zap.Int32("op_code", int32(msg.Op.OpCode())),
zap.String("address", addr.String()),
zap.String("command", string(command)),
zap.String("collection", collection),
)
}
}()
Expand All @@ -189,6 +192,10 @@ func (m *Mongo) RoundTrip(msg *Message) (_ *Message, err error) {
}

addr = conn.Address()
tags = append(
tags,
fmt.Sprintf("address:%s", conn.Address().String()),
)

defer func() {
err := conn.Close()
Expand All @@ -203,12 +210,13 @@ func (m *Mongo) RoundTrip(msg *Message) (_ *Message, err error) {
return nil, errors.New("server ErrorProcessor type assertion failed")
}

wm, err := m.roundTrip(conn, msg.Wm, msg.Op.Unacknowledged())
unacknowledged := msg.Op.Unacknowledged()
wm, err := m.roundTrip(conn, msg.Wm, unacknowledged, tags)
if err != nil {
m.processError(err, ep, addr, conn)
return nil, err
}
if msg.Op.Unacknowledged() {
if unacknowledged {
return &Message{}, nil
}

Expand Down Expand Up @@ -278,21 +286,17 @@ func (m *Mongo) checkoutConnection(server driver.Server) (conn driver.Connection
}

// see https://github.com/mongodb/mongo-go-driver/blob/v1.7.2/x/mongo/driver/operation.go#L664-L681
func (m *Mongo) roundTrip(conn driver.Connection, req []byte, unacknowledged bool) (res []byte, err error) {
func (m *Mongo) roundTrip(conn driver.Connection, req []byte, unacknowledged bool, tags []string) (res []byte, err error) {
defer func(start time.Time) {
tags := []string{
fmt.Sprintf("address:%s", conn.Address().String()),
fmt.Sprintf("unacknowledged:%v", unacknowledged),
}
tags = append(tags, fmt.Sprintf("success:%v", err == nil))

_ = m.statsd.Distribution("request_size", float64(len(req)), tags, 1)
if err == nil && !unacknowledged {
// There is no response size for unacknowledged writes.
_ = m.statsd.Distribution("response_size", float64(len(res)), tags, 1)
}

roundTripTags := append(tags, fmt.Sprintf("success:%v", err == nil))
_ = m.statsd.Timing("round_trip", time.Since(start), roundTripTags, 1)
_ = m.statsd.Timing("round_trip", time.Since(start), tags, 1)
}(time.Now())

if err = conn.WriteWireMessage(m.roundTripCtx, req); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions mongo/mongo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestRoundTrip(t *testing.T) {

msg := insertOpMsg(t)

res, err := m.RoundTrip(msg)
res, err := m.RoundTrip(msg, []string{})
assert.Nil(t, err)

single := mongo.ExtractSingleOpMsg(t, res)
Expand Down Expand Up @@ -90,7 +90,7 @@ func TestRoundTripProcessError(t *testing.T) {

msg := insertOpMsg(t)

res, err := m.RoundTrip(msg)
res, err := m.RoundTrip(msg, []string{})
assert.Nil(t, err)

single := mongo.ExtractSingleOpMsg(t, res)
Expand All @@ -103,7 +103,7 @@ func TestRoundTripProcessError(t *testing.T) {
// kill the proxy
p.Kill()

_, err = m.RoundTrip(msg)
_, err = m.RoundTrip(msg, []string{})
assert.Error(t, driver.Error{}, err)

assert.Equal(t, description.ServerKind(description.Unknown), m.Description().Servers[0].Kind, "Failed to update the server Kind to Unknown")
Expand Down
Loading

0 comments on commit f50da50

Please sign in to comment.