Skip to content

Commit

Permalink
feat: Support publishing new log entries to Pub/Sub topics
Browse files Browse the repository at this point in the history
Adds initial support publishing new log entries to Pub/Sub topics. Interested parties can subscribe to the topic in order to receive notifications when new entries are added.

Signed-off-by: James Alseth <[email protected]>
  • Loading branch information
jalseth committed Jul 2, 2023
1 parent a1349da commit 9fd111e
Show file tree
Hide file tree
Showing 17 changed files with 536 additions and 20 deletions.
3 changes: 3 additions & 0 deletions Dockerfile.pubsub-emulator
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# gcloud sdk for pubsub emulator with netcat added for the startup health check
FROM google/cloud-sdk:437.0.0
RUN apt-get install -y netcat
2 changes: 2 additions & 0 deletions cmd/rekor-server/app/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ func init() {
Memory and file-based signers should only be used for testing.`)
rootCmd.PersistentFlags().String("rekor_server.signer-passwd", "", "Password to decrypt signer private key")

rootCmd.PersistentFlags().String("rekor_server.new_entry_publisher", "", "URL for pub/sub queue to send messages to when new entries are added to the log. Ignored if not set.")

rootCmd.PersistentFlags().Uint16("port", 3000, "Port to bind to")

rootCmd.PersistentFlags().Bool("enable_retrieve_api", true, "enables Redis-based index API endpoint")
Expand Down
3 changes: 0 additions & 3 deletions cmd/rekor-server/app/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ var serveCmd = &cobra.Command{
Short: "start http server with configured api",
Long: `Starts a http server and serves the configured api`,
Run: func(cmd *cobra.Command, args []string) {

// Setup the logger to dev/prod
log.ConfigureLogger(viper.GetString("log_type"))

Expand All @@ -83,7 +82,6 @@ var serveCmd = &cobra.Command{
log.Logger.Error(err)
}
}()

//TODO: make this a config option for server to load via viper field
//TODO: add command line option to print versions supported in binary

Expand All @@ -101,7 +99,6 @@ var serveCmd = &cobra.Command{
hashedrekord.KIND: {hashedrekord_v001.APIVERSION},
dsse.KIND: {dsse_v001.APIVERSION},
}

for k, v := range pluggableTypeMap {
log.Logger.Infof("Loading support for pluggable type '%v'", k)
log.Logger.Infof("Loading version '%v' for pluggable type '%v'", v, k)
Expand Down
22 changes: 22 additions & 0 deletions docker-compose.test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ services:
build:
context: .
target: "test"
environment:
PUBSUB_EMULATOR_HOST: gcp-pubsub-emulator:8085
command: [
"rekor-server",
"-test.coverprofile=rekor-server.cov",
Expand All @@ -32,7 +34,27 @@ services:
"--enable_attestation_storage",
"--attestation_storage_bucket=file:///var/run/attestations",
"--max_request_body_size=32792576",
"--rekor_server.new_entry_publisher=gcppubsub://projects/fake-test-project/topics/new-entry",
]
ports:
- "3000:3000"
- "2112:2112"
depends_on:
- gcp-pubsub-emulator
gcp-pubsub-emulator:
image: gcp-pubsub-emulator
ports:
- "8085:8085"
command:
- gcloud
- beta
- emulators
- pubsub
- start
- --project=fake-test-project
healthcheck:
test: ["CMD", "nc", "-zv", "localhost", "8085"]
interval: 10s
timeout: 3s
retries: 3
start_period: 10s
1 change: 0 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,3 @@ services:
timeout: 3s
retries: 3
start_period: 5s

6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ module github.com/sigstore/rekor

go 1.19

// TEMPORARY: DO NOT MERGE WITH THIS
replace github.com/sigstore/protobuf-specs => ../../jalseth/protobuf-specs

require (
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2
github.com/blang/semver v3.5.1+incompatible
Expand Down Expand Up @@ -51,6 +54,7 @@ require (
)

require (
cloud.google.com/go/pubsub v1.31.0
github.com/AdamKorcz/go-fuzz-headers-1 v0.0.0-20230329111138-12e09aba5ebd
github.com/cyberphone/json-canonicalization v0.0.0-20220623050100-57a0ce2678a7
github.com/go-redis/redismock/v9 v9.0.3
Expand Down Expand Up @@ -180,7 +184,7 @@ require (
golang.org/x/term v0.9.0 // indirect
golang.org/x/text v0.10.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.128.0 // indirect
google.golang.org/api v0.128.0
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
gopkg.in/yaml.v2 v2.4.0
Expand Down
3 changes: 1 addition & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ cloud.google.com/go/pubsub v1.26.0/go.mod h1:QgBH3U/jdJy/ftjPhTkyXNj543Tin1pRYcd
cloud.google.com/go/pubsub v1.27.1/go.mod h1:hQN39ymbV9geqBnfQq6Xf63yNhUAhv9CZhzp5O6qsW0=
cloud.google.com/go/pubsub v1.28.0/go.mod h1:vuXFpwaVoIPQMGXqRyUQigu/AX1S3IWugR9xznmcXX8=
cloud.google.com/go/pubsub v1.30.0/go.mod h1:qWi1OPS0B+b5L+Sg6Gmc9zD1Y+HaM0MdUr7LsupY1P4=
cloud.google.com/go/pubsub v1.31.0 h1:aXdyyJz90kA+bor9+6+xHAciMD5mj8v15WqFZ5E0sek=
cloud.google.com/go/pubsub v1.31.0/go.mod h1:dYmJ3K97NCQ/e4OwZ20rD4Ym3Bu8Gu9m/aJdWQjdcks=
cloud.google.com/go/pubsublite v1.5.0/go.mod h1:xapqNQ1CuLfGi23Yda/9l4bBCKz/wC3KIJ5gKcxveZg=
cloud.google.com/go/pubsublite v1.6.0/go.mod h1:1eFCS0U11xlOuMFV/0iBqw3zP12kddMeCbj/F3FSj9k=
Expand Down Expand Up @@ -2122,8 +2123,6 @@ github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFR
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw=
github.com/sigstore/protobuf-specs v0.1.0 h1:X0l/E2C2c79t/rI/lmSu8WAoKWsQtMqDzAMiDdEMGr8=
github.com/sigstore/protobuf-specs v0.1.0/go.mod h1:5shUCxf82hGnjUEFVWiktcxwzdtn6EfeeJssxZ5Q5HE=
github.com/sigstore/sigstore v1.7.1 h1:fCATemikcBK0cG4+NcM940MfoIgmioY1vC6E66hXxks=
github.com/sigstore/sigstore v1.7.1/go.mod h1:0PmMzfJP2Y9+lugD0wer4e7TihR5tM7NcIs3bQNk5xg=
github.com/sigstore/sigstore/pkg/signature/kms/aws v1.7.1 h1:rDHrG/63b3nBq3G9plg7iYnWN6lBhOfq/XultlCZgII=
Expand Down
19 changes: 19 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"google.golang.org/grpc/credentials/insecure"

"github.com/sigstore/rekor/pkg/log"
"github.com/sigstore/rekor/pkg/pubsub"
"github.com/sigstore/rekor/pkg/sharding"
"github.com/sigstore/rekor/pkg/signer"
"github.com/sigstore/rekor/pkg/storage"
Expand All @@ -39,6 +40,8 @@ import (
"github.com/sigstore/sigstore/pkg/cryptoutils"
"github.com/sigstore/sigstore/pkg/signature"
"github.com/sigstore/sigstore/pkg/signature/options"

_ "github.com/sigstore/rekor/pkg/pubsub/gcp" // Load GCP pubsub implementation
)

func dial(ctx context.Context, rpcServer string) (*grpc.ClientConn, error) {
Expand All @@ -63,6 +66,9 @@ type API struct {
signer signature.Signer
// stops checkpoint publishing
checkpointPublishCancel context.CancelFunc
// Publishes notifications when new entries are added to the log. May be
// nil if no publisher is configured.
newEntryPublisher pubsub.Publisher
}

func NewAPI(treeID uint) (*API, error) {
Expand Down Expand Up @@ -112,6 +118,14 @@ func NewAPI(treeID uint) (*API, error) {

pubkey := cryptoutils.PEMEncode(cryptoutils.PublicKeyPEMType, b)

var newEntryPublisher pubsub.Publisher
if p := viper.GetString("rekor_server.new_entry_publisher"); p != "" {
newEntryPublisher, err = pubsub.Get(ctx, p)
if err != nil {
return nil, fmt.Errorf("init event publisher: %w", err)
}
}

return &API{
// Transparency Log Stuff
logClient: logClient,
Expand All @@ -121,6 +135,8 @@ func NewAPI(treeID uint) (*API, error) {
pubkey: string(pubkey),
pubkeyHash: hex.EncodeToString(pubkeyHashBytes[:]),
signer: rekorSigner,
// Utility functionality not required for operation of the core service
newEntryPublisher: newEntryPublisher,
}, nil
}

Expand Down Expand Up @@ -165,5 +181,8 @@ func ConfigureAPI(treeID uint) {
}

func StopAPI() {
if api.newEntryPublisher != nil {
api.newEntryPublisher.Close()
}
api.checkpointPublishCancel()
}
28 changes: 27 additions & 1 deletion pkg/api/entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/sigstore/rekor/pkg/generated/models"
"github.com/sigstore/rekor/pkg/generated/restapi/operations/entries"
"github.com/sigstore/rekor/pkg/log"
"github.com/sigstore/rekor/pkg/pubsub"
"github.com/sigstore/rekor/pkg/sharding"
"github.com/sigstore/rekor/pkg/trillianclient"
"github.com/sigstore/rekor/pkg/types"
Expand Down Expand Up @@ -290,7 +291,7 @@ func createLogEntry(params entries.CreateLogEntryParams) (models.LogEntry, middl
RootHash: swag.String(hex.EncodeToString(root.RootHash)),
LogIndex: swag.Int64(queuedLeaf.LeafIndex),
Hashes: hashes,
Checkpoint: stringPointer(string(scBytes)),
Checkpoint: swag.String(string(scBytes)),
}

logEntryAnon.Verification = &models.LogEntryAnonVerification{
Expand All @@ -301,6 +302,31 @@ func createLogEntry(params entries.CreateLogEntryParams) (models.LogEntry, middl
logEntry := models.LogEntry{
entryID: logEntryAnon,
}

if api.newEntryPublisher != nil {
// Publishing notifications should not block the API response.
go func() {
var subjects []string
verifier, err := entry.Verifier()
if err != nil {
log.ContextLogger(ctx).Warnf("Could not get verifier for log entry: %w", err)
} else {
subjects = verifier.Subjects()
}
event, err := pubsub.BuildNewEntryEvent(entryID, logEntryAnon, subjects)
if err != nil {
log.ContextLogger(ctx).Error(err)
return
}
err = api.newEntryPublisher.Publish(context.Background(), event)
if err != nil {
log.ContextLogger(ctx).Error(err)
return
}
log.ContextLogger(ctx).Debugf("Published new entry event: %+v", event)
}()
}

return logEntry, nil
}

Expand Down
17 changes: 17 additions & 0 deletions pkg/pubsub/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright 2023 The Sigstore Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package pubsub provides an interface and implementations for publishing
// notifications for Rekor updates to a Pub/Sub system.
package pubsub
111 changes: 111 additions & 0 deletions pkg/pubsub/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2023 The Sigstore Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pubsub

import (
"fmt"
"strings"

"github.com/sigstore/rekor/pkg/generated/models"
"github.com/sigstore/rekor/pkg/tle"
"golang.org/x/exp/slices"
"google.golang.org/protobuf/reflect/protoreflect"

pb "github.com/sigstore/protobuf-specs/gen/pb-go/rekor/events/v1"
tspb "google.golang.org/protobuf/types/known/timestamppb"
)

// Event is any protobuf message that has getters for the required fields for
// a CloudEvent.
type Event interface {
protoreflect.ProtoMessage

GetSpecVersion() string
GetId() string
GetType() string
GetSource() string
}

// EventType is the unique name of an event type.
type EventType string

const (
// NewEntryEvent is an event that is published when a new entry is added to
// Rekor's transparency log.
NewEntryEvent EventType = "dev.sigstore.rekor.events.v1.NewEntry"
)

// BuildNewEntryEvent builds a new NewEntry event proto message. It takes the
// unique entry ID, entry model, and slice of subjects that signed the entry
// as arguments.
func BuildNewEntryEvent(entryID string, entry models.LogEntryAnon, subjects []string) (*pb.NewEntry, error) {
if entryID == "" {
return nil, fmt.Errorf("entryID parameter must be set")
}
slices.Sort(subjects) // Must be sorted for consistency.

validated, err := tle.GenerateTransparencyLogEntry(entry)
if err != nil {
return nil, fmt.Errorf("validate entry: %w", err)
}
data, err := tle.MarshalTLEToJSON(validated)
if err != nil {
return nil, fmt.Errorf("marshal entry: %w", err)
}

return &pb.NewEntry{
Id: entryID,
Type: string(NewEntryEvent),
Data: string(data),
DataType: "application/json",
Source: "/createLogEntry",
EntryKind: validated.GetKindVersion().GetKind(),
Subjects: subjects,
Time: &tspb.Timestamp{Seconds: validated.IntegratedTime},
}, nil
}

// EventAttributes returns the attributes for a given event. The key names
// follow the CloudEvents specification. All Rekor-specific attributes have a
// prefix of "rekor_".
func EventAttributes(event Event) map[string]string {
// Standard CloudEvents attributes defined in the spec.
attrs := map[string]string{
"specversion": event.GetSpecVersion(),
"id": event.GetId(),
"source": event.GetSource(),
"type": event.GetType(),
}

// Locate any Rekor-specific attributes.
fields := event.ProtoReflect().Descriptor().Fields()
for i := 0; i < fields.Len(); i++ {
fd := fields.Get(i)
if !strings.HasPrefix(fd.JSONName(), "rekor_") {
continue
}
val := event.ProtoReflect().Get(fd).Interface()
switch x := val.(type) {
case string:
attrs[fd.JSONName()] = x
case []string:
attrs[fd.JSONName()] = strings.Join(x, ",")
default:
attrs[fd.JSONName()] = fmt.Sprintf("%v", x)
}
}

return attrs
}
Loading

0 comments on commit 9fd111e

Please sign in to comment.