Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try to make tag.Map threadsafe. #89

Merged
merged 1 commit into from
Aug 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 13 additions & 53 deletions api/tag/api.go → api/tag/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ package tag

import (
"context"

"go.opentelemetry.io/api/core"
"runtime/pprof"
)

type ctxTagsType struct{}
Expand All @@ -26,56 +25,6 @@ var (
ctxTagsKey = &ctxTagsType{}
)

type MutatorOp int

const (
INSERT MutatorOp = iota
UPDATE
UPSERT
DELETE
)

type Mutator struct {
MutatorOp
core.KeyValue
MeasureMetadata
}

type MeasureMetadata struct {
TTL int // -1 == infinite, 0 == do not propagate
}

func (m Mutator) WithTTL(hops int) Mutator {
m.TTL = hops
return m
}

type MapUpdate struct {
SingleKV core.KeyValue
MultiKV []core.KeyValue
SingleMutator Mutator
MultiMutator []Mutator
}

type Map interface {
Apply(MapUpdate) Map

Value(core.Key) (core.Value, bool)
HasValue(core.Key) bool

Len() int

Foreach(func(kv core.KeyValue) bool)
}

func NewEmptyMap() Map {
return tagMap{}
}

func NewMap(update MapUpdate) Map {
return NewEmptyMap().Apply(update)
}

func WithMap(ctx context.Context, m Map) context.Context {
return context.WithValue(ctx, ctxTagsKey, m)
}
Expand All @@ -90,5 +39,16 @@ func FromContext(ctx context.Context) Map {
if m, ok := ctx.Value(ctxTagsKey).(Map); ok {
return m
}
return tagMap{}
return NewEmptyMap()
}

// Note: the golang pprof.Do API forces this memory allocation, we
// should file an issue about that. (There's a TODO in the source.)
func Do(ctx context.Context, f func(ctx context.Context)) {
m := FromContext(ctx)
keyvals := make([]string, 0, 2*len(m.m))
for k, v := range m.m {
keyvals = append(keyvals, k.Variable.Name, v.value.Emit())
}
pprof.Do(ctx, pprof.Labels(keyvals...), f)
}
127 changes: 55 additions & 72 deletions api/tag/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,64 +15,91 @@
package tag

import (
"context"
"runtime/pprof"

"go.opentelemetry.io/api/core"
)

type MeasureMetadata struct {
TTL int // -1 == infinite, 0 == do not propagate
}

type tagContent struct {
value core.Value
meta MeasureMetadata
}

type tagMap map[core.Key]tagContent
type rawMap map[core.Key]tagContent

type Map struct {
m rawMap
}

var _ Map = tagMap{}
type MapUpdate struct {
SingleKV core.KeyValue
MultiKV []core.KeyValue
SingleMutator Mutator
MultiMutator []Mutator
}

