diff --git a/Makefile b/Makefile index dea9bcc..250d1b4 100644 --- a/Makefile +++ b/Makefile @@ -9,6 +9,9 @@ all: mac linux simple-responder clean: rm -rf $(BUILD_DIR) +test: + go test -v ./proxy + # Build OSX binary mac: @echo "Building Mac binary..." diff --git a/proxy/logMonitor.go b/proxy/logMonitor.go index ff163a0..130b721 100644 --- a/proxy/logMonitor.go +++ b/proxy/logMonitor.go @@ -2,66 +2,73 @@ package proxy import ( "container/ring" + "io" "os" "sync" ) type LogMonitor struct { - clients map[chan string]bool + clients map[chan []byte]bool mu sync.RWMutex buffer *ring.Ring bufferMu sync.RWMutex + + // typically this can be os.Stdout + stdout io.Writer } func NewLogMonitor() *LogMonitor { + return NewLogMonitorWriter(os.Stdout) +} + +func NewLogMonitorWriter(stdout io.Writer) *LogMonitor { return &LogMonitor{ - clients: make(map[chan string]bool), + clients: make(map[chan []byte]bool), buffer: ring.New(10 * 1024), // keep 10KB of buffered logs + stdout: stdout, } } func (w *LogMonitor) Write(p []byte) (n int, err error) { - n, err = os.Stdout.Write(p) + n, err = w.stdout.Write(p) if err != nil { return n, err } - content := string(p) - w.bufferMu.Lock() - w.buffer.Value = content + w.buffer.Value = p w.buffer = w.buffer.Next() w.bufferMu.Unlock() - w.Broadcast(content) + w.broadcast(p) return n, nil } -func (w *LogMonitor) getHistory() string { +func (w *LogMonitor) GetHistory() []byte { w.bufferMu.RLock() defer w.bufferMu.RUnlock() - var history string + var history []byte w.buffer.Do(func(p interface{}) { if p != nil { - if content, ok := p.(string); ok { - history += content + if content, ok := p.([]byte); ok { + history = append(history, content...) } } }) return history } -func (w *LogMonitor) Subscribe() chan string { +func (w *LogMonitor) Subscribe() chan []byte { w.mu.Lock() defer w.mu.Unlock() - ch := make(chan string, 100) + ch := make(chan []byte, 100) w.clients[ch] = true return ch } -func (w *LogMonitor) Unsubscribe(ch chan string) { +func (w *LogMonitor) Unsubscribe(ch chan []byte) { w.mu.Lock() defer w.mu.Unlock() @@ -69,7 +76,7 @@ func (w *LogMonitor) Unsubscribe(ch chan string) { close(ch) } -func (w *LogMonitor) Broadcast(msg string) { +func (w *LogMonitor) broadcast(msg []byte) { w.mu.RLock() defer w.mu.RUnlock() diff --git a/proxy/logMonitor_test.go b/proxy/logMonitor_test.go new file mode 100644 index 0000000..08f6d74 --- /dev/null +++ b/proxy/logMonitor_test.go @@ -0,0 +1,63 @@ +package proxy + +import ( + "io" + "sync" + "testing" +) + +func TestLogMonitor(t *testing.T) { + logMonitor := NewLogMonitorWriter(io.Discard) + + // Test subscription + client1 := logMonitor.Subscribe() + client2 := logMonitor.Subscribe() + + defer logMonitor.Unsubscribe(client1) + defer logMonitor.Unsubscribe(client2) + + client1Messages := make([]byte, 0) + client2Messages := make([]byte, 0) + + var wg sync.WaitGroup + wg.Add(1) + + go func() { + defer wg.Done() + for { + select { + case data := <-client1: + client1Messages = append(client1Messages, data...) + case data := <-client2: + client2Messages = append(client2Messages, data...) + default: + return + } + } + }() + + logMonitor.Write([]byte("1")) + logMonitor.Write([]byte("2")) + logMonitor.Write([]byte("3")) + + // Wait for the goroutine to finish + wg.Wait() + + // Check the buffer + expectedHistory := "123" + history := string(logMonitor.GetHistory()) + + if history != expectedHistory { + t.Errorf("Expected history: %s, got: %s", expectedHistory, history) + } + + c1Data := string(client1Messages) + if c1Data != expectedHistory { + t.Errorf("Client1 expected %s, got: %s", expectedHistory, c1Data) + } + + c2Data := string(client2Messages) + if c2Data != expectedHistory { + t.Errorf("Client2 expected %s, got: %s", expectedHistory, c2Data) + } +} diff --git a/proxy/manager.go b/proxy/manager.go index c3a5dbe..0107c51 100644 --- a/proxy/manager.go +++ b/proxy/manager.go @@ -61,9 +61,9 @@ func (pm *ProxyManager) streamLogs(w http.ResponseWriter, r *http.Request) { skipHistory := r.URL.Query().Has("skip") if !skipHistory { // Send history first - history := pm.logMonitor.getHistory() - if history != "" { - fmt.Fprint(w, history) + history := pm.logMonitor.GetHistory() + if len(history) != 0 { + w.Write(history) flusher.Flush() } } @@ -76,7 +76,7 @@ func (pm *ProxyManager) streamLogs(w http.ResponseWriter, r *http.Request) { for { select { case msg := <-ch: - fmt.Fprint(w, msg) + w.Write(msg) flusher.Flush() case <-notify: return