Skip to content

Commit

Permalink
Support PutRecords (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
patrickhayes-at authored Jan 21, 2025
1 parent a546176 commit 6196ce6
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 2 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ There is no persistence for Kinesis data.
| ListTagsForStream | ✅ Supported | |
| MergeShards | ❌ Unsupported | No support for merging/splitting yet. |
| PutRecord | ✅ Supported | |
| PutRecords | ❌ Unsupported | Use PutRecord for single records instead. |
| PutRecords | ✅ Supported | |
| RegisterStreamConsumer | ✅ Supported | |
| RemoveTagsFromStream | ✅ Supported | |
| SplitShard | ❌ Unsupported | No support for merging/splitting yet. |
Expand Down
1 change: 1 addition & 0 deletions services/kinesis/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func (k *Kinesis) RegisterHTTPHandlers(logger *slog.Logger, methodRegistry http.
http.Register(logger, methodRegistry, service, "ListShards", k.ListShards)
http.Register(logger, methodRegistry, service, "ListTagsForStream", k.ListTagsForStream)
http.Register(logger, methodRegistry, service, "PutRecord", k.PutRecord)
http.Register(logger, methodRegistry, service, "PutRecords", k.PutRecords)
http.Register(logger, methodRegistry, service, "RegisterStreamConsumer", k.RegisterStreamConsumer)
http.Register(logger, methodRegistry, service, "RemoveTagsFromStream", k.RemoveTagsFromStream)
http.RegisterOutputStream(logger, methodRegistry, service, "SubscribeToShard", k.SubscribeToShard)
Expand Down
21 changes: 21 additions & 0 deletions services/kinesis/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,27 @@ func (k *Kinesis) PutRecord(input PutRecordInput) (*PutRecordOutput, *awserrors.
panic("Could not find shard for record?")
}

// https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html
func (k *Kinesis) PutRecords(input PutRecordsInput) (*PutRecordsOutput, *awserrors.Error) {
putRecordsOutput := &PutRecordsOutput{}
for _, record := range input.Records {
putRecordOutput, err := k.PutRecord(PutRecordInput{
StreamName: input.StreamName,
Data: record.Data,
PartitionKey: record.PartitionKey,
ExplicitHashKey: record.ExplicitHashKey,
})
if err != nil {
return nil, err
}
putRecordsOutput.Records = append(putRecordsOutput.Records, PutRecordsOutputRecord{
SequenceNumber: putRecordOutput.SequenceNumber,
ShardId: putRecordOutput.ShardId,
})
}
return putRecordsOutput, nil
}

func (k *Kinesis) lockedGetShard(streamName, shardId string) (*Shard, *awserrors.Error) {
stream, ok := k.streams[streamName]
if !ok {
Expand Down
15 changes: 14 additions & 1 deletion services/kinesis/kinesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func TestGetShardIterator(t *testing.T) {
t.Fatal(err)
}

for i := 0; i < 5; i++ {
for i := 0; i < 2; i++ {
_, err := k.PutRecord(PutRecordInput{
StreamName: streamName,
PartitionKey: "key",
Expand All @@ -170,6 +170,19 @@ func TestGetShardIterator(t *testing.T) {
t.Fatal(err)
}
}
putRecordsInput := PutRecordsInput{
StreamName: streamName,
}
for i := 2; i < 5; i++ {
putRecordsInput.Records = append(putRecordsInput.Records, PutRecordsRequestEntry{

Check failure on line 177 in services/kinesis/kinesis_test.go

View workflow job for this annotation

GitHub Actions / build

undefined: PutRecordsRequestEntry
PartitionKey: "key",
Data: strconv.Itoa(i),
})
}
_, err := k.PutRecords(putRecordsInput)

Check failure on line 182 in services/kinesis/kinesis_test.go

View workflow job for this annotation

GitHub Actions / build

no new variables on left side of :=
if err != nil {
t.Fatal(err)
}

shardsOutput, err := k.ListShards(ListShardsInput{
StreamName: streamName,
Expand Down
21 changes: 21 additions & 0 deletions services/kinesis/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,27 @@ type PutRecordOutput struct {
SequenceNumber string
}

type PutRecordsInputRecord struct {
PartitionKey string
Data string
ExplicitHashKey string
}

type PutRecordsInput struct {
StreamName string
StreamARN string
Records []PutRecordsInputRecord
}

type PutRecordsOutputRecord struct {
ShardId string
SequenceNumber string
}

type PutRecordsOutput struct {
Records []PutRecordsOutputRecord
}

type GetShardIteratorInput struct {
ShardId string
ShardIteratorType string
Expand Down

0 comments on commit 6196ce6

Please sign in to comment.