diff --git a/canal/canal.go b/canal/canal.go index 005c67050..9f2c7877f 100644 --- a/canal/canal.go +++ b/canal/canal.go @@ -18,6 +18,7 @@ import ( "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-mysql-org/go-mysql/replication" "github.com/go-mysql-org/go-mysql/schema" + "github.com/go-mysql-org/go-mysql/utils" "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/parser" "github.com/siddontang/go-log/log" @@ -231,7 +232,7 @@ func (c *Canal) run() error { c.cancel() }() - c.master.UpdateTimestamp(uint32(time.Now().Unix())) + c.master.UpdateTimestamp(uint32(utils.Now().Unix())) if !c.dumped { c.dumped = true @@ -373,7 +374,7 @@ func (c *Canal) GetTable(db string, table string) (*schema.Table, error) { // if DiscardNoMetaRowEvent is true, we just log this error if c.cfg.DiscardNoMetaRowEvent { c.tableLock.Lock() - c.errorTablesGetTime[key] = time.Now() + c.errorTablesGetTime[key] = utils.Now() c.tableLock.Unlock() // log error and return ErrMissingTableMeta c.cfg.Logger.Errorf("canal get table meta err: %v", errors.Trace(err)) diff --git a/canal/config.go b/canal/config.go index 16692287d..ebc397654 100644 --- a/canal/config.go +++ b/canal/config.go @@ -14,6 +14,7 @@ import ( "github.com/go-mysql-org/go-mysql/client" "github.com/go-mysql-org/go-mysql/mysql" + "github.com/go-mysql-org/go-mysql/utils" ) type DumpConfig struct { @@ -142,7 +143,7 @@ func NewDefaultConfig() *Config { c.User = mysql.DEFAULT_USER c.Password = mysql.DEFAULT_PASSWORD c.Charset = mysql.DEFAULT_CHARSET - c.ServerID = uint32(rand.New(rand.NewSource(time.Now().Unix())).Intn(1000)) + 1001 + c.ServerID = uint32(rand.New(rand.NewSource(utils.Now().Unix())).Intn(1000)) + 1001 c.Flavor = mysql.DEFAULT_FLAVOR c.Dump.ExecutionPath = mysql.DEFAULT_DUMP_EXECUTION_PATH diff --git a/canal/dump.go b/canal/dump.go index 3a6a9e9db..91b303976 100644 --- a/canal/dump.go +++ b/canal/dump.go @@ -9,6 +9,7 @@ import ( "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-mysql-org/go-mysql/schema" + "github.com/go-mysql-org/go-mysql/utils" "github.com/pingcap/errors" "github.com/shopspring/decimal" ) @@ -142,7 +143,7 @@ func (c *Canal) dump() error { return errors.New("mysqldump does not exist") } - c.master.UpdateTimestamp(uint32(time.Now().Unix())) + c.master.UpdateTimestamp(uint32(utils.Now().Unix())) h := &dumpParseHandler{c: c} // If users call StartFromGTID with empty position to start dumping with gtid, @@ -167,7 +168,7 @@ func (c *Canal) dump() error { h.pos = uint64(pos.Pos) } - start := time.Now() + start := utils.Now() c.cfg.Logger.Info("try dump MySQL and parse") if err := c.dumper.DumpAndParse(h); err != nil { return errors.Trace(err) diff --git a/canal/sync.go b/canal/sync.go index e9a62e76e..9635bc61a 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -7,6 +7,7 @@ import ( "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-mysql-org/go-mysql/replication" "github.com/go-mysql-org/go-mysql/schema" + "github.com/go-mysql-org/go-mysql/utils" "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/parser/ast" ) @@ -253,7 +254,7 @@ func (c *Canal) updateTable(header *replication.EventHeader, db, table string) ( } func (c *Canal) updateReplicationDelay(ev *replication.BinlogEvent) { var newDelay uint32 - now := uint32(time.Now().Unix()) + now := uint32(utils.Now().Unix()) if now >= ev.Header.Timestamp { newDelay = now - ev.Header.Timestamp } diff --git a/client/pool.go b/client/pool.go index 6e5d6dc21..a3c306f7e 100644 --- a/client/pool.go +++ b/client/pool.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/go-mysql-org/go-mysql/utils" "github.com/pingcap/errors" ) @@ -247,7 +248,7 @@ func (pool *Pool) putConnection(connection Connection) { } func (pool *Pool) nowTs() Timestamp { - return Timestamp(time.Now().Unix()) + return Timestamp(utils.Now().Unix()) } func (pool *Pool) getConnection(ctx context.Context) (Connection, error) { @@ -553,7 +554,7 @@ func (pool *Pool) startNewConnections(count int) { } func (pool *Pool) ping(conn *Conn) error { - deadline := time.Now().Add(100 * time.Millisecond) + deadline := utils.Now().Add(100 * time.Millisecond) _ = conn.SetDeadline(deadline) err := conn.Ping() if err != nil { diff --git a/packet/conn.go b/packet/conn.go index d6a8f36b5..1d49848f3 100644 --- a/packet/conn.go +++ b/packet/conn.go @@ -150,7 +150,7 @@ func (c *Conn) ReadPacketReuseMem(dst []byte) ([]byte, error) { // newCompressedPacketReader creates a new compressed packet reader. func (c *Conn) newCompressedPacketReader() (io.Reader, error) { if c.readTimeout != 0 { - if err := c.SetReadDeadline(time.Now().Add(c.readTimeout)); err != nil { + if err := c.SetReadDeadline(utils.Now().Add(c.readTimeout)); err != nil { return nil, err } } @@ -200,7 +200,7 @@ func (c *Conn) copyN(dst io.Writer, n int64) (int64, error) { // Call ReadAtLeast with the currentPacketReader as it may change on every iteration // of this loop. if c.readTimeout != 0 { - if err := c.SetReadDeadline(time.Now().Add(c.readTimeout)); err != nil { + if err := c.SetReadDeadline(utils.Now().Add(c.readTimeout)); err != nil { return written, err } } @@ -344,7 +344,7 @@ func (c *Conn) WritePacket(data []byte) error { func (c *Conn) writeWithTimeout(b []byte) (n int, err error) { if c.writeTimeout != 0 { - if err := c.SetWriteDeadline(time.Now().Add(c.writeTimeout)); err != nil { + if err := c.SetWriteDeadline(utils.Now().Add(c.writeTimeout)); err != nil { return n, err } } diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index d15441dce..ccbfb9f64 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -19,6 +19,7 @@ import ( "github.com/go-mysql-org/go-mysql/client" . "github.com/go-mysql-org/go-mysql/mysql" + "github.com/go-mysql-org/go-mysql/utils" ) var ( @@ -227,7 +228,7 @@ func (b *BinlogSyncer) close() { b.cancel() if b.c != nil { - err := b.c.SetReadDeadline(time.Now().Add(100 * time.Millisecond)) + err := b.c.SetReadDeadline(utils.Now().Add(100 * time.Millisecond)) if err != nil { b.cfg.Logger.Warnf(`could not set read deadline: %s`, err) } @@ -288,7 +289,7 @@ func (b *BinlogSyncer) registerSlave() error { //set read timeout if b.cfg.ReadTimeout > 0 { - _ = b.c.SetReadDeadline(time.Now().Add(b.cfg.ReadTimeout)) + _ = b.c.SetReadDeadline(utils.Now().Add(b.cfg.ReadTimeout)) } if b.cfg.RecvBufferSize > 0 { @@ -791,7 +792,7 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { //set read timeout if b.cfg.ReadTimeout > 0 { - _ = b.c.SetReadDeadline(time.Now().Add(b.cfg.ReadTimeout)) + _ = b.c.SetReadDeadline(utils.Now().Add(b.cfg.ReadTimeout)) } // Reset retry count on successful packet receieve diff --git a/replication/replication_test.go b/replication/replication_test.go index 3a64df744..d8bdb3b5e 100644 --- a/replication/replication_test.go +++ b/replication/replication_test.go @@ -17,6 +17,7 @@ import ( "github.com/go-mysql-org/go-mysql/client" "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-mysql-org/go-mysql/test_util" + "github.com/go-mysql-org/go-mysql/utils" ) var testOutputLogs = flag.Bool("out", false, "output binlog event") @@ -335,7 +336,7 @@ func (t *testSyncerSuite) testPositionSync() { // Test re-sync. time.Sleep(100 * time.Millisecond) - _ = t.b.c.SetReadDeadline(time.Now().Add(time.Millisecond)) + _ = t.b.c.SetReadDeadline(utils.Now().Add(time.Millisecond)) time.Sleep(100 * time.Millisecond) t.testSync(s) diff --git a/server/caching_sha2_cache_test.go b/server/caching_sha2_cache_test.go index 247e1870f..5b99efc84 100644 --- a/server/caching_sha2_cache_test.go +++ b/server/caching_sha2_cache_test.go @@ -18,6 +18,7 @@ import ( "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-mysql-org/go-mysql/test_util" "github.com/go-mysql-org/go-mysql/test_util/test_keys" + "github.com/go-mysql-org/go-mysql/utils" ) var delay = 50 @@ -131,13 +132,13 @@ func (s *cacheTestSuite) runSelect() { func (s *cacheTestSuite) TestCache() { // first connection - t1 := time.Now() + t1 := utils.Now() var err error s.db, err = sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/%s?tls=%s", *testUser, *testPassword, s.serverAddr, *testDB, s.tlsPara)) require.NoError(s.T(), err) s.db.SetMaxIdleConns(4) s.runSelect() - t2 := time.Now() + t2 := utils.Now() d1 := int(t2.Sub(t1).Nanoseconds() / 1e6) //log.Debugf("first connection took %d milliseconds", d1) @@ -149,12 +150,12 @@ func (s *cacheTestSuite) TestCache() { } // second connection - t3 := time.Now() + t3 := utils.Now() s.db, err = sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/%s?tls=%s", *testUser, *testPassword, s.serverAddr, *testDB, s.tlsPara)) require.NoError(s.T(), err) s.db.SetMaxIdleConns(4) s.runSelect() - t4 := time.Now() + t4 := utils.Now() d2 := int(t4.Sub(t3).Nanoseconds() / 1e6) //log.Debugf("second connection took %d milliseconds", d2) diff --git a/server/ssl.go b/server/ssl.go index 42b31756c..c08b7d9fd 100644 --- a/server/ssl.go +++ b/server/ssl.go @@ -8,7 +8,8 @@ import ( "crypto/x509/pkix" "encoding/pem" "math/big" - "time" + + "github.com/go-mysql-org/go-mysql/utils" ) // NewServerTLSConfig: generate TLS config for server side @@ -75,8 +76,8 @@ func generateAndSignRSACerts(caPem, caKey []byte) ([]byte, []byte) { StreetAddress: []string{"ADDRESS"}, PostalCode: []string{"POSTAL_CODE"}, }, - NotBefore: time.Now(), - NotAfter: time.Now().AddDate(10, 0, 0), + NotBefore: utils.Now(), + NotAfter: utils.Now().AddDate(10, 0, 0), SubjectKeyId: []byte{1, 2, 3, 4, 6}, ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth}, KeyUsage: x509.KeyUsageDigitalSignature, @@ -112,8 +113,8 @@ func generateCA() ([]byte, []byte) { StreetAddress: []string{"ADDRESS"}, PostalCode: []string{"POSTAL_CODE"}, }, - NotBefore: time.Now(), - NotAfter: time.Now().AddDate(10, 0, 0), + NotBefore: utils.Now(), + NotAfter: utils.Now().AddDate(10, 0, 0), IsCA: true, ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth}, KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign | x509.KeyUsageKeyEncipherment, diff --git a/utils/now.go b/utils/now.go new file mode 100644 index 000000000..f7a64ba8c --- /dev/null +++ b/utils/now.go @@ -0,0 +1,7 @@ +//go:build !unix + +package utils + +import "time" + +var Now = time.Now diff --git a/utils/now_unix.go b/utils/now_unix.go new file mode 100644 index 000000000..c2e6f5dc4 --- /dev/null +++ b/utils/now_unix.go @@ -0,0 +1,19 @@ +//go:build unix + +package utils + +import ( + "syscall" + "time" +) + +// Now is a faster method to get current time +func Now() time.Time { + var tv syscall.Timeval + if err := syscall.Gettimeofday(&tv); err != nil { + // If it failed at syscall, use time package instead + return time.Now() + } + + return time.Unix(0, syscall.TimevalToNsec(tv)) +} diff --git a/utils/now_unix_test.go b/utils/now_unix_test.go new file mode 100644 index 000000000..bb89f894c --- /dev/null +++ b/utils/now_unix_test.go @@ -0,0 +1,41 @@ +//go:build unix + +package utils + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestCustomTimeNow(t *testing.T) { + precision := time.Millisecond + + for i := 0; i < 1000; i++ { + timestamp := time.Now() + customTimestamp := Now() + + // two timestamp should within 1 percistion + assert.WithinDuration(t, timestamp, customTimestamp, precision, fmt.Sprintf("Loop: %d: customTimestamp should within %s. timestamp: %d, customTimestamp: %d", i, precision.String(), timestamp.UnixNano(), customTimestamp.UnixNano())) + + time.Sleep(time.Nanosecond) + } +} + +func BenchmarkGoTimeNow(t *testing.B) { + t.ResetTimer() + for n := 0; n < t.N; n++ { + _ = time.Now() + } + t.StopTimer() +} + +func BenchmarkCustomTimeNow(t *testing.B) { + t.ResetTimer() + for n := 0; n < t.N; n++ { + _ = Now() + } + t.StopTimer() +}