Skip to content

Commit 40bd23d

Browse files
committed
fix consistency level on codecs
1 parent 84b23d8 commit 40bd23d

File tree

4 files changed

+19
-57
lines changed

4 files changed

+19
-57
lines changed

proxy.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424

2525
func main() {
2626
ctx, cancel := signalContext(context.Background(), os.Interrupt, os.Kill)
27+
2728
defer cancel()
2829

2930
os.Exit(proxy.Run(ctx, os.Args[1:]))

proxy/codecs.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func (c *partialQueryCodec) Decode(source io.Reader, _ primitive.ProtocolVersion
4242
if query, err := primitive.ReadLongString(source); err != nil {
4343
return nil, err
4444
} else {
45-
return &partialQuery{query, primitive.ConsistencyLevelAny}, nil
45+
return &partialQuery{query, primitive.ConsistencyLevelLocalQuorum}, nil
4646
}
4747
}
4848

@@ -105,7 +105,7 @@ func (c *partialExecuteCodec) EncodedLength(_ message.Message, _ primitive.Proto
105105
}
106106

107107
func (c *partialExecuteCodec) Decode(source io.Reader, _ primitive.ProtocolVersion) (msg message.Message, err error) {
108-
execute := &partialExecute{}
108+
execute := &partialExecute{Consistency: primitive.ConsistencyLevelLocalQuorum}
109109
if execute.queryId, err = primitive.ReadShortBytes(source); err != nil {
110110
return nil, fmt.Errorf("cannot read EXECUTE query id: %w", err)
111111
} else if len(execute.queryId) == 0 {
@@ -184,7 +184,7 @@ func (p partialBatchCodec) Decode(source io.Reader, version primitive.ProtocolVe
184184
}
185185
queryOrIds[i] = queryOrId
186186
}
187-
return &partialBatch{queryOrIds, primitive.ConsistencyLevelAny}, nil
187+
return &partialBatch{queryOrIds, primitive.ConsistencyLevelLocalQuorum}, nil
188188
}
189189

190190
func (p partialBatchCodec) GetOpCode() primitive.OpCode {

proxy/proxy_test.go

Lines changed: 0 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -407,47 +407,6 @@ func setupProxyTestWithConfig(ctx context.Context, numNodes int, cfg *proxyTestC
407407
return tester, proxyAddr, nil
408408
}
409409

410-
func TestProxy_Consistency(t *testing.T) {
411-
ctx, cancel := context.WithCancel(context.Background())
412-
tester, proxyContactPoint, err := setupProxyTest(ctx, 3, proxycore.MockRequestHandlers{
413-
primitive.OpCodeQuery: func(cl *proxycore.MockClient, frm *frame.Frame) message.Message {
414-
qry := frm.Body.Message.(*message.Query)
415-
if qry.Query == "SELECT * FROM test.test" {
416-
// A read timeout is not retried because of 0 replicas received
417-
return &message.ReadTimeout{
418-
ErrorMessage: "this is a mock read timeout error",
419-
Consistency: getConsistencyLevel(frm.Body.Message),
420-
Received: 0,
421-
BlockFor: 1,
422-
DataPresent: false,
423-
}
424-
}
425-
return cl.InterceptQuery(frm.Header, frm.Body.Message.(*message.Query))
426-
},
427-
})
428-
defer func() {
429-
cancel()
430-
tester.shutdown()
431-
}()
432-
require.NoError(t, err)
433-
434-
cl, err := proxycore.ConnectClient(ctx, proxycore.NewEndpoint(proxyContactPoint), proxycore.ClientConnConfig{})
435-
require.NoError(t, err)
436-
437-
version, err := cl.Handshake(ctx, primitive.ProtocolVersion5, nil)
438-
require.NoError(t, err)
439-
assert.Equal(t, primitive.ProtocolVersion4, version) // Expected to be negotiated to v4
440-
441-
_, err = cl.Query(ctx, primitive.ProtocolVersion4, &message.Query{
442-
Query: "SELECT * FROM test.test",
443-
Options: &message.QueryOptions{
444-
Consistency: primitive.ConsistencyLevelOne,
445-
},
446-
})
447-
448-
print(err)
449-
}
450-
451410
func connectTestClient(t *testing.T, ctx context.Context, proxyContactPoint string) *proxycore.ClientConn {
452411
cl, err := proxycore.ConnectClient(ctx, proxycore.NewEndpoint(proxyContactPoint), proxycore.ClientConnConfig{})
453412
require.NoError(t, err)
@@ -459,19 +418,6 @@ func connectTestClient(t *testing.T, ctx context.Context, proxyContactPoint stri
459418
return cl
460419
}
461420

462-
func getConsistencyLevel(message message.Message) primitive.ConsistencyLevel {
463-
switch m := message.(type) {
464-
case *partialQuery:
465-
return m.Consistency
466-
case *partialBatch:
467-
return m.Consistency
468-
case *partialExecute:
469-
return m.Consistency
470-
default:
471-
return primitive.ConsistencyLevelOne
472-
}
473-
}
474-
475421
func waitUntil(d time.Duration, check func() bool) bool {
476422
iterations := int(d / (100 * time.Millisecond))
477423
for i := 0; i < iterations; i++ {

proxy/request.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ func (r *request) executeInternal(next bool) {
6868
r.done = true
6969
r.send(&message.ServerError{ErrorMessage: "Proxy exhausted query plan and there are no more hosts available to try"})
7070
} else {
71+
r.client.proxy.logger.Info("sending request to host", zap.Stringer("host", r.host),
72+
zap.String("consistency", r.getConsistencyLevel(r.msg).String()))
7173
err := r.session.Send(r.host, r)
7274
if err == nil {
7375
break
@@ -78,6 +80,19 @@ func (r *request) executeInternal(next bool) {
7880
}
7981
}
8082

83+
func (r *request) getConsistencyLevel(message message.Message) primitive.ConsistencyLevel {
84+
switch m := message.(type) {
85+
case *partialQuery:
86+
return m.Consistency
87+
case *partialBatch:
88+
return m.Consistency
89+
case *partialExecute:
90+
return m.Consistency
91+
default:
92+
return primitive.ConsistencyLevelOne
93+
}
94+
}
95+
8196
func (r *request) send(msg message.Message) {
8297
_ = r.client.conn.Write(proxycore.SenderFunc(func(writer io.Writer) error {
8398
return codec.EncodeFrame(frame.NewFrame(r.raw.Header.Version, r.stream, msg), writer)

0 commit comments

Comments
 (0)