Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,4 @@ Dmitry Kropachev <[email protected]>
Oliver Boyle <[email protected]>
Jackson Fleming <[email protected]>
Sylwia Szunejko <[email protected]>
Evan Jones <[email protected]>
16 changes: 8 additions & 8 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1318,7 +1318,7 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter {
// frame checks that it is not 0
params.serialConsistency = qry.serialCons
params.defaultTimestamp = qry.defaultTimestamp
params.defaultTimestampValue = qry.defaultTimestampValue
params.defaultTimestampValueMicros = qry.defaultTimestampValueMicros

if len(qry.pageState) > 0 {
params.pagingState = qry.pageState
Expand Down Expand Up @@ -1525,13 +1525,13 @@ func (c *Conn) executeBatch(ctx context.Context, batch *Batch) *Iter {

n := len(batch.Entries)
req := &writeBatchFrame{
typ: batch.Type,
statements: make([]batchStatment, n),
consistency: batch.Cons,
serialConsistency: batch.serialCons,
defaultTimestamp: batch.defaultTimestamp,
defaultTimestampValue: batch.defaultTimestampValue,
customPayload: batch.CustomPayload,
typ: batch.Type,
statements: make([]batchStatment, n),
consistency: batch.Cons,
serialConsistency: batch.serialCons,
defaultTimestamp: batch.defaultTimestamp,
defaultTimestampValueMicros: batch.defaultTimestampValueMicros,
customPayload: batch.CustomPayload,
}

stmts := make(map[string]string, len(batch.Entries))
Expand Down
22 changes: 11 additions & 11 deletions frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -1434,8 +1434,8 @@ type queryParams struct {
pagingState []byte
serialConsistency SerialConsistency
// v3+
defaultTimestamp bool
defaultTimestampValue int64
defaultTimestamp bool
defaultTimestampValueMicros int64
// v5+
keyspace string
}
Expand Down Expand Up @@ -1527,10 +1527,10 @@ func (f *framer) writeQueryParams(opts *queryParams) {
if f.proto > protoVersion2 && opts.defaultTimestamp {
// timestamp in microseconds
var ts int64
if opts.defaultTimestampValue != 0 {
ts = opts.defaultTimestampValue
if opts.defaultTimestampValueMicros != 0 {
ts = opts.defaultTimestampValueMicros
} else {
ts = time.Now().UnixNano() / 1000
ts = time.Now().UnixMicro()
}
f.writeLong(ts)
}
Expand Down Expand Up @@ -1633,9 +1633,9 @@ type writeBatchFrame struct {
consistency Consistency

// v3+
serialConsistency SerialConsistency
defaultTimestamp bool
defaultTimestampValue int64
serialConsistency SerialConsistency
defaultTimestamp bool
defaultTimestampValueMicros int64

//v4+
customPayload map[string][]byte
Expand Down Expand Up @@ -1710,10 +1710,10 @@ func (f *framer) writeBatchFrame(streamID int, w *writeBatchFrame, customPayload

if w.defaultTimestamp {
var ts int64
if w.defaultTimestampValue != 0 {
ts = w.defaultTimestampValue
if w.defaultTimestampValueMicros != 0 {
ts = w.defaultTimestampValueMicros
} else {
ts = time.Now().UnixNano() / 1000
ts = time.Now().UnixMicro()
}
f.writeLong(ts)
}
Expand Down
122 changes: 62 additions & 60 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -881,29 +881,29 @@ func (qm *queryMetrics) attempt(addAttempts int, addLatency time.Duration,

// Query represents a CQL statement that can be executed.
type Query struct {
stmt string
values []interface{}
cons Consistency
pageSize int
routingKey []byte
pageState []byte
prefetch float64
trace Tracer
observer QueryObserver
session *Session
conn *Conn
rt RetryPolicy
spec SpeculativeExecutionPolicy
binding func(q *QueryInfo) ([]interface{}, error)
serialCons SerialConsistency
defaultTimestamp bool
defaultTimestampValue int64
disableSkipMetadata bool
context context.Context
idempotent bool
customPayload map[string][]byte
metrics *queryMetrics
refCount uint32
stmt string
values []interface{}
cons Consistency
pageSize int
routingKey []byte
pageState []byte
prefetch float64
trace Tracer
observer QueryObserver
session *Session
conn *Conn
rt RetryPolicy
spec SpeculativeExecutionPolicy
binding func(q *QueryInfo) ([]interface{}, error)
serialCons SerialConsistency
defaultTimestamp bool
defaultTimestampValueMicros int64
disableSkipMetadata bool
context context.Context
idempotent bool
customPayload map[string][]byte
metrics *queryMetrics
refCount uint32

disableAutoPage bool

Expand Down Expand Up @@ -1035,26 +1035,27 @@ func (q *Query) PageSize(n int) *Query {
return q
}

// DefaultTimestamp will enable the with default timestamp flag on the query.
// If enable, this will replace the server side assigned
// timestamp as default timestamp. Note that a timestamp in the query itself
// will still override this timestamp. This is entirely optional.
// DefaultTimestamp enables the with default timestamp flag on the query.
// If enabled, gocql will assign a timestamp that will be used instead of the
// server side assigned timestamp. Note that a timestamp in the query itself
// will still override this timestamp. The default value of this setting comes
// from ClusterConfig.DefaultTimestamp.
//
// Only available on protocol >= 3
func (q *Query) DefaultTimestamp(enable bool) *Query {
q.defaultTimestamp = enable
return q
}

// WithTimestamp will enable the with default timestamp flag on the query
// like DefaultTimestamp does. But also allows to define value for timestamp.
// It works the same way as USING TIMESTAMP in the query itself, but
// should not break prepared query optimization.
// WithTimestamp enables the with default timestamp flag on the query
// like DefaultTimestamp, and sets the value. It works the same way as
// USING TIMESTAMP in the query itself, but should not break prepared query
// optimization. The timestamp is in Unix microseconds.
//
// Only available on protocol >= 3
func (q *Query) WithTimestamp(timestamp int64) *Query {
func (q *Query) WithTimestamp(timestampMicros int64) *Query {
q.DefaultTimestamp(true)
q.defaultTimestampValue = timestamp
q.defaultTimestampValueMicros = timestampMicros
return q
}

Expand Down Expand Up @@ -1705,23 +1706,23 @@ func (n *nextIter) fetch() *Iter {
}

type Batch struct {
Type BatchType
Entries []BatchEntry
Cons Consistency
routingKey []byte
CustomPayload map[string][]byte
rt RetryPolicy
spec SpeculativeExecutionPolicy
trace Tracer
observer BatchObserver
session *Session
serialCons SerialConsistency
defaultTimestamp bool
defaultTimestampValue int64
context context.Context
cancelBatch func()
keyspace string
metrics *queryMetrics
Type BatchType
Entries []BatchEntry
Cons Consistency
routingKey []byte
CustomPayload map[string][]byte
rt RetryPolicy
spec SpeculativeExecutionPolicy
trace Tracer
observer BatchObserver
session *Session
serialCons SerialConsistency
defaultTimestamp bool
defaultTimestampValueMicros int64
context context.Context
cancelBatch func()
keyspace string
metrics *queryMetrics

// routingInfo is a pointer because Query can be copied and copyable struct can't hold a mutex.
routingInfo *queryRoutingInfo
Expand Down Expand Up @@ -1899,26 +1900,27 @@ func (b *Batch) SerialConsistency(cons SerialConsistency) *Batch {
return b
}

// DefaultTimestamp will enable the with default timestamp flag on the query.
// If enable, this will replace the server side assigned
// timestamp as default timestamp. Note that a timestamp in the query itself
// will still override this timestamp. This is entirely optional.
// DefaultTimestamp enables the with default timestamp flag on the query.
// If enabled, gocql will assign a timestamp that will be used instead of the
// server side assigned timestamp. Note that a timestamp in the query itself
// will still override this timestamp. The default value of this setting comes
// from ClusterConfig.DefaultTimestamp.
//
// Only available on protocol >= 3
func (b *Batch) DefaultTimestamp(enable bool) *Batch {
b.defaultTimestamp = enable
return b
}

// WithTimestamp will enable the with default timestamp flag on the query
// like DefaultTimestamp does. But also allows to define value for timestamp.
// It works the same way as USING TIMESTAMP in the query itself, but
// should not break prepared query optimization.
// WithTimestamp enables the with default timestamp flag on the query
// like DefaultTimestamp, and sets the value. It works the same way as
// USING TIMESTAMP in the query itself, but should not break prepared query
// optimization. The timestamp is in Unix microseconds.
//
// Only available on protocol >= 3
func (b *Batch) WithTimestamp(timestamp int64) *Batch {
func (b *Batch) WithTimestamp(timestampMicros int64) *Batch {
b.DefaultTimestamp(true)
b.defaultTimestampValue = timestamp
b.defaultTimestampValueMicros = timestampMicros
return b
}

Expand Down