Skip to content

Commit

Permalink
Support Marathon SSE (#207)
Browse files Browse the repository at this point in the history
* Support Marathon SSE (#187)
* Log error wehn events queue if full (#201)
* Add metrics: events.read.type (#203)
* Fix marathon events delay, now fetched from event data (#206)

Fixes: #143
  • Loading branch information
tomez authored and janisz committed Mar 10, 2017
1 parent b416d54 commit 81db6e7
Show file tree
Hide file tree
Showing 25 changed files with 954 additions and 71 deletions.
33 changes: 25 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ Register [Marathon](https://mesosphere.github.io/marathon/) Tasks as [Consul](ht
forwards it to Consul agents. It also re-syncs all the information from Marathon
to Consul on startup and repeats it with given interval.

Note: In the future release Event Bus (callbacks) will be considered deprecated and eventually
removed in favor of Event Stream (SSE).
Right now marathon-consul is supporting both solutions.
SSE is provided as experimental feature, disabled by default ([more](#sse-support)).

## Code

This project is based on
Expand Down Expand Up @@ -101,7 +106,6 @@ curl -X POST 'http://marathon.service.consul:8080/v2/eventSubscriptions?callback
```
The event subscription should be set to `localhost` to reduce network traffic.


## Usage

### Marathon masters
Expand Down Expand Up @@ -165,11 +169,12 @@ for more details.
### Sync

- The scheduled Marathon-consul sync may run in two modes:
- Only on node that is the current [Marathon-leader](https://mesosphere.github.io/marathon/docs/rest-api.html#get-v2-leader), `sync-leader` parameter should be set to `hostname:port` the current node appears in the Marathon cluster.
- Only on node that is the current [Marathon-leader](https://mesosphere.github.io/marathon/docs/rest-api.html#get-v2-leader), `sync-leader` parameter should be set to `hostname:port` the current node appears in the Marathon cluster.
This mode is **enabled by default** and the `sync-leader` property is set to the hostname resolved by OS.
Note that there is a difference between `sync-leader` and `marathon-location`: `sync-leader` is used for node leadership detection (should be set to cluster-wide node name), while `marathon-location` is used for connection purpose (may be set to `localhost`)
- On every node, `sync-force` parameter should be set to `true`


### Options

Argument | Default | Description
Expand Down Expand Up @@ -215,6 +220,8 @@ sync-force | `false` | Force leadership-independent Mar
sync-interval | `15m0s` | Marathon-consul sync interval
sync-leader | | Marathon cluster-wide node name (defaults to <hostname>:8080), the sync will run only if the specified node is the current Marathon-leader
workers-pool-size | `10` | Number of concurrent workers processing events
sse-enabled | `false` | Enable marathon-consul SSE on this node
web-enabled | `true` | Enable marathon-consul Web callbacks on this node

### Endpoints

Expand All @@ -227,7 +234,7 @@ Endpoint | Description

### Register under multiple ports

If you need to map your Marathon task into multiple service registrations in Consul, you can configure marathon-consul
If you need to map your Marathon task into multiple service registrations in Consul, you can configure marathon-consul
via Marathon's `portDefinitions`:

```
Expand Down Expand Up @@ -276,15 +283,15 @@ curl -X GET http://localhost:8500/v1/catalog/service/my-app-other-name
],
"ServicePort": 31293,
...
```
```

If any port definition contains the `consul` label, then advanced configuration mode is enabled. As a result, only the ports
If any port definition contains the `consul` label, then advanced configuration mode is enabled. As a result, only the ports
containing this label are registered, under the name specified as the label's value – with empty value resolved to default name.
Names don't have to be unique – you can have multiple registrations under the same name, but on different ports,
Names don't have to be unique – you can have multiple registrations under the same name, but on different ports,
perhaps with different tags. Note that the `consul` label still needs to be present in the top-level application labels, even
though its value won't have any effect.

Tags configured in the top-level application labels will be added to all registrations. Tags configured in the port definition
Tags configured in the top-level application labels will be added to all registrations. Tags configured in the port definition
labels will be added only to corresponding registrations.

All registrations share the same `marathon-task` tag.
Expand All @@ -294,9 +301,19 @@ All registrations share the same `marathon-task` tag.
Until 1.x.x marathon-consul would register services in Consul with registration id equal to related Marathon task id. Since 1.x.x registration ids are different and
an additional tag, `marathon-task`, is added to each registration.

If you update marathon-consul from version 0.x.x to 1.x.x, expect the synchronization phase during the first startup to
If you update marathon-consul from version 0.x.x to 1.x.x, expect the synchronization phase during the first startup to
reregister all healthy services managed by marathon-consul to the new format. Unhealthy services will get deregistered in the process.

## SSE Support

In future callback interface between marathon and marathon-consul will be replaced by SSE.
While using SSE please consider:
- SSE is using Web module config for queues, event sizes, in the future will be moved to sse module,
- SSE is using sync-leader config for determining current leader, when this value match leader returned by marathon (/v2/leader endpoint)
then SSE is started on this instance,
- when enabled SSE is spawning its own own set of workers and separated dispatcher,
- be advised to disable marathon callback subscription when enabling SSE, otherwise it might result in doubling registers and deregisers.

## Known limitations

The following section describes known limitations in `marathon-consul`.
Expand Down
8 changes: 7 additions & 1 deletion apps/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,16 @@ package apps
import (
"encoding/json"
"strings"

"github.com/allegro/marathon-consul/time"
)

type Task struct {
ID TaskID `json:"id"`
ID TaskID `json:"id"`
// Timestamp field is not a part of a Marathon task object.
// It's only present in StatusUpdateEventType and we are using this struct for decoding it.
// As well as for Marathon Task.
Timestamp time.Timestamp `json:"timestamp"`
TaskStatus string `json:"taskStatus"`
AppID AppID `json:"appId"`
Host string `json:"host"`
Expand Down
3 changes: 3 additions & 0 deletions apps/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io/ioutil"
"testing"

"github.com/allegro/marathon-consul/time"
"github.com/stretchr/testify/assert"
)

Expand All @@ -13,6 +14,7 @@ func TestParseTask(t *testing.T) {

testTask := &Task{
ID: "my-app_0-1396592784349",
Timestamp: time.Timestamp{},
AppID: "/my-app",
Host: "slave-1234.acme.org",
Ports: []int{31372},
Expand Down Expand Up @@ -40,6 +42,7 @@ func TestParseTasks(t *testing.T) {
expectedTasks := []Task{
{
ID: "test.47de43bd-1a81-11e5-bdb6-e6cb6734eaf8",
Timestamp: time.Timestamp{},
AppID: "/test",
Host: "192.168.2.114",
Ports: []int{31315},
Expand Down
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/allegro/marathon-consul/marathon"
"github.com/allegro/marathon-consul/metrics"
"github.com/allegro/marathon-consul/sentry"
"github.com/allegro/marathon-consul/sse"
"github.com/allegro/marathon-consul/sync"
"github.com/allegro/marathon-consul/web"
flag "github.com/ogier/pflag"
Expand All @@ -20,6 +21,7 @@ import (
type Config struct {
Consul consul.Config
Web web.Config
SSE sse.Config
Sync sync.Config
Marathon marathon.Config
Metrics metrics.Config
Expand Down Expand Up @@ -81,6 +83,10 @@ func (config *Config) parseFlags() {
flag.IntVar(&config.Web.QueueSize, "events-queue-size", 1000, "Size of events queue")
flag.IntVar(&config.Web.WorkersCount, "workers-pool-size", 10, "Number of concurrent workers processing events")
flag.Int64Var(&config.Web.MaxEventSize, "event-max-size", 4096, "Maximum size of event to process (bytes)")
flag.BoolVar(&config.Web.Enabled, "web-enabled", true, "Enable web events (callbacks).")

// SSE
flag.BoolVar(&config.SSE.Enabled, "sse-enabled", false, "Enable sse event stream.")

// Sync
flag.BoolVar(&config.Sync.Enabled, "sync-enabled", true, "Enable Marathon-consul scheduled sync")
Expand Down
5 changes: 5 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/allegro/marathon-consul/marathon"
"github.com/allegro/marathon-consul/metrics"
"github.com/allegro/marathon-consul/sentry"
"github.com/allegro/marathon-consul/sse"
"github.com/allegro/marathon-consul/sync"
timeutil "github.com/allegro/marathon-consul/time"
"github.com/allegro/marathon-consul/web"
Expand Down Expand Up @@ -108,6 +109,10 @@ func TestConfig_ShouldBeMergedWithFileDefaultsAndFlags(t *testing.T) {
QueueSize: 1000,
WorkersCount: 10,
MaxEventSize: 4096,
Enabled: true,
},
SSE: sse.Config{
Enabled: false,
},
Sync: sync.Config{
Interval: timeutil.Interval{Duration: 15 * time.Minute},
Expand Down
8 changes: 6 additions & 2 deletions debian/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@
"Listen": ":4000",
"QueueSize": 1000,
"WorkersCount": 10,
"MaxEventSize": 4096
"MaxEventSize": 4096,
"Enabled": true
},
"SSE": {
"Enabled": false
},
"Sync": {
"Enabled": true,
Expand Down Expand Up @@ -55,4 +59,4 @@
"Level": "error"
}
}
}
}
5 changes: 4 additions & 1 deletion events/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ func (fh *eventHandler) handleHealthyTask(body []byte) error {
log.WithError(err).Error("Body generated error")
return err
}
delay := taskHealthChange.Timestamp.Delay()
metrics.UpdateGauge("events.read.delay.current", int64(delay))

appID := taskHealthChange.AppID
taskID := taskHealthChange.TaskID()
Expand Down Expand Up @@ -146,11 +148,12 @@ func (fh *eventHandler) handleHealthyTask(body []byte) error {

func (fh *eventHandler) handleStatusEvent(body []byte) error {
task, err := apps.ParseTask(body)

if err != nil {
log.WithError(err).WithField("Body", body).Error("Could not parse event body")
return err
}
delay := task.Timestamp.Delay()
metrics.UpdateGauge("events.read.delay.current", int64(delay))

log.WithFields(log.Fields{
"Id": task.ID,
Expand Down
149 changes: 149 additions & 0 deletions events/sse_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package events

import (
"bufio"
"bytes"
"fmt"
"io"
)

// Event holds state of parsed fields from marathon EventStream
type SSEEvent struct {
Type string
Body []byte
ID string
Delay string
maxLineSize int64
}

var (
lineFeed = []byte("\n")
colon = []byte{':'}
space = []byte{' '}
)

func (e *SSEEvent) parseLine(line []byte) bool {
// https://www.w3.org/TR/2011/WD-eventsource-20110208/
// Quote: Lines must be separated by either a U+000D CARRIAGE RETURN U+000A
// LINE FEED (CRLF) character pair, a single U+000A LINE FEED (LF) character,
// or a single U+000D CARRIAGE RETURN (CR) character.

//If the line is empty (a blank line)
if len(line) == 0 || bytes.Compare(line, lineFeed) == 0 {
//Dispatch the event, as defined below.
return !e.isEmpty()
}

//If the line starts with a U+003A COLON character (:)
if bytes.HasPrefix(line, colon) {
//Ignore the line.
return false
}

var field string
var value []byte
//If the line contains a U+003A COLON character (:)
//Collect the characters on the line before the first U+003A COLON character (:), and let field be that string.
split := bytes.SplitN(line, colon, 2)
if len(split) == 2 {
field = string(split[0])
//Collect the characters on the line after the first U+003A COLON character (:), and let value be that string.
//If value starts with a U+0020 SPACE character, remove it from value.
value = bytes.TrimPrefix(split[1], space)
} else {
//Otherwise, the string is not empty but does not contain a U+003A COLON character (:)
//Process the field using the steps described below, using the whole line as the field name,
//and the empty string as the field value.
field = string(line)
value = []byte{}

}
stringValue := string(value)
//If the field name is
switch field {
case "event":
//Set the event name buffer to field value.
e.Type = stringValue
case "data":
//If the data buffer is not the empty string,
if len(value) != 0 {
//Append the field value to the data buffer,
//then append a single U+000A LINE FEED (LF) character to the data buffer.
e.Body = append(e.Body, value...)
e.Body = append(e.Body, '\n')
}
case "id":
//Set the last event ID buffer to the field value.
e.ID = stringValue
case "retry":
e.Delay = stringValue
// TODO consider reconnection delay
}

return false
}

func (e *SSEEvent) isEmpty() bool {
return e.Type == "" && e.Body == nil && e.ID == ""
}

func (e *SSEEvent) String() string {
return fmt.Sprintf("Type: %s, Body: %s", e.Type, string(e.Body))
}

func ParseSSEEvent(scanner *bufio.Scanner) (SSEEvent, error) {
e := SSEEvent{}

for dispatch := false; !dispatch; {
if !scanner.Scan() {
return e, io.EOF
}
line := scanner.Bytes()
dispatch = e.parseLine(line)
if err := scanner.Err(); err != nil {
return e, scanner.Err()
}
}
return e, nil
}

// ScanLines is higtly inspired by the function of the same name from bufio package,
// but is sensitive to CR as line separator
func ScanLines(data []byte, atEOF bool) (advance int, token []byte, err error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}
pos := lineTerminatorPosition(data)
if pos != 0 {
return pos + 1, dropCR(data[0:pos]), nil
}
// If we're at EOF, we have a final, non-terminated line. Return it.
if atEOF {
return len(data), dropCR(data), nil
}
// Request more data.
return 0, nil, nil
}

func lineTerminatorPosition(data []byte) int {
// https://www.w3.org/TR/2011/WD-eventsource-20110208/
// Quote: Lines must be separated by either a U+000D CARRIAGE RETURN U+000A
// LINE FEED (CRLF) character pair, a single U+000A LINE FEED (LF) character,
// or a single U+000D CARRIAGE RETURN (CR) character.
if i := bytes.IndexByte(data, '\n'); i >= 0 {
// We have a full newline-terminated line
return i
} else if i := bytes.IndexByte(data, '\r'); i >= 0 {
// We have a full CR terminated line
return i
}
return 0
}

// dropCR drops a terminal \r from the data.
func dropCR(data []byte) []byte {
if len(data) > 0 && data[len(data)-1] == '\r' {
return data[0 : len(data)-1]
}
return data
}
Loading

0 comments on commit 81db6e7

Please sign in to comment.