From 81db6e70d3946e25fbabf4a913ee1ee48d3950a3 Mon Sep 17 00:00:00 2001 From: tomez Date: Fri, 10 Mar 2017 15:17:35 +0100 Subject: [PATCH] Support Marathon SSE (#207) * Support Marathon SSE (#187) * Log error wehn events queue if full (#201) * Add metrics: events.read.type (#203) * Fix marathon events delay, now fetched from event data (#206) Fixes: #143 --- README.md | 33 +++- apps/task.go | 8 +- apps/task_test.go | 3 + config/config.go | 6 + config/config_test.go | 5 + debian/config.json | 8 +- events/event_handler.go | 5 +- events/sse_events.go | 149 +++++++++++++++++ events/sse_events_test.go | 269 ++++++++++++++++++++++++++++++ events/task_health_change.go | 3 +- events/task_health_change_test.go | 12 +- events/web_events.go | 28 +--- main.go | 18 +- marathon/marathon.go | 72 +++++++- marathon/marathon_stub.go | 8 + marathon/marathon_test.go | 111 +++++++++--- marathon/streamer.go | 75 +++++++++ sse/config.go | 7 + sse/sse.go | 72 ++++++++ sse/sse_handler.go | 111 ++++++++++++ sync/error_marathon_stub_test.go | 9 + sync/sync.go | 2 +- time/time.go | 8 + web/config.go | 1 + web/web_handler.go | 2 - 25 files changed, 954 insertions(+), 71 deletions(-) create mode 100644 events/sse_events.go create mode 100644 events/sse_events_test.go create mode 100644 marathon/streamer.go create mode 100644 sse/config.go create mode 100644 sse/sse.go create mode 100644 sse/sse_handler.go diff --git a/README.md b/README.md index 68adc50..087d181 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,11 @@ Register [Marathon](https://mesosphere.github.io/marathon/) Tasks as [Consul](ht forwards it to Consul agents. It also re-syncs all the information from Marathon to Consul on startup and repeats it with given interval. +Note: In the future release Event Bus (callbacks) will be considered deprecated and eventually +removed in favor of Event Stream (SSE). +Right now marathon-consul is supporting both solutions. +SSE is provided as experimental feature, disabled by default ([more](#sse-support)). + ## Code This project is based on @@ -101,7 +106,6 @@ curl -X POST 'http://marathon.service.consul:8080/v2/eventSubscriptions?callback ``` The event subscription should be set to `localhost` to reduce network traffic. - ## Usage ### Marathon masters @@ -165,11 +169,12 @@ for more details. ### Sync - The scheduled Marathon-consul sync may run in two modes: - - Only on node that is the current [Marathon-leader](https://mesosphere.github.io/marathon/docs/rest-api.html#get-v2-leader), `sync-leader` parameter should be set to `hostname:port` the current node appears in the Marathon cluster. + - Only on node that is the current [Marathon-leader](https://mesosphere.github.io/marathon/docs/rest-api.html#get-v2-leader), `sync-leader` parameter should be set to `hostname:port` the current node appears in the Marathon cluster. This mode is **enabled by default** and the `sync-leader` property is set to the hostname resolved by OS. Note that there is a difference between `sync-leader` and `marathon-location`: `sync-leader` is used for node leadership detection (should be set to cluster-wide node name), while `marathon-location` is used for connection purpose (may be set to `localhost`) - On every node, `sync-force` parameter should be set to `true` + ### Options Argument | Default | Description @@ -215,6 +220,8 @@ sync-force | `false` | Force leadership-independent Mar sync-interval | `15m0s` | Marathon-consul sync interval sync-leader | | Marathon cluster-wide node name (defaults to :8080), the sync will run only if the specified node is the current Marathon-leader workers-pool-size | `10` | Number of concurrent workers processing events +sse-enabled | `false` | Enable marathon-consul SSE on this node +web-enabled | `true` | Enable marathon-consul Web callbacks on this node ### Endpoints @@ -227,7 +234,7 @@ Endpoint | Description ### Register under multiple ports -If you need to map your Marathon task into multiple service registrations in Consul, you can configure marathon-consul +If you need to map your Marathon task into multiple service registrations in Consul, you can configure marathon-consul via Marathon's `portDefinitions`: ``` @@ -276,15 +283,15 @@ curl -X GET http://localhost:8500/v1/catalog/service/my-app-other-name ], "ServicePort": 31293, ... -``` +``` -If any port definition contains the `consul` label, then advanced configuration mode is enabled. As a result, only the ports +If any port definition contains the `consul` label, then advanced configuration mode is enabled. As a result, only the ports containing this label are registered, under the name specified as the label's value – with empty value resolved to default name. -Names don't have to be unique – you can have multiple registrations under the same name, but on different ports, +Names don't have to be unique – you can have multiple registrations under the same name, but on different ports, perhaps with different tags. Note that the `consul` label still needs to be present in the top-level application labels, even though its value won't have any effect. -Tags configured in the top-level application labels will be added to all registrations. Tags configured in the port definition +Tags configured in the top-level application labels will be added to all registrations. Tags configured in the port definition labels will be added only to corresponding registrations. All registrations share the same `marathon-task` tag. @@ -294,9 +301,19 @@ All registrations share the same `marathon-task` tag. Until 1.x.x marathon-consul would register services in Consul with registration id equal to related Marathon task id. Since 1.x.x registration ids are different and an additional tag, `marathon-task`, is added to each registration. -If you update marathon-consul from version 0.x.x to 1.x.x, expect the synchronization phase during the first startup to +If you update marathon-consul from version 0.x.x to 1.x.x, expect the synchronization phase during the first startup to reregister all healthy services managed by marathon-consul to the new format. Unhealthy services will get deregistered in the process. +## SSE Support + +In future callback interface between marathon and marathon-consul will be replaced by SSE. +While using SSE please consider: +- SSE is using Web module config for queues, event sizes, in the future will be moved to sse module, +- SSE is using sync-leader config for determining current leader, when this value match leader returned by marathon (/v2/leader endpoint) +then SSE is started on this instance, +- when enabled SSE is spawning its own own set of workers and separated dispatcher, +- be advised to disable marathon callback subscription when enabling SSE, otherwise it might result in doubling registers and deregisers. + ## Known limitations The following section describes known limitations in `marathon-consul`. diff --git a/apps/task.go b/apps/task.go index d999c8c..7f04d7b 100644 --- a/apps/task.go +++ b/apps/task.go @@ -3,10 +3,16 @@ package apps import ( "encoding/json" "strings" + + "github.com/allegro/marathon-consul/time" ) type Task struct { - ID TaskID `json:"id"` + ID TaskID `json:"id"` + // Timestamp field is not a part of a Marathon task object. + // It's only present in StatusUpdateEventType and we are using this struct for decoding it. + // As well as for Marathon Task. + Timestamp time.Timestamp `json:"timestamp"` TaskStatus string `json:"taskStatus"` AppID AppID `json:"appId"` Host string `json:"host"` diff --git a/apps/task_test.go b/apps/task_test.go index 5e8136a..0bba314 100644 --- a/apps/task_test.go +++ b/apps/task_test.go @@ -5,6 +5,7 @@ import ( "io/ioutil" "testing" + "github.com/allegro/marathon-consul/time" "github.com/stretchr/testify/assert" ) @@ -13,6 +14,7 @@ func TestParseTask(t *testing.T) { testTask := &Task{ ID: "my-app_0-1396592784349", + Timestamp: time.Timestamp{}, AppID: "/my-app", Host: "slave-1234.acme.org", Ports: []int{31372}, @@ -40,6 +42,7 @@ func TestParseTasks(t *testing.T) { expectedTasks := []Task{ { ID: "test.47de43bd-1a81-11e5-bdb6-e6cb6734eaf8", + Timestamp: time.Timestamp{}, AppID: "/test", Host: "192.168.2.114", Ports: []int{31315}, diff --git a/config/config.go b/config/config.go index ce593f5..7ace9ba 100644 --- a/config/config.go +++ b/config/config.go @@ -12,6 +12,7 @@ import ( "github.com/allegro/marathon-consul/marathon" "github.com/allegro/marathon-consul/metrics" "github.com/allegro/marathon-consul/sentry" + "github.com/allegro/marathon-consul/sse" "github.com/allegro/marathon-consul/sync" "github.com/allegro/marathon-consul/web" flag "github.com/ogier/pflag" @@ -20,6 +21,7 @@ import ( type Config struct { Consul consul.Config Web web.Config + SSE sse.Config Sync sync.Config Marathon marathon.Config Metrics metrics.Config @@ -81,6 +83,10 @@ func (config *Config) parseFlags() { flag.IntVar(&config.Web.QueueSize, "events-queue-size", 1000, "Size of events queue") flag.IntVar(&config.Web.WorkersCount, "workers-pool-size", 10, "Number of concurrent workers processing events") flag.Int64Var(&config.Web.MaxEventSize, "event-max-size", 4096, "Maximum size of event to process (bytes)") + flag.BoolVar(&config.Web.Enabled, "web-enabled", true, "Enable web events (callbacks).") + + // SSE + flag.BoolVar(&config.SSE.Enabled, "sse-enabled", false, "Enable sse event stream.") // Sync flag.BoolVar(&config.Sync.Enabled, "sync-enabled", true, "Enable Marathon-consul scheduled sync") diff --git a/config/config_test.go b/config/config_test.go index a121cce..4733c2f 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -10,6 +10,7 @@ import ( "github.com/allegro/marathon-consul/marathon" "github.com/allegro/marathon-consul/metrics" "github.com/allegro/marathon-consul/sentry" + "github.com/allegro/marathon-consul/sse" "github.com/allegro/marathon-consul/sync" timeutil "github.com/allegro/marathon-consul/time" "github.com/allegro/marathon-consul/web" @@ -108,6 +109,10 @@ func TestConfig_ShouldBeMergedWithFileDefaultsAndFlags(t *testing.T) { QueueSize: 1000, WorkersCount: 10, MaxEventSize: 4096, + Enabled: true, + }, + SSE: sse.Config{ + Enabled: false, }, Sync: sync.Config{ Interval: timeutil.Interval{Duration: 15 * time.Minute}, diff --git a/debian/config.json b/debian/config.json index 04e4dc2..47485f3 100644 --- a/debian/config.json +++ b/debian/config.json @@ -22,7 +22,11 @@ "Listen": ":4000", "QueueSize": 1000, "WorkersCount": 10, - "MaxEventSize": 4096 + "MaxEventSize": 4096, + "Enabled": true + }, + "SSE": { + "Enabled": false }, "Sync": { "Enabled": true, @@ -55,4 +59,4 @@ "Level": "error" } } -} \ No newline at end of file +} diff --git a/events/event_handler.go b/events/event_handler.go index bfbd4a5..30569f2 100644 --- a/events/event_handler.go +++ b/events/event_handler.go @@ -102,6 +102,8 @@ func (fh *eventHandler) handleHealthyTask(body []byte) error { log.WithError(err).Error("Body generated error") return err } + delay := taskHealthChange.Timestamp.Delay() + metrics.UpdateGauge("events.read.delay.current", int64(delay)) appID := taskHealthChange.AppID taskID := taskHealthChange.TaskID() @@ -146,11 +148,12 @@ func (fh *eventHandler) handleHealthyTask(body []byte) error { func (fh *eventHandler) handleStatusEvent(body []byte) error { task, err := apps.ParseTask(body) - if err != nil { log.WithError(err).WithField("Body", body).Error("Could not parse event body") return err } + delay := task.Timestamp.Delay() + metrics.UpdateGauge("events.read.delay.current", int64(delay)) log.WithFields(log.Fields{ "Id": task.ID, diff --git a/events/sse_events.go b/events/sse_events.go new file mode 100644 index 0000000..eba2589 --- /dev/null +++ b/events/sse_events.go @@ -0,0 +1,149 @@ +package events + +import ( + "bufio" + "bytes" + "fmt" + "io" +) + +// Event holds state of parsed fields from marathon EventStream +type SSEEvent struct { + Type string + Body []byte + ID string + Delay string + maxLineSize int64 +} + +var ( + lineFeed = []byte("\n") + colon = []byte{':'} + space = []byte{' '} +) + +func (e *SSEEvent) parseLine(line []byte) bool { + // https://www.w3.org/TR/2011/WD-eventsource-20110208/ + // Quote: Lines must be separated by either a U+000D CARRIAGE RETURN U+000A + // LINE FEED (CRLF) character pair, a single U+000A LINE FEED (LF) character, + // or a single U+000D CARRIAGE RETURN (CR) character. + + //If the line is empty (a blank line) + if len(line) == 0 || bytes.Compare(line, lineFeed) == 0 { + //Dispatch the event, as defined below. + return !e.isEmpty() + } + + //If the line starts with a U+003A COLON character (:) + if bytes.HasPrefix(line, colon) { + //Ignore the line. + return false + } + + var field string + var value []byte + //If the line contains a U+003A COLON character (:) + //Collect the characters on the line before the first U+003A COLON character (:), and let field be that string. + split := bytes.SplitN(line, colon, 2) + if len(split) == 2 { + field = string(split[0]) + //Collect the characters on the line after the first U+003A COLON character (:), and let value be that string. + //If value starts with a U+0020 SPACE character, remove it from value. + value = bytes.TrimPrefix(split[1], space) + } else { + //Otherwise, the string is not empty but does not contain a U+003A COLON character (:) + //Process the field using the steps described below, using the whole line as the field name, + //and the empty string as the field value. + field = string(line) + value = []byte{} + + } + stringValue := string(value) + //If the field name is + switch field { + case "event": + //Set the event name buffer to field value. + e.Type = stringValue + case "data": + //If the data buffer is not the empty string, + if len(value) != 0 { + //Append the field value to the data buffer, + //then append a single U+000A LINE FEED (LF) character to the data buffer. + e.Body = append(e.Body, value...) + e.Body = append(e.Body, '\n') + } + case "id": + //Set the last event ID buffer to the field value. + e.ID = stringValue + case "retry": + e.Delay = stringValue + // TODO consider reconnection delay + } + + return false +} + +func (e *SSEEvent) isEmpty() bool { + return e.Type == "" && e.Body == nil && e.ID == "" +} + +func (e *SSEEvent) String() string { + return fmt.Sprintf("Type: %s, Body: %s", e.Type, string(e.Body)) +} + +func ParseSSEEvent(scanner *bufio.Scanner) (SSEEvent, error) { + e := SSEEvent{} + + for dispatch := false; !dispatch; { + if !scanner.Scan() { + return e, io.EOF + } + line := scanner.Bytes() + dispatch = e.parseLine(line) + if err := scanner.Err(); err != nil { + return e, scanner.Err() + } + } + return e, nil +} + +// ScanLines is higtly inspired by the function of the same name from bufio package, +// but is sensitive to CR as line separator +func ScanLines(data []byte, atEOF bool) (advance int, token []byte, err error) { + if atEOF && len(data) == 0 { + return 0, nil, nil + } + pos := lineTerminatorPosition(data) + if pos != 0 { + return pos + 1, dropCR(data[0:pos]), nil + } + // If we're at EOF, we have a final, non-terminated line. Return it. + if atEOF { + return len(data), dropCR(data), nil + } + // Request more data. + return 0, nil, nil +} + +func lineTerminatorPosition(data []byte) int { + // https://www.w3.org/TR/2011/WD-eventsource-20110208/ + // Quote: Lines must be separated by either a U+000D CARRIAGE RETURN U+000A + // LINE FEED (CRLF) character pair, a single U+000A LINE FEED (LF) character, + // or a single U+000D CARRIAGE RETURN (CR) character. + if i := bytes.IndexByte(data, '\n'); i >= 0 { + // We have a full newline-terminated line + return i + } else if i := bytes.IndexByte(data, '\r'); i >= 0 { + // We have a full CR terminated line + return i + } + return 0 +} + +// dropCR drops a terminal \r from the data. +func dropCR(data []byte) []byte { + if len(data) > 0 && data[len(data)-1] == '\r' { + return data[0 : len(data)-1] + } + return data +} diff --git a/events/sse_events_test.go b/events/sse_events_test.go new file mode 100644 index 0000000..3cbdf3a --- /dev/null +++ b/events/sse_events_test.go @@ -0,0 +1,269 @@ +package events + +import ( + "bufio" + "bytes" + "fmt" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEvent_IfEventIsEmptyReturnsFalse(t *testing.T) { + t.Parallel() + // given + event := &SSEEvent{ + Type: "status_update_event", + Body: []byte(`{"id": "simpleId"}`), + ID: "id", + } + // when + expected := false + actual := event.isEmpty() + // then + assert.Equal(t, expected, actual) +} + +func TestEvent_IfEventIsEmptyReturnsTrue(t *testing.T) { + t.Parallel() + // given + event := &SSEEvent{} + // when + expected := true + actual := event.isEmpty() + // then + assert.Equal(t, expected, actual) +} + +func TestParseLine_WhenStautsUpdateEventPassed(t *testing.T) { + t.Parallel() + // given + event := &SSEEvent{} + line0 := []byte("id: 0") + line1 := []byte("event: status_update_event") + line2 := []byte("data: testData") + expected0 := "0" + expected1 := "status_update_event" + expected2 := []byte("testData\n") + // when + event.parseLine(line0) + event.parseLine(line1) + event.parseLine(line2) + // then + assert.Equal(t, expected0, event.ID) + assert.Equal(t, expected1, event.Type) + assert.Equal(t, string(expected2), string(event.Body)) +} + +func TestParseLine_WhenGarbageIsProvidedBodyShouldBeNil(t *testing.T) { + t.Parallel() + // given + event := &SSEEvent{} + line := []byte("garbage data") + expectedBody := []byte(nil) + // when + _ = event.parseLine(line) + // then + assert.Equal(t, expectedBody, event.Body) +} + +func BenchmarkParseLine(b *testing.B) { + // given + longTestData := bytes.Repeat([]byte("testData"), 1) + longLine := append([]byte("data: "), longTestData...) + expectedEvent := &SSEEvent{Body: append(longTestData, []byte("\n")...)} + + var event *SSEEvent + for i := 0; i <= b.N; i++ { + event = &SSEEvent{} + event.parseLine(longLine) + } + + // then + assert.Equal(b, string(expectedEvent.Body), string(event.Body)) + +} + +var parseEventCases = []struct { + in string + expectedEvent SSEEvent +}{ + {": No Event", SSEEvent{}}, + {"event: status_update_event\ndata: testData\n", + SSEEvent{Type: "status_update_event", Body: []byte("testData\n")}, + }, + {"event: status_update_event\ndata: testData\ndummydata", + SSEEvent{Type: "status_update_event", Body: []byte("testData\n")}, + }, + {"event: status_update_event\ndata", + SSEEvent{Type: "status_update_event"}, + }, + {"event: status_update_event\ndata:\n", + SSEEvent{Type: "status_update_event"}, + }, + {"event: some_event\ndata: abc\ndata: def", + SSEEvent{Type: "some_event", Body: []byte("abc\ndef\n")}, + }, + {"event: some_event\ndata: aaa\ndata: ccc\ndata: 10", + SSEEvent{Type: "some_event", Body: []byte("aaa\nccc\n10\n")}, + }, + {"event: some_event\ndata: abc\nid: 12", + SSEEvent{Type: "some_event", Body: []byte("abc\n"), ID: "12"}, + }, + {"data: abc\n", + SSEEvent{Body: []byte("abc\n")}, + }, +} + +func TestParseSSEEvent_TestCases(t *testing.T) { + t.Parallel() + + for _, testCase := range parseEventCases { + reader := strings.NewReader(testCase.in) + sscanner := bufio.NewScanner(reader) + // when + actualEvent, _ := ParseSSEEvent(sscanner) + // then + assert.Equal(t, testCase.expectedEvent, actualEvent) + + } +} + +var parseEventMultipleDataCases = []struct { + in string + expectedEvents []SSEEvent +}{ + {"\n\n\n\n\n\n\n", + []SSEEvent{ + SSEEvent{}, + }, + }, + {"event: status_update_event\ndata: testData\n\nevent: some_event\ndata: someData", + []SSEEvent{ + SSEEvent{Type: "status_update_event", Body: []byte("testData\n")}, + SSEEvent{Type: "some_event", Body: []byte("someData\n")}, + }, + }, + {"event: status_update_event\ndata: testData\n\nid: 13\ndata: someData\n\nid: 14\ndata: abc\n\nid: 15\ndata: def\n", + []SSEEvent{ + SSEEvent{Type: "status_update_event", Body: []byte("testData\n")}, + SSEEvent{ID: "13", Body: []byte("someData\n")}, + SSEEvent{ID: "14", Body: []byte("abc\n")}, + SSEEvent{ID: "15", Body: []byte("def\n")}, + }, + }, + {"data: testData\n\ndata: someData\n\ndata: abc\n\ndata: def\n", + []SSEEvent{ + SSEEvent{Body: []byte("testData\n")}, + SSEEvent{Body: []byte("someData\n")}, + SSEEvent{Body: []byte("abc\n")}, + SSEEvent{Body: []byte("def\n")}, + }, + }, + {"data: testData\nretry: 10\ndummy: dummy field\n\ndata: someData\n\ndata: abc\n\ndata: def\n", + []SSEEvent{ + SSEEvent{Body: []byte("testData\n"), Delay: "10"}, + SSEEvent{Body: []byte("someData\n")}, + SSEEvent{Body: []byte("abc\n")}, + SSEEvent{Body: []byte("def\n")}, + }, + }, +} + +func TestParseSSEEvent_MultipleDataTestCases(t *testing.T) { + t.Parallel() + + for _, testCase := range parseEventMultipleDataCases { + + reader := strings.NewReader(testCase.in) + sscanner := bufio.NewScanner(reader) + + var err error + var actualEvent SSEEvent + for i := 0; len(testCase.expectedEvents) > i && err != fmt.Errorf("EOF"); i++ { + // when + actualEvent, err = ParseSSEEvent(sscanner) + // then + assert.Equal(t, testCase.expectedEvents[i], actualEvent) + } + + } +} + +func BenchmarkParseEvent(b *testing.B) { + var event SSEEvent + expectedEvent := SSEEvent{ + Type: "some event type", + ID: "1", + Body: []byte("some data\nnext data\n"), + Delay: "10", + } + + for i := 0; i <= b.N; i++ { + reader := strings.NewReader("event: some event type\nid: 1\ndata: some data\ndata: next data\nretry: 10\n") + sscanner := bufio.NewScanner(reader) + event, _ = ParseSSEEvent(sscanner) + } + + assert.Equal(b, expectedEvent, event) + +} + +func TestParseSSEEvent_WhenVeryLongLineIsOnStream(t *testing.T) { + t.Parallel() + // given + veryLongEventName := strings.Repeat("a", 1016) + veryLongLine := fmt.Sprintf("event: %s\n", veryLongEventName) + + sreader := strings.NewReader(veryLongLine) + sscanner := bufio.NewScanner(sreader) + expectedEventType := veryLongEventName + // when + event, _ := ParseSSEEvent(sscanner) + // then + assert.Equal(t, expectedEventType, event.Type) +} + +func TestParseSSEEvent_WhenVeryLongLineIsLongerThanMaxLineSize(t *testing.T) { + t.Parallel() + // given + veryLongEventName := strings.Repeat("a", 10240) + veryLongLine := fmt.Sprintf("event: %s\n\n", veryLongEventName) + sreader := strings.NewReader(veryLongLine) + sscanner := bufio.NewScanner(sreader) + buffer := make([]byte, 1024) + sscanner.Buffer(buffer, cap(buffer)) + expectedEventType := "" + // when + event, _ := ParseSSEEvent(sscanner) + // then + assert.Equal(t, expectedEventType, event.Type) +} + +var scanLineCases = []struct { + in []byte + atEOL bool + expectedAdvance int + expectedToken []byte +}{ + {[]byte("abcd\n"), false, 5, []byte("abcd")}, + {[]byte("abcd\r"), false, 5, []byte("abcd")}, + {[]byte("abcd\r\n"), false, 6, []byte("abcd")}, + {[]byte("abcd"), false, 0, []byte(nil)}, + {[]byte("abcd"), true, 4, []byte("abcd")}, + {[]byte("abcd\n"), true, 5, []byte("abcd")}, +} + +func TestScanLine_TestCases(t *testing.T) { + t.Parallel() + for _, testCase := range scanLineCases { + // when + advance, token, err := ScanLines(testCase.in, testCase.atEOL) + // then + require.NoError(t, err) + assert.Equal(t, testCase.expectedAdvance, advance) + assert.Equal(t, testCase.expectedToken, token) + } +} diff --git a/events/task_health_change.go b/events/task_health_change.go index 6f79243..496c444 100644 --- a/events/task_health_change.go +++ b/events/task_health_change.go @@ -6,10 +6,11 @@ import ( "regexp" "github.com/allegro/marathon-consul/apps" + "github.com/allegro/marathon-consul/time" ) type TaskHealthChange struct { - Timestamp string `json:"timestamp"` + Timestamp time.Timestamp `json:"timestamp"` // Prefer TaskID() instead of ID ID apps.TaskID `json:"id"` InstanceID string `json:"instanceId"` diff --git a/events/task_health_change_test.go b/events/task_health_change_test.go index 284c23d..ae00475 100644 --- a/events/task_health_change_test.go +++ b/events/task_health_change_test.go @@ -3,12 +3,22 @@ package events import ( "encoding/json" "testing" + gotime "time" + "github.com/allegro/marathon-consul/time" "github.com/stretchr/testify/assert" ) +func StringToTimestamp(date string) time.Timestamp { + t, err := gotime.Parse(gotime.RFC3339Nano, date) + if err != nil { + return time.Timestamp{Time: gotime.Time{}} + } + return time.Timestamp{Time: t} +} + var testHealthChange = &TaskHealthChange{ - Timestamp: "2014-03-01T23:29:30.158Z", + Timestamp: StringToTimestamp("2014-03-01T23:29:30.158Z"), ID: "my-app_0-1396592784349", Alive: true, AppID: "/my-app", diff --git a/events/web_events.go b/events/web_events.go index a2ddb30..5ab3ce3 100644 --- a/events/web_events.go +++ b/events/web_events.go @@ -3,31 +3,13 @@ package events import ( "encoding/json" "errors" - "strings" - "time" -) - -type Timestamp struct { - time.Time -} - -func (t *Timestamp) UnmarshalJSON(b []byte) (err error) { - s := strings.Trim(string(b), "\"") - if s == "null" { - t.Time = time.Time{} - return - } - t.Time, err = time.Parse(time.RFC3339Nano, s) - return -} -func (t *Timestamp) String() string { - return t.Format(time.RFC3339Nano) -} + "github.com/allegro/marathon-consul/time" +) type WebEvent struct { - Type string `json:"eventType"` - Timestamp Timestamp `json:"timestamp"` + Type string `json:"eventType"` + Timestamp time.Timestamp `json:"timestamp"` } func ParseEvent(jsonBlob []byte) (WebEvent, error) { @@ -37,7 +19,7 @@ func ParseEvent(jsonBlob []byte) (WebEvent, error) { return WebEvent{}, err } else if webEvent.Type == "" { return WebEvent{}, errors.New("Missing event type") - } else if webEvent.Timestamp.Unix() == (time.Time{}).Unix() { + } else if webEvent.Timestamp.Missing() { return WebEvent{}, errors.New("Missing timestamp") } return webEvent, nil diff --git a/main.go b/main.go index 9691718..3e25410 100644 --- a/main.go +++ b/main.go @@ -9,6 +9,7 @@ import ( "github.com/allegro/marathon-consul/marathon" "github.com/allegro/marathon-consul/metrics" "github.com/allegro/marathon-consul/sentry" + "github.com/allegro/marathon-consul/sse" "github.com/allegro/marathon-consul/sync" "github.com/allegro/marathon-consul/web" ) @@ -34,19 +35,26 @@ func main() { } consulInstance := consul.New(config.Consul) - remote, err := marathon.New(config.Marathon) + // TODO(tz) - move Leader from sync module to highest level config, access like config.Leader + remote, err := marathon.New(config.Marathon, config.Sync.Leader) if err != nil { log.Fatal(err.Error()) } sync.New(config.Sync, remote, consulInstance, consulInstance.AddAgentsFromApps).StartSyncServicesJob() - handler, stop := web.NewHandler(config.Web, remote, consulInstance) - defer stop() + if config.SSE.Enabled { + stopSSE := sse.NewHandler(config.SSE, config.Web, remote, consulInstance) + defer stopSSE() + } + + if config.Web.Enabled { + handler, stop := web.NewHandler(config.Web, remote, consulInstance) + defer stop() + http.HandleFunc("/events", handler) + } - // set up routes http.HandleFunc("/health", web.HealthHandler) - http.HandleFunc("/events", handler) log.WithField("Port", config.Web.Listen).Info("Listening") log.Fatal(http.ListenAndServe(config.Web.Listen, nil)) diff --git a/marathon/marathon.go b/marathon/marathon.go index 5730baf..7bf545e 100644 --- a/marathon/marathon.go +++ b/marathon/marathon.go @@ -8,6 +8,7 @@ import ( "net/http" "net/url" "strings" + "time" log "github.com/Sirupsen/logrus" "github.com/allegro/marathon-consul/apps" @@ -19,11 +20,14 @@ type Marathoner interface { App(apps.AppID) (*apps.App, error) Tasks(apps.AppID) ([]apps.Task, error) Leader() (string, error) + EventStream([]string, int, int) (*Streamer, error) + AmILeader() (bool, error) } type Marathon struct { Location string Protocol string + MyLeader string Auth *url.Userinfo client *http.Client } @@ -32,7 +36,7 @@ type LeaderResponse struct { Leader string `json:"leader"` } -func New(config Config) (*Marathon, error) { +func New(config Config, leader string) (*Marathon, error) { var auth *url.Userinfo if len(config.Username) == 0 && len(config.Password) == 0 { auth = nil @@ -45,10 +49,12 @@ func New(config Config) (*Marathon, error) { InsecureSkipVerify: !config.VerifySsl, }, } + // TODO(tz) - consider passing desiredEvents as config return &Marathon{ Location: config.Location, Protocol: config.Protocol, Auth: auth, + MyLeader: leader, client: &http.Client{ Transport: transport, Timeout: config.Timeout.Duration, @@ -59,7 +65,7 @@ func New(config Config) (*Marathon, error) { func (m Marathon) App(appID apps.AppID) (*apps.App, error) { log.WithField("Location", m.Location).Debug("Asking Marathon for " + appID) - body, err := m.get(m.urlWithQuery(fmt.Sprintf("/v2/apps/%s", appID), params{"embed": "apps.tasks"})) + body, err := m.get(m.urlWithQuery(fmt.Sprintf("/v2/apps/%s", appID), params{"embed": []string{"apps.tasks"}})) if err != nil { return nil, err } @@ -69,7 +75,7 @@ func (m Marathon) App(appID apps.AppID) (*apps.App, error) { func (m Marathon) ConsulApps() ([]*apps.App, error) { log.WithField("Location", m.Location).Debug("Asking Marathon for apps") - body, err := m.get(m.urlWithQuery("/v2/apps", params{"embed": "apps.tasks", "label": apps.MarathonConsulLabel})) + body, err := m.get(m.urlWithQuery("/v2/apps", params{"embed": []string{"apps.tasks"}, "label": []string{apps.MarathonConsulLabel}})) if err != nil { return nil, err } @@ -106,6 +112,53 @@ func (m Marathon) Leader() (string, error) { return leaderResponse.Leader, err } +// EventStream method creates Streamer handler which is configured based on marathon +// client and credentials. +func (m Marathon) EventStream(desiredEvents []string, retries, retryBackoff int) (*Streamer, error) { + subURL := m.urlWithQuery("/v2/events", params{"event_type": desiredEvents}) + + // Before creating actual streamer, this function blocks until configured leader for this (m) reciever is elected. + // When leaderPoll function successfully exit this instance of marathon-consul, + // consider itself as a new leader and initializes Streamer. + err := m.leaderPoll() + if err != nil { + log.WithError(err).Fatal("Leader poll failed. Check marathon and previous errors. Exiting") + } + + return &Streamer{ + subURL: subURL, + client: &http.Client{ + Transport: m.client.Transport, + }, + retries: retries, + retryBackoff: retryBackoff, + }, nil +} + +// leaderPoll just blocks until configured myleader is equal to +// leader returned from marathon (/v2/leader endpoint) +func (m Marathon) leaderPoll() error { + pollTicker := time.Tick(1 * time.Second) + retries := 5 + i := 0 + for range pollTicker { + leading, err := m.AmILeader() + if err != nil { + if i >= retries { + return fmt.Errorf("Failed to get a leader after %d retries", i) + } + i++ + log.WithError(err).Error("Error while getting leader") + continue + } + if leading { + return nil + } + log.Debug("I am not leader") + } + return nil +} + func (m Marathon) get(url string) ([]byte, error) { request, err := http.NewRequest("GET", url, nil) if err != nil { @@ -157,7 +210,7 @@ func (m Marathon) url(path string) string { return m.urlWithQuery(path, nil) } -type params map[string]string +type params map[string][]string func (m Marathon) urlWithQuery(path string, params params) string { marathon := url.URL{ @@ -167,9 +220,16 @@ func (m Marathon) urlWithQuery(path string, params params) string { Path: path, } query := marathon.Query() - for key, value := range params { - query.Add(key, value) + for key, values := range params { + for _, value := range values { + query.Add(key, value) + } } marathon.RawQuery = query.Encode() return marathon.String() } + +func (m Marathon) AmILeader() (bool, error) { + leader, err := m.Leader() + return m.MyLeader == leader, err +} diff --git a/marathon/marathon_stub.go b/marathon/marathon_stub.go index bfbf594..97f272b 100644 --- a/marathon/marathon_stub.go +++ b/marathon/marathon_stub.go @@ -42,6 +42,14 @@ func (m *MarathonerStub) Leader() (string, error) { return m.leader, nil } +func (m *MarathonerStub) EventStream([]string, int, int) (*Streamer, error) { + return &Streamer{}, nil +} + +func (m *MarathonerStub) AmILeader() (bool, error) { + return false, nil +} + func (m *MarathonerStub) Interactions() bool { m.interactionsMu.RLock() defer m.interactionsMu.RUnlock() diff --git a/marathon/marathon_test.go b/marathon/marathon_test.go index 4ccf03a..8d7ae74 100644 --- a/marathon/marathon_test.go +++ b/marathon/marathon_test.go @@ -17,7 +17,7 @@ func TestMarathon_AppsWhenMarathonReturnEmptyList(t *testing.T) { defer server.Close() url, _ := url.Parse(server.URL) - m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}) + m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}, "") m.client.Transport = transport // when apps, err := m.ConsulApps() @@ -29,7 +29,7 @@ func TestMarathon_AppsWhenMarathonReturnEmptyList(t *testing.T) { func TestMarathon_AppsWhenConfigIsWrong(t *testing.T) { t.Parallel() // given - m, _ := New(Config{Location: "not::valid/location", Protocol: "HTTP"}) + m, _ := New(Config{Location: "not::valid/location", Protocol: "HTTP"}, "") // when apps, err := m.ConsulApps() //then @@ -43,7 +43,7 @@ func TestMarathon_AppsWhenServerIsNotResponding(t *testing.T) { } t.Parallel() // given - m, _ := New(Config{Location: "unknown:22", Protocol: "HTTP"}) + m, _ := New(Config{Location: "unknown:22", Protocol: "HTTP"}, "") // when apps, err := m.ConsulApps() //then @@ -62,7 +62,7 @@ func TestMarathon_AppsWhenMarathonConnectionFailedShouldNotRetry(t *testing.T) { defer server.Close() url, _ := url.Parse(server.URL) - m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}) + m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}, "") m.client.Transport = transport // when apps, err := m.ConsulApps() @@ -83,7 +83,7 @@ func TestMarathon_TasksWhenMarathonConnectionFailedShouldNotRetry(t *testing.T) defer server.Close() url, _ := url.Parse(server.URL) - m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}) + m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}, "") m.client.Transport = transport // when tasks, err := m.Tasks("/app/id") @@ -104,7 +104,7 @@ func TestMarathon_AppWhenMarathonConnectionFailedShouldNotRetry(t *testing.T) { defer server.Close() url, _ := url.Parse(server.URL) - m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}) + m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}, "") m.client.Transport = transport // when app, err := m.App("/app/id") @@ -121,7 +121,7 @@ func TestMarathon_AppsWhenMarathonReturnEmptyResponse(t *testing.T) { defer server.Close() url, _ := url.Parse(server.URL) - m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}) + m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}, "") m.client.Transport = transport // when apps, err := m.ConsulApps() @@ -137,7 +137,7 @@ func TestMarathon_AppsWhenMarathonReturnMalformedJsonResponse(t *testing.T) { defer server.Close() url, _ := url.Parse(server.URL) - m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}) + m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}, "") m.client.Transport = transport // when app, err := m.App("/test/app") @@ -153,7 +153,7 @@ func TestMarathon_AppWhenMarathonReturnEmptyApp(t *testing.T) { defer server.Close() url, _ := url.Parse(server.URL) - m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}) + m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}, "") m.client.Transport = transport // when app, err := m.App("/test/app") @@ -169,7 +169,7 @@ func TestMarathon_AppWhenMarathonReturnEmptyResponse(t *testing.T) { defer server.Close() url, _ := url.Parse(server.URL) - m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}) + m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}, "") m.client.Transport = transport // when app, err := m.App("/test/app") @@ -185,7 +185,7 @@ func TestMarathon_AppWhenMarathonReturnMalformedJsonResponse(t *testing.T) { defer server.Close() url, _ := url.Parse(server.URL) - m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}) + m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}, "") m.client.Transport = transport // when apps, err := m.ConsulApps() @@ -207,7 +207,7 @@ func TestMarathon_TasksWhenMarathonReturnEmptyList(t *testing.T) { }]}`) defer server.Close() url, _ := url.Parse(server.URL) - m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}) + m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}, "") m.client.Transport = transport // when tasks, err := m.Tasks("//test/app") @@ -222,7 +222,7 @@ func TestMarathon_TasksWhenMarathonReturnEmptyResponse(t *testing.T) { server, transport := stubServer("/v2/apps/test/app/tasks", ``) defer server.Close() url, _ := url.Parse(server.URL) - m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}) + m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}, "") m.client.Transport = transport // when tasks, err := m.Tasks("/test/app") @@ -237,7 +237,7 @@ func TestMarathon_TasksWhenMarathonReturnMalformedJsonResponse(t *testing.T) { server, transport := stubServer("/v2/apps/test/app/tasks", ``) defer server.Close() url, _ := url.Parse(server.URL) - m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}) + m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}, "") m.client.Transport = transport // when tasks, err := m.Tasks("/test/app") @@ -251,7 +251,7 @@ func TestConfig_transport(t *testing.T) { // given config := Config{VerifySsl: false} // when - marathon, _ := New(config) + marathon, _ := New(config, "") // then transport, ok := marathon.client.Transport.(*http.Transport) assert.True(t, ok) @@ -263,7 +263,7 @@ func TestUrl_WithoutAuth(t *testing.T) { // given config := Config{Location: "localhost:8080", Protocol: "http"} // when - m, _ := New(config) + m, _ := New(config, "") // then assert.Equal(t, "http://localhost:8080/v2/apps", m.url("/v2/apps")) } @@ -273,7 +273,7 @@ func TestUrl_WithAuth(t *testing.T) { // given config := Config{Location: "localhost:8080", Protocol: "http", Username: "peter", Password: "parker"} // when - m, _ := New(config) + m, _ := New(config, "") // then assert.Equal(t, "http://peter:parker@localhost:8080/v2/apps", m.url("/v2/apps")) } @@ -285,7 +285,7 @@ func TestLeader_SuccessfulResponse(t *testing.T) { server, transport := stubServer("/v2/leader", `{"leader": "some.leader.host:8081"}`) defer server.Close() url, _ := url.Parse(server.URL) - m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}) + m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}, "") m.client.Transport = transport // when @@ -303,7 +303,7 @@ func TestLeader_ErrorOnMalformedJsonResponse(t *testing.T) { server, transport := stubServer("/v2/leader", "{") defer server.Close() url, _ := url.Parse(server.URL) - m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}) + m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}, "") m.client.Transport = transport // when @@ -325,7 +325,7 @@ func TestLeader_NotRetryOnFailingResponse(t *testing.T) { }) defer server.Close() url, _ := url.Parse(server.URL) - m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}) + m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}, "") m.client.Transport = transport // when @@ -337,6 +337,77 @@ func TestLeader_NotRetryOnFailingResponse(t *testing.T) { assert.Empty(t, leader) } +func TestLeaderPoll_PassingRunningOnLeader(t *testing.T) { + t.Parallel() + + // given + server, transport := stubServer("/v2/leader", `{"leader": "this.leader:8080"}`) + defer server.Close() + url, _ := url.Parse(server.URL) + m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}, "this.leader:8080") + m.client.Transport = transport + + // when + err := m.leaderPoll() + + //then + assert.NoError(t, err) +} + +func TestAMILeader_PassingRunningOnLeader(t *testing.T) { + t.Parallel() + + // given + server, transport := stubServer("/v2/leader", `{"leader": "this.leader:8080"}`) + defer server.Close() + url, _ := url.Parse(server.URL) + m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}, "this.leader:8080") + m.client.Transport = transport + + // when + leading, err := m.AmILeader() + + //then + assert.True(t, leading) + assert.NoError(t, err) +} + +func TestAMILeader_NotPassingNotRunningOnLeader(t *testing.T) { + t.Parallel() + + // given + server, transport := stubServer("/v2/leader", `{"leader": "other.leader:8080"}`) + defer server.Close() + url, _ := url.Parse(server.URL) + m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}, "this.leader:8080") + m.client.Transport = transport + + // when + leading, err := m.AmILeader() + + //then + assert.False(t, leading) + assert.NoError(t, err) +} + +func TestEventStream_PassingStreamerCreated(t *testing.T) { + t.Parallel() + + // given + server, transport := stubServer("/v2/leader", `{"leader": "this.leader:8080"}`) + defer server.Close() + url, _ := url.Parse(server.URL) + m, _ := New(Config{Location: url.Host, Protocol: "HTTP"}, "this.leader:8080") + m.client.Transport = transport + + // when + streamer, err := m.EventStream([]string{}, 1, 1) + + //then + assert.NoError(t, err) + assert.IsType(t, &Streamer{}, streamer) +} + // http://keighl.com/post/mocking-http-responses-in-golang/ func stubServer(uri string, body string) (*httptest.Server, *http.Transport) { return mockServer(func(w http.ResponseWriter, r *http.Request) { diff --git a/marathon/streamer.go b/marathon/streamer.go new file mode 100644 index 0000000..2d1e5c1 --- /dev/null +++ b/marathon/streamer.go @@ -0,0 +1,75 @@ +package marathon + +import ( + "bufio" + "context" + "fmt" + "net/http" + "time" + + log "github.com/Sirupsen/logrus" +) + +type Streamer struct { + Scanner *bufio.Scanner + cancel context.CancelFunc + client *http.Client + subURL string + retries int + retryBackoff int + noRecover bool +} + +func (s *Streamer) stop() { + s.cancel() +} +func (s *Streamer) Stop() { + s.noRecover = true + s.stop() +} + +func (s *Streamer) Start() error { + req, err := http.NewRequest("GET", s.subURL, nil) + if err != nil { + log.Fatal("Unable to create Streamer request") + return nil + } + req.Header.Set("Accept", "text/event-stream") + ctx, cancel := context.WithCancel(context.Background()) + s.cancel = cancel + req = req.WithContext(ctx) + res, err := s.client.Do(req) + if err != nil { + s.cancel() + return err + } + if res.StatusCode != http.StatusOK { + log.WithFields(log.Fields{ + "Location": s.subURL, + "Method": "GET", + }).Errorf("Got status code : %d", res.StatusCode) + } + log.WithFields(log.Fields{ + "SubUrl": s.subURL, + "Method": "GET", + }).Debug("Subsciption success") + s.Scanner = bufio.NewScanner(res.Body) + + return nil +} + +func (s *Streamer) Recover() error { + if s.noRecover { + return fmt.Errorf("Streamer is not recoverable") + } + s.stop() + + err := s.Start() + i := 0 + for ; err != nil && i <= s.retries; err = s.Start() { + seconds := time.Duration(i * s.retryBackoff) + time.Sleep(seconds * time.Second) + i++ + } + return err +} diff --git a/sse/config.go b/sse/config.go new file mode 100644 index 0000000..3de3ab2 --- /dev/null +++ b/sse/config.go @@ -0,0 +1,7 @@ +package sse + +type Config struct { + Enabled bool + Retries int + RetryBackoff int +} diff --git a/sse/sse.go b/sse/sse.go new file mode 100644 index 0000000..94f3909 --- /dev/null +++ b/sse/sse.go @@ -0,0 +1,72 @@ +package sse + +import ( + "net/http" + "time" + + log "github.com/Sirupsen/logrus" + "github.com/allegro/marathon-consul/events" + "github.com/allegro/marathon-consul/marathon" + "github.com/allegro/marathon-consul/service" + "github.com/allegro/marathon-consul/web" +) + +type Stop func() +type Handler func(w http.ResponseWriter, r *http.Request) + +func NewHandler(config Config, webConfig web.Config, marathon marathon.Marathoner, serviceOperations service.ServiceRegistry) Stop { + stopChannels := make([]chan<- events.StopEvent, webConfig.WorkersCount, webConfig.WorkersCount) + eventQueue := make(chan events.Event, webConfig.QueueSize) + for i := 0; i < webConfig.WorkersCount; i++ { + handler := events.NewEventHandler(i, serviceOperations, marathon, eventQueue) + stopChannels[i] = handler.Start() + } + + sse := newSSEHandler(eventQueue, marathon, webConfig.MaxEventSize, config) + dispatcherStop := sse.start() + + guardQuit := leaderGuard(sse.Streamer, marathon) + stopChannels = append(stopChannels, dispatcherStop, guardQuit) + + return stop(stopChannels) +} + +func stop(channels []chan<- events.StopEvent) Stop { + return func() { + for _, channel := range channels { + channel <- events.StopEvent{} + } + } +} + +// leaderGuard is a watchdog goroutine, +// periodically checks if leader has changed +// if change is detected, passed streamer is stopped - unable to recover +// if this goroutine is quited, agent is stopped - unable to recover +func leaderGuard(s *marathon.Streamer, m marathon.Marathoner) chan<- events.StopEvent { + // TODO(tz) - consider launching this goroutine from marathon, + // no need to pass marathon reciever then ?? + quit := make(chan events.StopEvent) + + go func() { + ticker := time.Tick(5 * time.Second) + for { + select { + case <-ticker: + if iAMLeader, err := m.AmILeader(); !iAMLeader && err != nil { + // Leader changed, not revocerable. + s.Stop() + log.Error("Tearing down SSE stream, marathon leader changed.") + return + } else if err != nil { + log.WithError(err).Error("Leader Guard error while checking leader.") + } + case <-quit: + log.Info("Recieved quit notification. Quit checker") + s.Stop() + return + } + } + }() + return quit +} diff --git a/sse/sse_handler.go b/sse/sse_handler.go new file mode 100644 index 0000000..659a3f0 --- /dev/null +++ b/sse/sse_handler.go @@ -0,0 +1,111 @@ +package sse + +import ( + "context" + "io" + "net/http" + "time" + + "github.com/allegro/marathon-consul/events" + "github.com/allegro/marathon-consul/marathon" + "github.com/allegro/marathon-consul/metrics" + + log "github.com/Sirupsen/logrus" +) + +// SSEHandler defines handler for marathon event stream, opening and closing +// subscription +type SSEHandler struct { + config Config + eventQueue chan events.Event + loc string + client *http.Client + close context.CancelFunc + req *http.Request + Streamer *marathon.Streamer + maxLineSize int64 +} + +func newSSEHandler(eventQueue chan events.Event, service marathon.Marathoner, maxLineSize int64, config Config) *SSEHandler { + + streamer, err := service.EventStream( + []string{events.StatusUpdateEventType, events.HealthStatusChangedEventType}, + config.Retries, + config.RetryBackoff, + ) + if err != nil { + log.WithError(err).Fatal("Unable to start Streamer") + } + + return &SSEHandler{ + config: config, + eventQueue: eventQueue, + Streamer: streamer, + maxLineSize: maxLineSize, + } +} + +// Open connection to marathon v2/events +func (h *SSEHandler) start() chan<- events.StopEvent { + stopChan := make(chan events.StopEvent) + go func() { + <-stopChan + h.stop() + }() + + go func() { + defer h.stop() + + err := h.Streamer.Start() + if err != nil { + log.WithError(err).Error("Unable to start streamer") + } + // buffer used for token storage, + // if token is greater than buffer, empty token is stored + buffer := make([]byte, h.maxLineSize) + // configure streamer scanner :) + h.Streamer.Scanner.Buffer(buffer, cap(buffer)) + h.Streamer.Scanner.Split(events.ScanLines) + for { + metrics.Time("events.read", func() { h.handle() }) + } + }() + return stopChan +} + +func (h *SSEHandler) handle() { + e, err := events.ParseSSEEvent(h.Streamer.Scanner) + if err != nil { + if err == io.EOF { + // Event could be partial at this point + h.enqueueEvent(e) + } + log.WithError(err).Error("Error when parsing the event") + err = h.Streamer.Recover() + if err != nil { + log.WithError(err).Fatalf("Unable to recover streamer") + } + } + metrics.Mark("events.read." + e.Type) + if e.Type != events.StatusUpdateEventType && e.Type != events.HealthStatusChangedEventType { + log.Debugf("%s is not supported", e.Type) + metrics.Mark("events.read.drop") + return + } + h.enqueueEvent(e) +} + +func (h *SSEHandler) enqueueEvent(e events.SSEEvent) { + select { + case h.eventQueue <- events.Event{Timestamp: time.Now(), EventType: e.Type, Body: e.Body}: + metrics.Mark("events.read.accept") + default: + log.Error("Events queue full. Dropping the event") + metrics.Mark("events.read.drop") + } +} + +// Close connections managed by context +func (h *SSEHandler) stop() { + h.Streamer.Stop() +} diff --git a/sync/error_marathon_stub_test.go b/sync/error_marathon_stub_test.go index 9be9cb2..ed06fec 100644 --- a/sync/error_marathon_stub_test.go +++ b/sync/error_marathon_stub_test.go @@ -4,6 +4,7 @@ import ( "errors" "github.com/allegro/marathon-consul/apps" + "github.com/allegro/marathon-consul/marathon" ) type errorMarathon struct { @@ -24,3 +25,11 @@ func (m errorMarathon) Tasks(appID apps.AppID) ([]apps.Task, error) { func (m errorMarathon) Leader() (string, error) { return "", errors.New("Error") } + +func (m errorMarathon) EventStream([]string, int, int) (*marathon.Streamer, error) { + return &marathon.Streamer{}, errors.New("Error") +} + +func (m errorMarathon) AmILeader() (bool, error) { + return false, errors.New("Error") +} diff --git a/sync/sync.go b/sync/sync.go index e4fa180..899855a 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -223,4 +223,4 @@ func marathonTaskIdsSet(marathonApps []*apps.App) map[apps.TaskID]struct{} { } } return tasksSet -} \ No newline at end of file +} diff --git a/time/time.go b/time/time.go index 4c4009b..c81fede 100644 --- a/time/time.go +++ b/time/time.go @@ -25,10 +25,18 @@ func (t Timestamp) MarshalJSON() ([]byte, error) { return json.Marshal(t.String()) } +func (t *Timestamp) Delay() time.Duration { + return time.Now().Sub(t.Time) +} + func (t *Timestamp) String() string { return t.Format(time.RFC3339Nano) } +func (t *Timestamp) Missing() bool { + return t.Unix() == (time.Time{}).Unix() +} + type Interval struct { time.Duration } diff --git a/web/config.go b/web/config.go index acdcb38..ac7cadb 100644 --- a/web/config.go +++ b/web/config.go @@ -5,4 +5,5 @@ type Config struct { QueueSize int WorkersCount int MaxEventSize int64 + Enabled bool } diff --git a/web/web_handler.go b/web/web_handler.go index 88febfb..07ff4f2 100644 --- a/web/web_handler.go +++ b/web/web_handler.go @@ -50,8 +50,6 @@ func (h *EventHandler) Handle(w http.ResponseWriter, r *http.Request) { } metrics.Mark("events.requests." + e.Type) - delay := time.Now().Unix() - e.Timestamp.Unix() - metrics.UpdateGauge("events.requests.delay.current", delay) log.WithFields(log.Fields{"EventType": e.Type, "OriginalTimestamp": e.Timestamp.String()}).Debug("Received event") if e.Type != events.StatusUpdateEventType && e.Type != events.HealthStatusChangedEventType {