diff --git a/instrumentation/cloud.google.com/go/pubsub/push_test.go b/instrumentation/cloud.google.com/go/pubsub/push_test.go index 49f97738c..011e616e0 100644 --- a/instrumentation/cloud.google.com/go/pubsub/push_test.go +++ b/instrumentation/cloud.google.com/go/pubsub/push_test.go @@ -6,6 +6,8 @@ package pubsub_test import ( "bytes" "context" + "errors" + "fmt" "io" "net/http" "net/http/httptest" @@ -190,3 +192,186 @@ func (alwaysReadyClient) SendEvent(event *instana.EventData) error { re func (alwaysReadyClient) SendSpans(spans []instana.Span) error { return nil } func (alwaysReadyClient) SendProfiles(profiles []autoprofile.Profile) error { return nil } func (alwaysReadyClient) Flush(context.Context) error { return nil } + +// errorReader is a custom io.Reader that always returns an error +type errorReader struct{} + +func (errorReader) Read(p []byte) (n int, err error) { + return 0, errors.New("forced read error") +} + +// TestTracingHandlerFunc_ReadBodyError tests error handling when reading request body fails +func TestTracingHandlerFunc_ReadBodyError(t *testing.T) { + recorder := instana.NewTestRecorder() + c := instana.InitCollector(&instana.Options{ + AgentClient: alwaysReadyClient{}, + Recorder: recorder, + }) + defer instana.ShutdownCollector() + + var numCalls int + h := pubsub.TracingHandlerFunc(c, "/", func(w http.ResponseWriter, req *http.Request) { + numCalls++ + }) + + rec := httptest.NewRecorder() + + // Create a request with a body that will fail to read + req := httptest.NewRequest(http.MethodPost, "/", errorReader{}) + + h(rec, req) + + // Verify that the handler returned an error and didn't call the wrapped handler + assert.Equal(t, http.StatusInternalServerError, rec.Result().StatusCode) + assert.Equal(t, 0, numCalls) +} + +// TestTracingHandlerFunc_InvalidJSON tests handling of malformed JSON messages +func TestTracingHandlerFunc_InvalidJSON(t *testing.T) { + recorder := instana.NewTestRecorder() + c := instana.InitCollector(&instana.Options{ + AgentClient: alwaysReadyClient{}, + Recorder: recorder, + }) + defer instana.ShutdownCollector() + + var numCalls int + h := pubsub.TracingHandlerFunc(c, "/", func(w http.ResponseWriter, req *http.Request) { + numCalls++ + }) + + rec := httptest.NewRecorder() + + // Create a request with invalid JSON + invalidJSON := []byte(`{"message": "this is not valid pubsub message format"}`) + req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(invalidJSON)) + + h(rec, req) + + // Verify that the handler falls back to regular HTTP tracing + assert.Equal(t, http.StatusOK, rec.Result().StatusCode) + assert.Equal(t, 1, numCalls) + + spans := recorder.GetQueuedSpans() + require.Len(t, spans, 1) + assert.Equal(t, "g.http", spans[0].Name) // Regular HTTP span, not PubSub span +} + +// TestTracingHandlerFunc_InvalidSubscriptionFormat tests handling of invalid subscription formats +func TestTracingHandlerFunc_InvalidSubscriptionFormat(t *testing.T) { + recorder := instana.NewTestRecorder() + c := instana.InitCollector(&instana.Options{ + AgentClient: alwaysReadyClient{}, + Recorder: recorder, + }) + defer instana.ShutdownCollector() + + var numCalls int + h := pubsub.TracingHandlerFunc(c, "/", func(w http.ResponseWriter, req *http.Request) { + numCalls++ + }) + + rec := httptest.NewRecorder() + + // Create a request with invalid subscription format + invalidSubscription := []byte(`{ + "message": { + "attributes": {}, + "messageId": "136969346945" + }, + "subscription": "invalid-subscription-format" + }`) + req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(invalidSubscription)) + + h(rec, req) + + // Verify that the handler falls back to regular HTTP tracing + assert.Equal(t, http.StatusOK, rec.Result().StatusCode) + assert.Equal(t, 1, numCalls) + + spans := recorder.GetQueuedSpans() + require.Len(t, spans, 1) + assert.Equal(t, "g.http", spans[0].Name) // Regular HTTP span, not PubSub span +} + +// TestParsePathResourceIDExported tests the parsePathResourceID function indirectly +// by creating test cases that will exercise different paths in the subscription parsing logic +func TestParsePathResourceIDExported(t *testing.T) { + recorder := instana.NewTestRecorder() + c := instana.InitCollector(&instana.Options{ + AgentClient: alwaysReadyClient{}, + Recorder: recorder, + }) + defer instana.ShutdownCollector() + + tests := []struct { + name string + subscription string + expectSuccess bool + }{ + { + name: "Valid subscription format", + subscription: "projects/myproject/subscriptions/mysubscription", + expectSuccess: true, + }, + { + name: "Missing projects prefix", + subscription: "notprojects/myproject/subscriptions/mysubscription", + expectSuccess: false, + }, + { + name: "Missing subscriptions part", + subscription: "projects/myproject/notsubscriptions/mysubscription", + expectSuccess: false, + }, + { + name: "Empty string", + subscription: "", + expectSuccess: false, + }, + { + name: "Just projects prefix", + subscription: "projects/", + expectSuccess: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // We'll test the parsePathResourceID function indirectly through startConsumePushSpan + payload := []byte(fmt.Sprintf(`{ + "message": { + "attributes": {}, + "messageId": "136969346945" + }, + "subscription": "%s" + }`, tt.subscription)) + + var numCalls int + h := pubsub.TracingHandlerFunc(c, "/", func(w http.ResponseWriter, req *http.Request) { + numCalls++ + }) + + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(payload)) + + h(rec, req) + + if tt.expectSuccess { + // Should create a PubSub span + assert.Equal(t, 1, numCalls) + spans := recorder.GetQueuedSpans() + require.Len(t, spans, 1) + assert.Equal(t, "gcps", spans[0].Name) + } else { + // Should fall back to HTTP span + assert.Equal(t, 1, numCalls) + spans := recorder.GetQueuedSpans() + require.Len(t, spans, 1) + assert.Equal(t, "g.http", spans[0].Name) + } + + // Create a new recorder for each test case instead of resetting + }) + } +}