Skip to content

Commit

Permalink
feat: add cloudpubsub.NewHTTPHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
odsod committed Aug 5, 2021
1 parent 2655b8d commit f9f26b7
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 0 deletions.
61 changes: 61 additions & 0 deletions cloudpubsub/httphandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package cloudpubsub

import (
"context"
"encoding/json"
"net/http"
"time"

"go.einride.tech/cloudrunner/cloudrequestlog"
"go.einride.tech/cloudrunner/cloudstatus"
"go.einride.tech/cloudrunner/cloudzap"
"go.uber.org/zap"
"google.golang.org/genproto/googleapis/pubsub/v1"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)

// NewHTTPHandler creates a new HTTP handler for Cloud Pub/Sub push messages.
// See: https://cloud.google.com/pubsub/docs/push
func NewHTTPHandler(fn func(context.Context, *pubsub.PubsubMessage) error) http.Handler {
return httpHandlerFn(fn)
}

type httpHandlerFn func(ctx context.Context, message *pubsub.PubsubMessage) error

func (fn httpHandlerFn) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var payload struct {
Subscription string `json:"subscription"`
Message struct {
Attributes map[string]string `json:"attributes"`
Data []byte `json:"data"`
MessageID string `json:"messageId"`
PublishTime time.Time `json:"publishTime"`
} `json:"message"`
}
if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
if fields, ok := cloudrequestlog.GetAdditionalFields(r.Context()); ok {
fields.Add(zap.Error(err))
}
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
pubsubMessage := pubsub.PubsubMessage{
Data: payload.Message.Data,
Attributes: payload.Message.Attributes,
MessageId: payload.Message.MessageID,
PublishTime: timestamppb.New(payload.Message.PublishTime),
}
if fields, ok := cloudrequestlog.GetAdditionalFields(r.Context()); ok {
fields.Add(cloudzap.ProtoMessage("pubsubMessage", &pubsubMessage))
}
if err := fn(r.Context(), &pubsubMessage); err != nil {
if fields, ok := cloudrequestlog.GetAdditionalFields(r.Context()); ok {
fields.Add(zap.Error(err))
}
code := status.Code(err)
httpStatus := cloudstatus.ToHTTP(code)
http.Error(w, http.StatusText(httpStatus), httpStatus)
return
}
}
55 changes: 55 additions & 0 deletions cloudpubsub/httphandler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package cloudpubsub

import (
"context"
"net/http"
"net/http/httptest"
"strings"
"testing"

"google.golang.org/genproto/googleapis/pubsub/v1"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/timestamppb"
"gotest.tools/v3/assert"
)

func TestNewHTTPHandler(t *testing.T) {
// From: https://cloud.google.com/pubsub/docs/push#receiving_messages
const example = `
{
"message": {
"attributes": {
"key": "value"
},
"data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
"messageId": "2070443601311540",
"message_id": "2070443601311540",
"publishTime": "2021-02-26T19:13:55.749Z",
"publish_time": "2021-02-26T19:13:55.749Z"
},
"subscription": "projects/myproject/subscriptions/mysubscription"
}
`
expectedMessage := &pubsub.PubsubMessage{
Data: []byte("Hello Cloud Pub/Sub! Here is my message!"),
Attributes: map[string]string{"key": "value"},
MessageId: "2070443601311540",
PublishTime: &timestamppb.Timestamp{
Seconds: 1614366835,
Nanos: 749000000,
},
}
var actualMessage *pubsub.PubsubMessage
fn := func(ctx context.Context, message *pubsub.PubsubMessage) error {
actualMessage = message
return nil
}
server := httptest.NewServer(NewHTTPHandler(fn))
defer server.Close()
request, err := http.NewRequest(http.MethodPost, server.URL, strings.NewReader(example))
assert.NilError(t, err)
response, err := http.DefaultClient.Do(request)
assert.NilError(t, err)
assert.Equal(t, http.StatusOK, response.StatusCode)
assert.DeepEqual(t, expectedMessage, actualMessage, protocmp.Transform())
}
52 changes: 52 additions & 0 deletions cloudstatus/code.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package cloudstatus

import (
"net/http"

"google.golang.org/grpc/codes"
)

// ToHTTP converts a gRPC error code into the corresponding HTTP response status.
// See: https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto
// From: https://github.com/grpc-ecosystem/grpc-gateway/blob/master/runtime/errors.go
func ToHTTP(code codes.Code) int {
switch code {
case codes.OK:
return http.StatusOK
case codes.Canceled:
return http.StatusRequestTimeout
case codes.Unknown:
return http.StatusInternalServerError
case codes.InvalidArgument:
return http.StatusBadRequest
case codes.DeadlineExceeded:
return http.StatusGatewayTimeout
case codes.NotFound:
return http.StatusNotFound
case codes.AlreadyExists:
return http.StatusConflict
case codes.PermissionDenied:
return http.StatusForbidden
case codes.Unauthenticated:
return http.StatusUnauthorized
case codes.ResourceExhausted:
return http.StatusTooManyRequests
case codes.FailedPrecondition:
// This deliberately doesn't translate to the similarly named '412 Precondition Failed' HTTP response status.
return http.StatusBadRequest
case codes.Aborted:
return http.StatusConflict
case codes.OutOfRange:
return http.StatusBadRequest
case codes.Unimplemented:
return http.StatusNotImplemented
case codes.Internal:
return http.StatusInternalServerError
case codes.Unavailable:
return http.StatusServiceUnavailable
case codes.DataLoss:
return http.StatusInternalServerError
default:
return http.StatusInternalServerError
}
}

0 comments on commit f9f26b7

Please sign in to comment.