Skip to content

Commit

Permalink
Try to make tag.Map threadsafe.
Browse files Browse the repository at this point in the history
This adds a lockedMap type which implements tag.Map interface. The
implementation contains tagMap instance and a mutex, so all the
operations are forwarded to the tagMap under the locked mutex.

An additional care was needed for the functions returning contents of
the map, because core.Value contains a byte slice, which has pointer
like semantics. So to avoid accidental changes, we copy the value if
it is of BYTES type. This likely should be handled by the core.Value
itself, e.g. through some Copy function.

The downside of locking here is that the users of Foreach function
need to be careful to not call into the same map, otherwise deadlock
will happen.

While writing this code I think I have got an understanding of the
issue at hand - the implementation of the tag.Map should basically be
immutable (so the modification of the map actually produces a new map,
instead of doing in-place updates, thus making the map threadsafe),
but that is not easy (or even possible) to enforce. So maybe it's
indeed better to avoid providing the ability of having a different,
vendor-specific implementation of tag.Map and have a good-by-default
implemetation as a part of API.

Still, to preserve the immutability of the map, the core.Value structs
need to be deep-copied.

Fixes #59
  • Loading branch information
krnowak committed Aug 12, 2019
1 parent 623b062 commit b872e0d
Showing 1 changed file with 62 additions and 2 deletions.
64 changes: 62 additions & 2 deletions api/tag/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package tag

import (
"context"
"sync"

"go.opentelemetry.io/api/core"
)
Expand Down Expand Up @@ -68,8 +69,67 @@ type Map interface {
Foreach(func(kv core.KeyValue) bool)
}

type lockedMap struct {
lock sync.Mutex
impl Map
}

var _ Map = &lockedMap{}

func newLockedMap(impl Map) Map {
return &lockedMap{
lock: sync.Mutex{},
impl: impl,
}
}

func (m *lockedMap) Apply(update MapUpdate) Map {
m.lock.Lock()
defer m.lock.Unlock()
newImpl := m.impl.Apply(update)
return newLockedMap(newImpl)
}

func (m *lockedMap) Value(key core.Key) (core.Value, bool) {
m.lock.Lock()
defer m.lock.Unlock()
v, exists := m.impl.Value(key)
if exists && v.Type == core.BYTES {
b := v.Bytes
v.Bytes = make([]byte, len(b))
copy(v.Bytes, b)
}
return v, exists
}

func (m *lockedMap) HasValue(key core.Key) bool {
m.lock.Lock()
defer m.lock.Unlock()
return m.impl.HasValue(key)
}

func (m *lockedMap) Len() int {
m.lock.Lock()
defer m.lock.Unlock()
return m.impl.Len()
}

func (m *lockedMap) Foreach(callback func(kv core.KeyValue) bool) {
m.lock.Lock()
defer m.lock.Unlock()
f := func(kv core.KeyValue) bool {
if kv.Value.Type == core.BYTES {
b := kv.Value.Bytes
kv.Value.Bytes = make([]byte, len(b))
copy(kv.Value.Bytes, b)
}
return callback(kv)
}
m.impl.Foreach(f)
}

func NewEmptyMap() Map {
return tagMap{}
return newLockedMap(tagMap{})
}

func NewMap(update MapUpdate) Map {
Expand All @@ -90,5 +150,5 @@ func FromContext(ctx context.Context) Map {
if m, ok := ctx.Value(ctxTagsKey).(Map); ok {
return m
}
return tagMap{}
return NewEmptyMap()
}

0 comments on commit b872e0d

Please sign in to comment.