Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ type ClusterConfig struct {
// Default: 2
NumConns int

// Maximum number of inflight requests allowed per connection.
// Default: 32768 for CQL v3 and newer
// Default: 128 for older CQL versions
MaxRequestsPerConn int

// Default consistency level.
// Default: Quorum
Consistency Consistency
Expand Down Expand Up @@ -282,6 +287,7 @@ func NewCluster(hosts ...string) *ClusterConfig {
ConnectTimeout: 11 * time.Second,
Port: 9042,
NumConns: 2,
MaxRequestsPerConn: 0,
Consistency: Quorum,
MaxPreparedStmts: defaultMaxPreparedStmts,
MaxRoutingKeyInfo: 1000,
Expand Down
1 change: 1 addition & 0 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestNewCluster_Defaults(t *testing.T) {
assertEqual(t, "cluster config timeout", 11*time.Second, cfg.Timeout)
assertEqual(t, "cluster config port", 9042, cfg.Port)
assertEqual(t, "cluster config num-conns", 2, cfg.NumConns)
assertEqual(t, "cluster config max requests per conn", 0, cfg.MaxRequestsPerConn)
assertEqual(t, "cluster config consistency", Quorum, cfg.Consistency)
assertEqual(t, "cluster config max prepared statements", defaultMaxPreparedStmts, cfg.MaxPreparedStmts)
assertEqual(t, "cluster config max routing key info", 1000, cfg.MaxRoutingKeyInfo)
Expand Down
2 changes: 1 addition & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func (s *Session) dialWithoutObserver(ctx context.Context, host *HostInfo, cfg *
errorHandler: errorHandler,
compressor: cfg.Compressor,
session: s,
streams: streams.New(cfg.ProtoVersion),
streams: streams.NewStreamIDGenerator(s.cfg.ProtoVersion, s.cfg.MaxRequestsPerConn),
host: host,
isSchemaV2: true, // Try using "system.peers_v2" until proven otherwise
frameObserver: s.frameObserver,
Expand Down
14 changes: 12 additions & 2 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,7 @@ func TestQueryTimeoutClose(t *testing.T) {
func TestStream0(t *testing.T) {
// TODO: replace this with type check
const expErr = "gocql: received unexpected frame on stream 0"
const maxRequestsPerConn = 13

var buf bytes.Buffer
f := newFramer(nil, protoVersion4)
Expand All @@ -706,13 +707,22 @@ func TestStream0(t *testing.T) {
t.Fatal(err)
}

srv := NewTestServer(t, defaultProto, context.Background())
defer srv.Stop()
cluster := testCluster(defaultProto, srv.Address)
s, err := cluster.CreateSession()
s.cfg.MaxRequestsPerConn = maxRequestsPerConn
if err != nil {
t.Fatalf("NewCluster: %v", err)
}

conn := &Conn{
r: bufio.NewReader(&buf),
streams: streams.New(protoVersion4),
streams: streams.NewStreamIDGenerator(defaultProto, s.cfg.MaxRequestsPerConn),
logger: &defaultLogger{},
}

err := conn.recv(context.Background())
err = conn.recv(context.Background())
if err == nil {
t.Fatal("expected to get an error on stream 0")
} else if !strings.HasPrefix(err.Error(), expErr) {
Expand Down
18 changes: 10 additions & 8 deletions connectionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,10 @@ func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) {
type policyConnPool struct {
session *Session

port int
numConns int
keyspace string
port int
numConns int
maxRequestsPerConn int
keyspace string

mu sync.RWMutex
hostConnPools map[string]*hostConnPool
Expand Down Expand Up @@ -161,11 +162,12 @@ func connConfig(cfg *ClusterConfig) (*ConnConfig, error) {
func newPolicyConnPool(session *Session) *policyConnPool {
// create the pool
pool := &policyConnPool{
session: session,
port: session.cfg.Port,
numConns: session.cfg.NumConns,
keyspace: session.cfg.Keyspace,
hostConnPools: map[string]*hostConnPool{},
session: session,
port: session.cfg.Port,
numConns: session.cfg.NumConns,
maxRequestsPerConn: session.cfg.MaxRequestsPerConn,
keyspace: session.cfg.Keyspace,
hostConnPools: map[string]*hostConnPool{},
}

return pool
Expand Down
14 changes: 13 additions & 1 deletion internal/streams/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,29 @@ type IDGenerator struct {
offset uint32
}

func NewStreamIDGenerator(protocol, maxRequestsPerConn int) *IDGenerator {
if maxRequestsPerConn > 0 {
return NewLimited(maxRequestsPerConn)
}
return New(protocol)
}

func New(protocol int) *IDGenerator {
maxStreams := 128
if protocol > 2 {
maxStreams = 32768
}
return NewLimited(maxStreams)
}

func NewLimited(maxStreams int) *IDGenerator {
// Round up maxStreams to a nearest multiple of 64
maxStreams = ((maxStreams + 63) / 64) * 64

buckets := maxStreams / 64
// reserve stream 0
streams := make([]uint64, buckets)
streams[0] = 1 << 63

return &IDGenerator{
NumStreams: maxStreams,
streams: streams,
Expand Down