func (t tagMap) Apply(update MapUpdate) Map {
m := make(tagMap, len(t)+len(update.MultiKV)+len(update.MultiMutator))
for k, v := range t {
m[k] = v
func newMap(raw rawMap) Map {
return Map{
m: raw,
}
}

func NewEmptyMap() Map {
return newMap(nil)
}

func NewMap(update MapUpdate) Map {
return NewEmptyMap().Apply(update)
}

func (m Map) Apply(update MapUpdate) Map {
r := make(rawMap, len(m.m)+len(update.MultiKV)+len(update.MultiMutator))
for k, v := range m.m {
r[k] = v
}
if update.SingleKV.Key.Defined() {
m[update.SingleKV.Key] = tagContent{
r[update.SingleKV.Key] = tagContent{
value: update.SingleKV.Value,
}
}
for _, kv := range update.MultiKV {
m[kv.Key] = tagContent{
r[kv.Key] = tagContent{
value: kv.Value,
}
}
if update.SingleMutator.Key.Defined() {
m.apply(update.SingleMutator)
r.apply(update.SingleMutator)
}
for _, mutator := range update.MultiMutator {
m.apply(mutator)
r.apply(mutator)
}
if len(r) == 0 {
r = nil
}
return m
return newMap(r)
}

func (m tagMap) Value(k core.Key) (core.Value, bool) {
entry, ok := m[k]
func (m Map) Value(k core.Key) (core.Value, bool) {
entry, ok := m.m[k]
if !ok {
entry.value.Type = core.INVALID
}
return entry.value, ok
}

func (m tagMap) HasValue(k core.Key) bool {
func (m Map) HasValue(k core.Key) bool {
_, has := m.Value(k)
return has
}

func (m tagMap) Len() int {
return len(m)
func (m Map) Len() int {
return len(m.m)
}

func (m tagMap) Foreach(f func(kv core.KeyValue) bool) {
for k, v := range m {
func (m Map) Foreach(f func(kv core.KeyValue) bool) {
for k, v := range m.m {
if !f(core.KeyValue{
Key: k,
Value: v.value,
Expand All @@ -82,68 +109,24 @@ func (m tagMap) Foreach(f func(kv core.KeyValue) bool) {
}
}

func (m tagMap) apply(mutator Mutator) {
if m == nil {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This never happens, so I dropped the check.

return
}
func (r rawMap) apply(mutator Mutator) {
key := mutator.KeyValue.Key
content := tagContent{
value: mutator.KeyValue.Value,
meta: mutator.MeasureMetadata,
}
switch mutator.MutatorOp {
case INSERT:
if _, ok := m[key]; !ok {
m[key] = content
if _, ok := r[key]; !ok {
r[key] = content
}
case UPDATE:
if _, ok := m[key]; ok {
m[key] = content
if _, ok := r[key]; ok {
r[key] = content
}
case UPSERT:
m[key] = content
r[key] = content
case DELETE:
delete(m, key)
}
}

func Insert(kv core.KeyValue) Mutator {
return Mutator{
MutatorOp: INSERT,
KeyValue: kv,
}
}

func Update(kv core.KeyValue) Mutator {
return Mutator{
MutatorOp: UPDATE,
KeyValue: kv,
}
}

func Upsert(kv core.KeyValue) Mutator {
return Mutator{
MutatorOp: UPSERT,
KeyValue: kv,
}
}

func Delete(k core.Key) Mutator {
return Mutator{
MutatorOp: DELETE,
KeyValue: core.KeyValue{
Key: k,
},
}
}

// Note: the golang pprof.Do API forces this memory allocation, we
// should file an issue about that. (There's a TODO in the source.)
func Do(ctx context.Context, f func(ctx context.Context)) {
m := FromContext(ctx).(tagMap)
keyvals := make([]string, 0, 2*len(m))
for k, v := range m {
keyvals = append(keyvals, k.Variable.Name, v.value.Emit())
delete(r, key)
}
pprof.Do(ctx, pprof.Labels(keyvals...), f)
}
69 changes: 69 additions & 0 deletions api/tag/mutator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tag

import (
"go.opentelemetry.io/api/core"
)

type MutatorOp int

const (
INSERT MutatorOp = iota
UPDATE
UPSERT
DELETE
)

type Mutator struct {
MutatorOp
core.KeyValue
MeasureMetadata
}

func (m Mutator) WithTTL(hops int) Mutator {
m.TTL = hops
return m
}

func Insert(kv core.KeyValue) Mutator {
return Mutator{
MutatorOp: INSERT,
KeyValue: kv,
}
}

func Update(kv core.KeyValue) Mutator {
return Mutator{
MutatorOp: UPDATE,
KeyValue: kv,
}
}

func Upsert(kv core.KeyValue) Mutator {
return Mutator{
MutatorOp: UPSERT,
KeyValue: kv,
}
}

func Delete(k core.Key) Mutator {
return Mutator{
MutatorOp: DELETE,
KeyValue: core.KeyValue{
Key: k,
},
}
}
6 changes: 3 additions & 3 deletions experimental/streaming/exporter/reader/format/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func AppendEvent(buf *strings.Builder, data reader.Event) {
} else {
buf.WriteString(" <")
f(false)(parentSpanIDKey.String(data.Parent.SpanIDString()))
if data.ParentAttributes != nil {
if data.ParentAttributes.Len() > 0 {
data.ParentAttributes.Foreach(f(false))
}
buf.WriteString(" >")
Expand Down Expand Up @@ -113,10 +113,10 @@ func AppendEvent(buf *strings.Builder, data reader.Event) {

// Attach the scope (span) attributes and context tags.
buf.WriteString(" [")
if data.Attributes != nil {
if data.Attributes.Len() > 0 {
data.Attributes.Foreach(f(false))
}
if data.Tags != nil {
if data.Tags.Len() > 0 {
data.Tags.Foreach(f(true))
}
if data.SpanContext.HasSpanID() {
Expand Down
2 changes: 1 addition & 1 deletion experimental/streaming/exporter/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (ro *readerObserver) addMeasurement(e *Event, m stats.Measurement) {

func (ro *readerObserver) readMeasureScope(m stats.Measure) (tag.Map, *readerSpan) {
// TODO
return nil, nil
return tag.NewEmptyMap(), nil
}

func (ro *readerObserver) readScope(id observer.ScopeID) (tag.Map, *readerSpan) {
Expand Down
3 changes: 2 additions & 1 deletion sdk/trace/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"

"go.opentelemetry.io/api/core"
"go.opentelemetry.io/api/tag"
apitrace "go.opentelemetry.io/api/trace"
)

Expand Down Expand Up @@ -94,5 +95,5 @@ func (tr *tracer) WithComponent(component string) apitrace.Tracer {
}

func (tr *tracer) Inject(ctx context.Context, span apitrace.Span, injector apitrace.Injector) {
injector.Inject(span.SpanContext(), nil)
injector.Inject(span.SpanContext(), tag.NewEmptyMap())
}