Skip to content

Commit 9c31f7f

Browse files
authored
Redis: Block forever (#130)
Before this commit redis only blocked for an hour. After an hour the xread command returned with empty data. This was unexpected for the calling functions. Now, redis blocks forever. On Shutdown the redis-connection gets killed.
1 parent c5fd275 commit 9c31f7f

File tree

4 files changed

+7
-11
lines changed

4 files changed

+7
-11
lines changed

internal/redis/conn.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,16 @@ func (s *Pool) TestConn() error {
3838
}
3939

4040
// XREAD reads new messages from one stream.
41-
func (s *Pool) XREAD(count, block, stream, id string) (interface{}, error) {
41+
func (s *Pool) XREAD(count, stream, id string) (interface{}, error) {
4242
conn := s.pool.Get()
4343
defer conn.Close()
44-
return conn.Do("XREAD", "COUNT", count, "BLOCK", block, "STREAMS", stream, id)
44+
return conn.Do("XREAD", "COUNT", count, "BLOCK", "0", "STREAMS", stream, id)
4545
}
4646

4747
// BlockingConn implements the redis.Conn interface but does nothing.
4848
type BlockingConn struct{}
4949

5050
// XREAD blocks forever.
51-
func (BlockingConn) XREAD(count, block, stream, id string) (interface{}, error) {
51+
func (BlockingConn) XREAD(count, stream, id string) (interface{}, error) {
5252
select {}
5353
}

internal/redis/interfaces.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@ package redis
22

33
// Connection is the raw connection to a redis server.
44
type Connection interface {
5-
XREAD(count, block, stream, lastID string) (interface{}, error)
5+
XREAD(count, stream, lastID string) (interface{}, error)
66
}

internal/redis/mock_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ var testData = map[string]string{
4949
]`,
5050
}
5151

52-
func (c mockConn) XREAD(count, block, stream, lastID string) (interface{}, error) {
52+
func (c mockConn) XREAD(count, stream, lastID string) (interface{}, error) {
5353
if c.err != nil {
5454
return nil, c.err
5555
}

internal/redis/redis.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,6 @@ const (
1313
// maxMessages desides how many messages are read at once from the stream.
1414
maxMessages = "10"
1515

16-
// blockTimeout is the time in miliseconds, how long the xread command will
17-
// block.
18-
blockTimeout = "3600000" // One Hour
19-
2016
// fieldChangedTopic is the redis key name of the autoupdate stream.
2117
fieldChangedTopic = "ModifiedFields"
2218

@@ -43,7 +39,7 @@ func (s *Service) Update(closing <-chan struct{}) (map[string]json.RawMessage, e
4339

4440
var data map[string]json.RawMessage
4541
err := closingFunc(closing, func() error {
46-
newID, d, err := autoupdateStream(s.Conn.XREAD(maxMessages, blockTimeout, fieldChangedTopic, id))
42+
newID, d, err := autoupdateStream(s.Conn.XREAD(maxMessages, fieldChangedTopic, id))
4743
if err != nil {
4844
return err
4945
}
@@ -75,7 +71,7 @@ func (s *Service) LogoutEvent(closing <-chan struct{}) ([]string, error) {
7571

7672
var sessionIDs []string
7773
err := closingFunc(closing, func() error {
78-
newID, sIDs, err := logoutStream(s.Conn.XREAD(maxMessages, blockTimeout, logoutTopic, id))
74+
newID, sIDs, err := logoutStream(s.Conn.XREAD(maxMessages, logoutTopic, id))
7975
if err != nil {
8076
return err
8177
}

0 commit comments

Comments
 (0)