Skip to content

Commit afe5aee

Browse files
author
chaoyuepan
committed
add kafkamock
1 parent 23fc745 commit afe5aee

File tree

7 files changed

+256
-20
lines changed

7 files changed

+256
-20
lines changed

README.md

+5-1
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,8 @@ Experimental packages not in std and golang.org/exp
6060

6161
- internal data structure in go std libs
6262
- `PoolDequeue`: a lock-free, fixed-size single-producer, multi-consumer queue
63-
- `PoolChain`: a lock-free, dynamically-sized single-producer, multi-consumer queue
63+
- `PoolChain`: a lock-free, dynamically-sized single-producer, multi-consumer queue
64+
65+
- mock
66+
- `sqlmock`: a simple sql mock
67+
- `kafkamock`: kafka mock

go.mod

+3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ require (
99
github.com/hashicorp/go-multierror v1.1.1
1010
github.com/mailru/easyjson v0.7.7
1111
github.com/mattn/go-sqlite3 v1.14.22
12+
github.com/segmentio/kafka-go v0.4.47
1213
github.com/smallnest/gid v1.2.0
1314
github.com/stretchr/testify v1.8.4
1415
golang.org/x/arch v0.10.0
@@ -19,6 +20,8 @@ require (
1920
require (
2021
github.com/davecgh/go-spew v1.1.1 // indirect
2122
github.com/hashicorp/errwrap v1.0.0 // indirect
23+
github.com/klauspost/compress v1.15.9 // indirect
24+
github.com/pierrec/lz4/v4 v4.1.15 // indirect
2225
github.com/pmezard/go-difflib v1.0.0 // indirect
2326
gopkg.in/yaml.v3 v3.0.1 // indirect
2427
)

go.sum

+56
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtL
2020
github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
2121
github.com/jsimonetti/rtnetlink/v2 v2.0.1 h1:xda7qaHDSVOsADNouv7ukSuicKZO7GgVUCXxpaIEIlM=
2222
github.com/jsimonetti/rtnetlink/v2 v2.0.1/go.mod h1:7MoNYNbb3UaDHtF8udiJo/RH6VsTKP1pqKLUTVCvToE=
23+
github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
24+
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
2325
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
2426
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
2527
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
@@ -33,27 +35,81 @@ github.com/mdlayher/netlink v1.7.2 h1:/UtM3ofJap7Vl4QWCPDGXY8d3GIY2UGSDbK+QWmY8/
3335
github.com/mdlayher/netlink v1.7.2/go.mod h1:xraEF7uJbxLhc5fpHL4cPe221LI2bdttWlU+ZGLfQSw=
3436
github.com/mdlayher/socket v0.4.1 h1:eM9y2/jlbs1M615oshPQOHZzj6R6wMT7bX5NPiQvn2U=
3537
github.com/mdlayher/socket v0.4.1/go.mod h1:cAqeGjoufqdxWkD7DkpyS+wcefOtmu5OQ8KuoJGIReA=
38+
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
39+
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
3640
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
3741
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
3842
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
3943
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
44+
github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0=
45+
github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
4046
github.com/smallnest/gid v1.2.0 h1:/s/SJU8q2JRb/Pwq8smRS1iYzmmYOaaXeTnjpNejo9A=
4147
github.com/smallnest/gid v1.2.0/go.mod h1:xp2lHqE0ny+OFl9pFhBba1e1Uo7qVs776cnv33XCjts=
4248
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
49+
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
4350
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
51+
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
52+
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
4453
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
4554
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
55+
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
56+
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
57+
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
58+
github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
59+
github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
60+
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
61+
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
4662
golang.org/x/arch v0.10.0 h1:S3huipmSclq3PJMNe76NGwkBR504WFkQ5dhzWzP8ZW8=
4763
golang.org/x/arch v0.10.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
64+
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
65+
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
66+
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
4867
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 h1:k/i9J1pBpvlfR+9QsetwPyERsqu1GIbi967PQMq3Ivc=
4968
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
69+
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
70+
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
71+
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
72+
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
73+
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
74+
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
75+
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
76+
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
5077
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
5178
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
79+
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
80+
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
5281
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
5382
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
83+
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
84+
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
85+
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
86+
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
87+
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
88+
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
89+
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
90+
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
5491
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
5592
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
93+
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
94+
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
95+
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
96+
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
97+
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
98+
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
99+
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
100+
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
101+
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
102+
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
103+
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
104+
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
105+
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
106+
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
107+
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
108+
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
109+
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
110+
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
56111
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
57112
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
113+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
58114
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
59115
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

kafkamock/kafkamock.go

+128
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package kafkamock
2+
3+
import (
4+
"context"
5+
"errors"
6+
"sync"
7+
8+
"github.com/segmentio/kafka-go"
9+
)
10+
11+
// MockReader simulates the kafka.Reader interface
12+
type MockReader struct {
13+
messages chan *kafka.Message
14+
mu sync.Mutex
15+
closed bool
16+
}
17+
18+
// NewMockReader creates a new MockReader
19+
func NewMockReader(messages chan *kafka.Message) *MockReader {
20+
return &MockReader{
21+
messages: messages,
22+
}
23+
}
24+
25+
// ReadMessage simulates reading a message
26+
func (m *MockReader) ReadMessage(ctx context.Context) (kafka.Message, error) {
27+
m.mu.Lock()
28+
defer m.mu.Unlock()
29+
30+
if m.closed {
31+
return kafka.Message{}, errors.New("reader is closed")
32+
}
33+
34+
if len(m.messages) == 0 {
35+
return kafka.Message{}, errors.New("no more messages")
36+
}
37+
38+
msg := <-m.messages
39+
40+
return *msg, nil
41+
}
42+
43+
// Close closes the MockReader
44+
func (m *MockReader) Close() error {
45+
m.mu.Lock()
46+
defer m.mu.Unlock()
47+
m.closed = true
48+
return nil
49+
}
50+
51+
// MockWriter simulates the kafka.Writer interface
52+
type MockWriter struct {
53+
messages chan *kafka.Message
54+
mu sync.Mutex
55+
closed bool
56+
}
57+
58+
// NewMockWriter creates a new MockWriter
59+
func NewMockWriter(messages chan *kafka.Message) *MockWriter {
60+
return &MockWriter{
61+
messages: messages,
62+
}
63+
}
64+
65+
// WriteMessages simulates writing messages
66+
func (m *MockWriter) WriteMessages(ctx context.Context, msgs ...kafka.Message) error {
67+
m.mu.Lock()
68+
defer m.mu.Unlock()
69+
70+
if m.closed {
71+
return errors.New("writer is closed")
72+
}
73+
74+
for i := range msgs {
75+
m.messages <- &msgs[i]
76+
}
77+
return nil
78+
}
79+
80+
// Close closes the MockWriter
81+
func (m *MockWriter) Close() error {
82+
m.mu.Lock()
83+
defer m.mu.Unlock()
84+
m.closed = true
85+
return nil
86+
}
87+
88+
// MockKafka contains MockReader and MockWriter
89+
type MockKafka struct {
90+
*MockReader
91+
*MockWriter
92+
messages chan *kafka.Message
93+
}
94+
95+
// NewMockKafka creates a new MockKafka
96+
func NewMockKafka(size int) *MockKafka {
97+
messages := make(chan *kafka.Message, size)
98+
return &MockKafka{
99+
MockReader: NewMockReader(messages),
100+
MockWriter: NewMockWriter(messages),
101+
messages: messages,
102+
}
103+
}
104+
105+
// Close closes the MockKafka
106+
func (m *MockKafka) Close() error {
107+
m.MockReader.Close()
108+
m.MockWriter.Close()
109+
close(m.messages)
110+
return nil
111+
}
112+
113+
// GetMessages gets the message channel of MockKafka
114+
func (m *MockKafka) GetMessages() chan *kafka.Message {
115+
return m.messages
116+
}
117+
118+
// MockKafkaInterface defines an interface to easily replace the real Kafka client
119+
type MockKafkaInterface interface {
120+
ReadMessage(ctx context.Context) (kafka.Message, error)
121+
WriteMessages(ctx context.Context, msgs ...kafka.Message) error
122+
Close() error
123+
}
124+
125+
// Ensure MockReader and MockWriter implement MockKafkaInterface
126+
var (
127+
_ MockKafkaInterface = &MockKafka{}
128+
)

kafkamock/kafkamock_test.go

+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package kafkamock
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/segmentio/kafka-go"
8+
"github.com/stretchr/testify/assert"
9+
)
10+
11+
func TestMockKafka(t *testing.T) {
12+
mockKafka := NewMockKafka(10)
13+
14+
// Test writing messages
15+
msgsToWrite := []kafka.Message{
16+
{Value: []byte("message1")},
17+
{Value: []byte("message2")},
18+
}
19+
20+
err := mockKafka.WriteMessages(context.Background(), msgsToWrite...)
21+
assert.NoError(t, err, "expected no error when writing messages")
22+
23+
// Test reading messages
24+
for _, expectedMsg := range msgsToWrite {
25+
msg, err := mockKafka.ReadMessage(context.Background())
26+
assert.NoError(t, err, "expected no error when reading messages")
27+
assert.Equal(t, expectedMsg.Value, msg.Value, "expected message values to be equal")
28+
}
29+
30+
// Test reading from empty queue
31+
_, err = mockKafka.ReadMessage(context.Background())
32+
assert.Error(t, err, "expected error when reading from empty queue")
33+
34+
// Test closing MockKafka
35+
err = mockKafka.Close()
36+
assert.NoError(t, err, "expected no error when closing MockKafka")
37+
38+
// Test writing after closing
39+
err = mockKafka.WriteMessages(context.Background(), kafka.Message{Value: []byte("message3")})
40+
assert.Error(t, err, "expected error when writing after closing")
41+
42+
// Test reading after closing
43+
_, err = mockKafka.ReadMessage(context.Background())
44+
assert.Error(t, err, "expected error when reading after closing")
45+
}

0 commit comments

Comments
 (0)