Skip to content

Commit 7a9a8af

Browse files
committed
Code added with Makefile and dependencies
1 parent 057e794 commit 7a9a8af

File tree

16 files changed

+733
-1
lines changed

16 files changed

+733
-1
lines changed

.gitignore

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,40 @@
11
# Binaries for programs and plugins
22
*.exe
3-
*.exe~
43
*.dll
54
*.so
65
*.dylib
6+
*.o
7+
*.a
8+
9+
# Folders
10+
_obj
11+
_test
12+
13+
# Architecture specific extensions/prefixes
14+
*.[568vq]
15+
[568vq].out
16+
17+
*.cgo1.go
18+
*.cgo2.c
19+
_cgo_defun.c
20+
_cgo_gotypes.go
21+
_cgo_export.*
22+
23+
_testmain.go
24+
25+
*.prof
726

827
# Test binary, build with `go test -c`
928
*.test
1029

1130
# Output of the go coverage tool, specifically when used with LiteIDE
1231
*.out
32+
33+
# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736
34+
.glide/
35+
36+
# Gogland
37+
.idea/
38+
39+
# Dependencies
40+
vendor/

Makefile

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
PKGS ?= $(shell glide novendor)
2+
BENCH_FLAGS ?= -benchmem
3+
4+
.PHONY: all
5+
all: lint test
6+
7+
.PHONY: dependencies
8+
dependencies:
9+
@echo "Installing Glide and locked dependencies..."
10+
glide --version || go get -u -f github.com/Masterminds/glide
11+
glide install
12+
@echo "Installing goimports..."
13+
go install ./vendor/golang.org/x/tools/cmd/goimports
14+
@echo "Installing golint..."
15+
go install ./vendor/golang.org/x/lint/golint
16+
@echo "Installing gosimple..."
17+
go install ./vendor/honnef.co/go/tools/cmd/gosimple
18+
@echo "Installing unused..."
19+
go install ./vendor/honnef.co/go/tools/cmd/unused
20+
@echo "Installing staticcheck..."
21+
go install ./vendor/honnef.co/go/tools/cmd/staticcheck
22+
23+
.PHONY: lint
24+
lint:
25+
@echo "Checking formatting..."
26+
@gofiles=$$(go list -f {{.Dir}} $(PKGS) | grep -v mock) && [ -z "$$gofiles" ] || unformatted=$$(for d in $$gofiles; do goimports -l $$d/*.go; done) && [ -z "$$unformatted" ] || (echo >&2 "Go files must be formatted with goimports. Following files has problem:\n$$unformatted" && false)
27+
@echo "Checking vet..."
28+
@go vet $(PKG_FILES)
29+
@echo "Checking simple..."
30+
@gosimple $(PKG_FILES)
31+
@echo "Checking unused..."
32+
@unused $(PKG_FILES)
33+
@echo "Checking staticcheck..."
34+
@staticcheck $(PKG_FILES)
35+
@echo "Checking lint..."
36+
@$(foreach dir,$(PKGS),golint $(dir);)
37+
38+
.PHONY: test
39+
test:
40+
go test -timeout 20s -race -v $(PKGS)
41+
42+
.PHONY: cover
43+
cover:
44+
./cover.sh $(PKGS)
45+
46+
.PHONY: bench
47+
BENCH ?= .
48+
bench:
49+
$(foreach pkg,$(PKGS),go test -bench=$(BENCH) -run="^$$" $(BENCH_FLAGS) $(pkg);)
50+
51+
.PHONY: fmt
52+
fmt:
53+
@echo "Formatting files..."
54+
@gofiles=$$(go list -f {{.Dir}} $(PKGS) | grep -v mock) && [ -z "$$gofiles" ] || for d in $$gofiles; do goimports -l -w $$d/*.go; done
55+

broker/broker.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package broker
2+
3+
import "github.com/monetha/go-distributed/task"
4+
5+
// Broker is an interface for creating distributed tasks.
6+
type Broker interface {
7+
// NewTask creates long-running distributed task.
8+
NewTask(key string, fun task.Func) (*task.Task, error)
9+
}

broker/consulbroker/broker.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package consulbroker
2+
3+
import (
4+
"time"
5+
6+
"github.com/hashicorp/consul/api"
7+
"github.com/monetha/go-distributed/task"
8+
)
9+
10+
const (
11+
// LockWaitTime is how long we block for at a time to check if locker
12+
// acquisition is possible. This affects the minimum time it takes to cancel
13+
// a Lock acquisition.
14+
LockWaitTime = 2 * time.Second
15+
)
16+
17+
// Config is used to configure the creation of a broker
18+
type Config struct {
19+
// Address is the address of the Consul server
20+
Address string
21+
22+
// Scheme is the URI scheme for the Consul server
23+
Scheme string
24+
25+
// Token is used to provide a per-request ACL token
26+
Token string
27+
}
28+
29+
// Broker can start long-running tasks (minutes to permanent) in Consul cluster.
30+
type Broker struct {
31+
client *api.Client
32+
}
33+
34+
// New creates a broker that can start long-running tasks in Consul cluster.
35+
func New(config *Config) (*Broker, error) {
36+
if config == nil {
37+
config = &Config{}
38+
}
39+
40+
client, err := api.NewClient(&api.Config{
41+
Address: config.Address,
42+
Scheme: config.Scheme,
43+
Token: config.Token,
44+
})
45+
if err != nil {
46+
return nil, err
47+
}
48+
49+
return &Broker{client: client}, nil
50+
}
51+
52+
// NewTask creates new long-running task in Consul cluster.
53+
// Task makes a best effort to ensure that exactly one instance of a task is executing in a cluster.
54+
// Task may be re-started when needed until it's been closed.
55+
func (b *Broker) NewTask(key string, fun task.Func) (*task.Task, error) {
56+
return task.New(newConsulLocker(b.client, key, LockWaitTime), fun), nil
57+
}

broker/consulbroker/broker_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package consulbroker
2+
3+
import "github.com/monetha/go-distributed/broker"
4+
5+
var (
6+
_ broker.Broker = &Broker{} // ensure Broker implements Broker interface
7+
)

broker/consulbroker/example_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package consulbroker_test
2+
3+
import (
4+
"context"
5+
"log"
6+
"os"
7+
"os/signal"
8+
"syscall"
9+
"time"
10+
11+
"github.com/monetha/go-distributed/broker/consulbroker"
12+
)
13+
14+
func ExampleNew() {
15+
defer log.Println("service stopped.")
16+
17+
b, err := consulbroker.New(&consulbroker.Config{Address: "127.0.0.1:8500"})
18+
if err != nil {
19+
log.Printf("New broker: %v", err)
20+
return
21+
}
22+
23+
t, err := b.NewTask("some/long/running/task", someLongRunningTask)
24+
if err != nil {
25+
log.Printf("New task: %v", err)
26+
return
27+
}
28+
defer t.Close()
29+
30+
t2, err := b.NewTask("other/long/running/task", otherLongRunningTask)
31+
if err != nil {
32+
log.Printf("New task: %v", err)
33+
return
34+
}
35+
defer t2.Close()
36+
37+
sigChan := make(chan os.Signal, 1)
38+
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT)
39+
<-sigChan
40+
}
41+
42+
func someLongRunningTask(ctx context.Context) error {
43+
log.Println("someLongRunningTask: I'm alive!!")
44+
45+
ticker := time.NewTicker(10 * time.Second)
46+
defer ticker.Stop()
47+
48+
for {
49+
select {
50+
case <-ticker.C:
51+
log.Println("someLongRunningTask: Doing some interesting stuff..")
52+
case <-ctx.Done():
53+
log.Println("someLongRunningTask: Oh, no! I should stop my work.")
54+
return ctx.Err()
55+
}
56+
}
57+
}
58+
59+
func otherLongRunningTask(ctx context.Context) error {
60+
log.Println("otherLongRunningTask: BANG BANG!!")
61+
62+
select {
63+
case <-time.After(10 * time.Second):
64+
log.Println("otherLongRunningTask: BOOO!!")
65+
return nil
66+
case <-ctx.Done():
67+
log.Println("otherLongRunningTask: good bye")
68+
return ctx.Err()
69+
}
70+
}

broker/consulbroker/locker.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package consulbroker
2+
3+
import (
4+
"fmt"
5+
"time"
6+
7+
"github.com/hashicorp/consul/api"
8+
"github.com/monetha/go-distributed/locker"
9+
)
10+
11+
// consulLocker wraps Consul distributed lock by implementing Locker interface.
12+
type consulLocker struct {
13+
client *api.Client
14+
key string
15+
lockWaitTime time.Duration
16+
lock *api.Lock
17+
}
18+
19+
// NewConsulLocker creates new consulLocker instance.
20+
func newConsulLocker(client *api.Client, key string, lockWaitTime time.Duration) *consulLocker {
21+
return &consulLocker{
22+
client: client,
23+
key: key,
24+
lockWaitTime: lockWaitTime,
25+
}
26+
}
27+
28+
// Key returns the name of locker.
29+
func (l *consulLocker) Key() string {
30+
return l.key
31+
}
32+
33+
// Lock attempts to acquire the locker and blocks while doing so.
34+
// Providing a non-nil stopCh can be used to abort the locker attempt.
35+
// Returns a channel that is closed if our locker is lost or an error.
36+
// This channel could be closed at any time due to session invalidation,
37+
// communication errors, operator intervention, etc. It is NOT safe to
38+
// assume that the locker is held until Unlock(), application must be able
39+
// to handle the locker being lost.
40+
func (l *consulLocker) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
41+
if l.lock != nil {
42+
return nil, locker.ErrLockHeld
43+
}
44+
45+
lock, err := l.client.LockOpts(&api.LockOptions{
46+
Key: l.key,
47+
LockWaitTime: l.lockWaitTime,
48+
})
49+
if err != nil {
50+
return nil, fmt.Errorf("locker: creating lock opts %s: %v", l.key, err)
51+
}
52+
53+
lockCh, err := lock.Lock(stopCh)
54+
if err != nil {
55+
return nil, fmt.Errorf("locker: lock %s: %v", l.key, err)
56+
}
57+
58+
if lockCh == nil {
59+
return nil, locker.LockCancelled(l.key)
60+
}
61+
62+
l.lock = lock
63+
64+
return lockCh, nil
65+
}
66+
67+
// Unlock released the locker. It is an error to call this
68+
// if the locker is not currently held.
69+
func (l *consulLocker) Unlock() error {
70+
if l.lock == nil {
71+
return locker.ErrLockNotHeld
72+
}
73+
defer func() {
74+
l.lock = nil
75+
}()
76+
77+
return l.lock.Unlock()
78+
}

0 commit comments

Comments
 (0)