Skip to content

Commit

Permalink
feat(base): create first version of producer and consumer (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
Robin Joseph authored Jul 13, 2019
1 parent 7f50a6b commit 4f482af
Show file tree
Hide file tree
Showing 24 changed files with 1,735 additions and 1 deletion.
30 changes: 30 additions & 0 deletions .chglog/CHANGELOG.tpl.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{{ range .Versions }}
<a name="{{ .Tag.Name }}"></a>
## {{ if .Tag.Previous }}[{{ .Tag.Name }}]({{ $.Info.RepositoryURL }}/compare/{{ .Tag.Previous.Name }}...{{ .Tag.Name }}){{ else }}{{ .Tag.Name }}{{ end }} ({{ datetime "2006-01-02" .Tag.Date }})

{{ range .CommitGroups -}}
### {{ .Title }}

{{ range .Commits -}}
* {{ if .Scope }}**{{ .Scope }}:** {{ end }}{{ .Subject }}
{{ end }}
{{ end -}}

{{- if .RevertCommits -}}
### Reverts

{{ range .RevertCommits -}}
* {{ .Revert.Header }}
{{ end }}
{{ end -}}

{{- if .NoteGroups -}}
{{ range .NoteGroups -}}
### {{ .Title }}

{{ range .Notes }}
{{ .Body }}
{{ end }}
{{ end -}}
{{ end -}}
{{ end -}}
22 changes: 22 additions & 0 deletions .chglog/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
style: github
template: CHANGELOG.tpl.md
info:
title: CHANGELOG
repository_url: https://github.com/robinjoseph08/go-pg-migrations
options:
commit_groups:
title_maps:
docs: Documentation
feat: Features
fix: Bug Fixes
perf: Performance Improvements
refactor: Code Refactoring
header:
pattern: "^(\\w*)(?:\\(([\\w\\$\\.\\-\\*\\s]*)\\))?\\:\\s(.*)$"
pattern_maps:
- Type
- Scope
- Subject
notes:
keywords:
- BREAKING CHANGE
11 changes: 11 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[*]
charset = utf-8
end_of_line = lf
insert_final_newline = true
trim_trailing_whitespace = true

[Makefile]
indent_style = tab

[*.go]
indent_style = tab
67 changes: 67 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Created by https://www.gitignore.io/api/go,vim,macos

### Go ###
# Binaries for programs and plugins
*.exe
*.dll
*.so
*.dylib

# Test binary, build with `go test -c`
*.test

# Output of the go coverage tool, specifically when used with LiteIDE
coverage.out
coverage.html

# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736
.glide/

# Vendored dependencies
vendor/*
_vendor*/*

# Binary executables
bin/

### macOS ###
*.DS_Store
.AppleDouble
.LSOverride

# Icon must end with two \r
Icon

# Thumbnails
._*

# Files that might appear in the root of a volume
.DocumentRevisions-V100
.fseventsd
.Spotlight-V100
.TemporaryItems
.Trashes
.VolumeIcon.icns
.com.apple.timemachine.donotpresent

# Directories potentially created on remote AFP share
.AppleDB
.AppleDesktop
Network Trash Folder
Temporary Items
.apdisk

### Vim ###
# swap
.sw[a-p]
.*.sw[a-p]
# session
Session.vim
# temporary
.netrwhist
*~
# auto-generated tag files
tags


# End of https://www.gitignore.io/api/go,vim,macos
1 change: 1 addition & 0 deletions .go-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1.12.7
33 changes: 33 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
output:
format: tab
linters:
disable-all: true
enable:
- deadcode
- depguard
- dupl
- goconst
- gocritic
- gocyclo
- gofmt
- goimports
- golint
- gosec
- govet
- ineffassign
- maligned
- misspell
- prealloc
- scopelint
- structcheck
- typecheck
- unconvert
- varcheck
issues:
exclude-use-default: false
max-per-linter: 0
max-same-issues: 0
exclude-rules:
- path: _test\.go
linters:
- dupl
18 changes: 18 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
dist: trusty
sudo: false
language: go
addons:
apt:
packages:
- redis-server
go: "1.12.7"
env:
- GO111MODULE=on
install:
- make setup
- make install
script:
- make lint
- make test
- make enforce
- make coveralls
21 changes: 21 additions & 0 deletions LICENSE.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2019 Robin Joseph

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
70 changes: 70 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
BIN_DIR ?= ./bin
DIRS ?= $(shell find . -name '*.go' | grep --invert-match 'vendor' | xargs -n 1 dirname | sort --unique)
GO_TOOLS := \
github.com/git-chglog/git-chglog/cmd/git-chglog \
github.com/mattn/goveralls \

TFLAGS ?=

COVERAGE_PROFILE ?= coverage.out
HTML_OUTPUT ?= coverage.html

PSQL := $(shell command -v psql 2> /dev/null)

TEST_DATABASE_USER ?= go_pg_migrations_user
TEST_DATABASE_NAME ?= go_pg_migrations

default: install

.PHONY: clean
clean:
@echo "---> Cleaning"
go clean

coveralls:
@echo "---> Sending coverage info to Coveralls"
goveralls -coverprofile=$(COVERAGE_PROFILE) -service=travis-ci

.PHONY: enforce
enforce:
@echo "---> Enforcing coverage"
./scripts/coverage.sh $(COVERAGE_PROFILE)

.PHONY: html
html:
@echo "---> Generating HTML coverage report"
go tool cover -html $(COVERAGE_PROFILE) -o $(HTML_OUTPUT)
open $(HTML_OUTPUT)

.PHONY: install
install:
@echo "---> Installing dependencies"
go mod download

.PHONY: lint
lint:
@echo "---> Linting"
$(BIN_DIR)/golangci-lint run

.PHONY: release
release:
@echo "---> Creating new release"
ifndef tag
$(error tag must be specified)
endif
git-chglog --output CHANGELOG.md --next-tag $(tag)
git add CHANGELOG.md
git commit -m $(tag)
git tag $(tag)
git push origin master --tags

.PHONY: setup
setup: install
@echo "--> Setting up"
curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(BIN_DIR) v1.16.0
go get $(GO_TOOLS) && GOBIN=$$(realpath $(BIN_DIR)) go install $(GO_TOOLS)

.PHONY: test
test:
@echo "---> Testing"
go test ./... -coverprofile $(COVERAGE_PROFILE) $(TFLAGS)
114 changes: 113 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,114 @@
# redisqueue
A Golang queue worker using Redis streams

[![GoDoc](https://godoc.org/github.com/robinjoseph08/redisqueue?status.svg)](https://godoc.org/github.com/robinjoseph08/redisqueue)
[![Build Status](https://travis-ci.org/robinjoseph08/redisqueue.svg?branch=master)](https://travis-ci.org/robinjoseph08/redisqueue)
[![Coverage Status](https://coveralls.io/repos/github/robinjoseph08/redisqueue/badge.svg?branch=master)](https://coveralls.io/github/robinjoseph08/redisqueue?branch=master)
[![Go Report Card](https://goreportcard.com/badge/github.com/robinjoseph08/redisqueue)](https://goreportcard.com/report/github.com/robinjoseph08/redisqueue)
![License](https://img.shields.io/github/license/robinjoseph08/redisqueue.svg)

`redisqueue` provides a producer and consumer of a queue that uses [Redis
streams](https://redis.io/topics/streams-intro).

## Features

- A `Producer` struct to make enqueuing messages easy.
- A `Consumer` struct to make processing messages concurrenly.
- Claiming and acknowledging messages if there's no error, so that if a consumer
dies while processing, the message it was working on isn't lost. This
guarantees at least once delivery.
- A "visibility timeout" so that if a message isn't processed in a designated
time frame, it will be be processed by another consumer.
- A max length on the stream so that it doesn't store the messages indefinitely
and run out of memory.
- Graceful handling of Unix signals (`SIGINT` and `SIGTERM`) to let in-flight
messages complete.
- A channel that will surface any errors so you can handle them centrally.
- Graceful handling of panics to avoid crashing the whole process.
- A concurrency setting to control how many goroutines are spawned to process
messages.
- A batch size setting to limit the total messages in flight.
- Support for multiple streams.

## Example

Here's an example of a producer that inserts 1000 messages into a queue:

```go
package main

import (
"fmt"

"github.com/robinjoseph08/redisqueue"
)

func main() {
p, err := redisqueue.NewProducerWithOptions(&redisqueue.ProducerOptions{
StreamMaxLength: 10000,
ApproximateMaxLength: true,
})
if err != nil {
panic(err)
}

for i := 0; i < 1000; i++ {
err := p.Enqueue(&redisqueue.Message{
Stream: "redisqueue:test",
Values: map[string]interface{}{
"index": i,
},
})
if err != nil {
panic(err)
}

if i%100 == 0 {
fmt.Printf("enqueued %d\n", i)
}
}
}
```

And here's an example of a consumer that reads the messages off of that queue:

```go
package main

import (
"fmt"
"time"

"github.com/robinjoseph08/redisqueue"
)

func main() {
c, err := redisqueue.NewConsumerWithOptions(&redisqueue.ConsumerOptions{
VisibilityTimeout: 60 * time.Second,
BlockingTimeout: 5 * time.Second,
ReclaimInterval: 1 * time.Second,
BufferSize: 100,
Concurrency: 10,
})
if err != nil {
panic(err)
}

c.Register("redisqueue:test", process)

go func() {
for err := range c.Errors {
// handle errors accordingly
fmt.Printf("err: %+v\n", err)
}
}()

fmt.Println("starting")
c.Run()
fmt.Println("stopped")
}

func process(msg *redisqueue.Message) error {
fmt.Printf("processing message: %v\n", msg.Values["index"])
return nil
}
```
Loading

0 comments on commit 4f482af

Please sign in to comment.