Skip to content

Commit

Permalink
Refactor sync package and update dependencies and optimize code
Browse files Browse the repository at this point in the history
  • Loading branch information
diiyw committed May 20, 2024
1 parent 90fcf47 commit c5d4a20
Show file tree
Hide file tree
Showing 23 changed files with 418 additions and 102 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ Nodis is a Redis implementation using the Golang programming language. This impl
| | | RENAMEEX | DECRBY | SRANDMEMBER | HMGET | LSET | ZREM |
| | | PERSIST | SETNX | SINTERSTORE | HMSET | LRANGE | ZREMRANGEBYRANK |
| | | PTTL | INCRBYFLOAT | SUNIONSTORE | HCLEAR | LPOPRPUSH | ZREMRANGEBYSCORE |
| | | | APPEND | | HSCAN | RPOPLPUSH | ZCLEAR |
| | | UNLINK | APPEND | | HSCAN | RPOPLPUSH | ZCLEAR |
| | | | GETRANGE | | HVALS | BLPOP | ZEXISTS |
| | | | STRLEN | | HSTRLEN | BRPOP | ZUNIONSTORE |
| | | | SETRANGE | | | | ZINTERSTORE |
Expand Down Expand Up @@ -141,7 +141,7 @@ func main() {
opt.Filesystem = &fs.Memory{}
opt.Synchronizer = sync.NewWebsocket()
n := nodis.Open(opt)
n.Stick([]string{"*"}, func(op *pb.Operation) {
n.WatchKey([]string{"*"}, func(op *pb.Operation) {
fmt.Println("Subscribe: ", op.Key)
})
err := n.Subscribe("ws://127.0.0.1:6380")
Expand Down
123 changes: 123 additions & 0 deletions geo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package nodis

import (
"math"

"github.com/diiyw/nodis/ds/zset"
"github.com/diiyw/nodis/geo"
"github.com/mmcloughlin/geohash"
)

var (
earthRadius = 6372797.560856
dr = math.Pi / 180.0
)

func (n *Nodis) GeoAdd(key string, members ...*geo.Member) (int64, error) {
var v int64
_ = n.exec(func(tx *Tx) error {
meta := tx.writeKey(key, n.newZSet)
for _, member := range members {
v += meta.value.(*zset.SortedSet).ZAdd(member.Name, float64(member.Hash()))
}
return nil
})
return v, nil
}

func (n *Nodis) GeoAddXX(key string, members ...*geo.Member) (int64, error) {
var v int64
_ = n.exec(func(tx *Tx) error {
meta := tx.writeKey(key, n.newZSet)
for _, member := range members {
v += meta.value.(*zset.SortedSet).ZAddXX(member.Name, float64(member.Hash()))
}
return nil
})
return v, nil
}

func (n *Nodis) GeoAddNX(key string, members ...*geo.Member) (int64, error) {
var v int64
_ = n.exec(func(tx *Tx) error {
meta := tx.writeKey(key, n.newZSet)
for _, member := range members {
v += meta.value.(*zset.SortedSet).ZAddNX(member.Name, float64(member.Hash()))
}
return nil
})
return v, nil
}

func (n *Nodis) GeoDist(key string, member1, member2 string) (float64, error) {
var v float64
err := n.exec(func(tx *Tx) error {
meta := tx.readKey(key)
if meta == nil {
return nil
}
score1, err := meta.value.(*zset.SortedSet).ZScore(member1)
if err != nil {
return err
}
score2, err := meta.value.(*zset.SortedSet).ZScore(member2)
if err != nil {
return err
}
lat1, lng1 := geohash.DecodeInt(uint64(score1))
lat2, lng2 := geohash.DecodeInt(uint64(score2))
v = distance(float64(lat1), float64(lng1), float64(lat2), float64(lng2))
return nil
})
return v, err
}

// distance computes the distance between two given coordinates in meter
func distance(latitude1, longitude1, latitude2, longitude2 float64) float64 {
radLat1 := latitude1 * dr
radLat2 := latitude2 * dr
a := radLat1 - radLat2
b := longitude1*dr - longitude2*dr
return 2 * earthRadius * math.Asin(math.Sqrt(math.Pow(math.Sin(a/2), 2)+
math.Cos(radLat1)*math.Cos(radLat2)*math.Pow(math.Sin(b/2), 2)))
}

func (n *Nodis) GeoHash(key string, members ...string) ([]string, error) {
var v []string
err := n.exec(func(tx *Tx) error {
meta := tx.readKey(key)
if meta == nil {
return nil
}
for _, member := range members {
score, err := meta.value.(*zset.SortedSet).ZScore(member)
if err != nil {
return err
}
lat, lng := geohash.DecodeInt(uint64(score))
v = append(v, geohash.Encode(lat, lng))
}
return nil
})
return v, err
}

func (n *Nodis) GeoPos(key string, members ...string) ([]*geo.Member, error) {
var v []*geo.Member
err := n.exec(func(tx *Tx) error {
meta := tx.readKey(key)
if meta == nil {
return nil
}
for _, member := range members {
score, err := meta.value.(*zset.SortedSet).ZScore(member)
if err != nil {
return err
}
lat, lng := geohash.DecodeInt(uint64(score))
v = append(v, &geo.Member{Name: member, Latitude: lat, Longitude: lng})
}
return nil
})
return v, err
}
37 changes: 37 additions & 0 deletions geo/geo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package geo

import (
"fmt"
"strconv"

"github.com/mmcloughlin/geohash"
)

type Member struct {
Longitude float64
Latitude float64
Name string
}

func (m *Member) Hash() uint64 {
return geohash.EncodeInt(m.Latitude, m.Longitude)
}

func Parse(name, lat, lon string) (*Member, error) {
l, err := strconv.ParseFloat(lat, 64)
if err != nil {
return nil, err
}
n, err := strconv.ParseFloat(lon, 64)
if err != nil {
return nil, err
}
if l < -90 || l > 90 || n < -180 || n > 180 {
return nil, fmt.Errorf("ERR invalid longitude,latitude pair %s,%s", lat, lon)
}
return &Member{
Latitude: l,
Longitude: n,
Name: name,
}, nil
}
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,7 @@ require (
google.golang.org/protobuf v1.34.0
)

require golang.org/x/net v0.22.0 // indirect
require (
github.com/mmcloughlin/geohash v0.10.0 // indirect
golang.org/x/net v0.22.0 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
github.com/mmcloughlin/geohash v0.10.0 h1:9w1HchfDfdeLc+jFEf/04D27KP7E2QmpDu52wPbJWRE=
github.com/mmcloughlin/geohash v0.10.0/go.mod h1:oNZxQo5yWJh0eMQEP/8hwQuVx9Z9tjwFUqcTB1SmG0c=
github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI=
github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
Expand Down
26 changes: 19 additions & 7 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (

"github.com/diiyw/nodis/ds"
"github.com/diiyw/nodis/ds/zset"
"github.com/diiyw/nodis/internal/strings"
"github.com/diiyw/nodis/redis"
"github.com/diiyw/nodis/utils"
)

func execCommand(conn *redis.Conn, fn func()) {
Expand Down Expand Up @@ -65,6 +65,8 @@ func getCommand(name string) func(n *Nodis, conn *redis.Conn, cmd redis.Command)
return info
case "DEL":
return del
case "UNLINK":
return unlink
case "EXISTS":
return exists
case "EXPIRE":
Expand Down Expand Up @@ -273,7 +275,7 @@ func client(n *Nodis, conn *redis.Conn, cmd redis.Command) {
return
}
execCommand(conn, func() {
switch utils.ToUpper(cmd.Args[0]) {
switch strings.ToUpper(cmd.Args[0]) {
case "LIST":
conn.WriteString("id=1 addr=" + conn.Network.RemoteAddr().String() + " fd=5 name= age=0 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=client")
case "SETNAME":
Expand All @@ -295,7 +297,7 @@ func config(n *Nodis, conn *redis.Conn, cmd redis.Command) {
conn.WriteError("CONFIG GET requires at least one argument")
return
}
if utils.ToUpper(cmd.Args[1]) == "DATABASES" {
if strings.ToUpper(cmd.Args[1]) == "DATABASES" {
conn.WriteArray(2)
conn.WriteBulk("databases")
conn.WriteBulk("0")
Expand Down Expand Up @@ -455,6 +457,16 @@ func del(n *Nodis, conn *redis.Conn, cmd redis.Command) {
})
}

func unlink(n *Nodis, conn *redis.Conn, cmd redis.Command) {
if len(cmd.Args) == 0 {
conn.WriteError("UNLINK requires at least one argument")
return
}
execCommand(conn, func() {
conn.WriteInteger(n.Unlink(cmd.Args...))
})
}

func exists(n *Nodis, conn *redis.Conn, cmd redis.Command) {
if len(cmd.Args) == 0 {
conn.WriteError("EXISTS requires at least one argument")
Expand Down Expand Up @@ -669,7 +681,7 @@ func scan(n *Nodis, conn *redis.Conn, cmd redis.Command) {
}
}
if cmd.Options.TYPE > 0 {
typ = ds.StringToDataType(utils.ToUpper(cmd.Args[cmd.Options.TYPE]))
typ = ds.StringToDataType(strings.ToUpper(cmd.Args[cmd.Options.TYPE]))
}
execCommand(conn, func() {
nextCursor, keys := n.Scan(cursor, match, count, typ)
Expand Down Expand Up @@ -1760,7 +1772,7 @@ func lInsert(n *Nodis, conn *redis.Conn, cmd redis.Command) {
}
execCommand(conn, func() {
key := cmd.Args[0]
before := utils.ToUpper(cmd.Args[1]) == "BEFORE"
before := strings.ToUpper(cmd.Args[1]) == "BEFORE"
pivot := []byte(cmd.Args[2])
value := []byte(cmd.Args[3])
conn.WriteInteger(n.LInsert(key, pivot, value, before))
Expand Down Expand Up @@ -2505,7 +2517,7 @@ func zUnionStore(n *Nodis, conn *redis.Conn, cmd redis.Command) {
aggregate = cmd.Args[cmd.Options.AGGREGATE]
}
execCommand(conn, func() {
n.ZUnionStore(destination, keys, weights, utils.ToUpper(aggregate))
n.ZUnionStore(destination, keys, weights, strings.ToUpper(aggregate))
conn.WriteInteger(n.ZCard(destination))
})
}
Expand Down Expand Up @@ -2542,7 +2554,7 @@ func zInterStore(n *Nodis, conn *redis.Conn, cmd redis.Command) {
aggregate = cmd.Args[cmd.Options.AGGREGATE]
}
execCommand(conn, func() {
n.ZInterStore(destination, keys, weights, utils.ToUpper(aggregate))
n.ZInterStore(destination, keys, weights, strings.ToUpper(aggregate))
conn.WriteInteger(n.ZCard(destination))
})
}
Expand Down
30 changes: 22 additions & 8 deletions hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ func (n *Nodis) HSet(key string, field string, value []byte) int64 {
meta := tx.writeKey(key, n.newHash)
v = meta.value.(*hash.HashMap).HSet(field, value)
n.signalModifiedKey(key, meta)
n.notify(pb.NewOp(pb.OpType_HSet, key).Fields(field).Value(value))
n.notify(func() []*pb.Op {
return []*pb.Op{pb.NewOp(pb.OpType_HSet, key).Fields(field).Value(value)}
})
return nil
})
return v
Expand Down Expand Up @@ -48,7 +50,9 @@ func (n *Nodis) HDel(key string, fields ...string) int64 {
tx.delKey(key)
}
n.signalModifiedKey(key, meta)
n.notify(pb.NewOp(pb.OpType_HDel, key).Fields(fields...))
n.notify(func() []*pb.Op {
return []*pb.Op{pb.NewOp(pb.OpType_HDel, key).Fields(fields...)}
})
return nil
})
return v
Expand Down Expand Up @@ -114,7 +118,9 @@ func (n *Nodis) HIncrBy(key string, field string, value int64) (int64, error) {
meta := tx.writeKey(key, n.newHash)
v, err = meta.value.(*hash.HashMap).HIncrBy(field, value)
n.signalModifiedKey(key, meta)
n.notify(pb.NewOp(pb.OpType_HIncrBy, key).Fields(field).IncrInt(value))
n.notify(func() []*pb.Op {
return []*pb.Op{pb.NewOp(pb.OpType_HIncrBy, key).Fields(field).IncrInt(value)}
})
return nil
})
return v, err
Expand All @@ -127,7 +133,9 @@ func (n *Nodis) HIncrByFloat(key string, field string, value float64) (float64,
meta := tx.writeKey(key, n.newHash)
v, err = meta.value.(*hash.HashMap).HIncrByFloat(field, value)
n.signalModifiedKey(key, meta)
n.notify(pb.NewOp(pb.OpType_HIncrByFloat, key).Fields(field).IncrFloat(value))
n.notify(func() []*pb.Op {
return []*pb.Op{pb.NewOp(pb.OpType_HIncrByFloat, key).Fields(field).IncrFloat(value)}
})
return err
})
return v, err
Expand All @@ -145,7 +153,9 @@ func (n *Nodis) HSetNX(key string, field string, value []byte) int64 {
}
v = meta.value.(*hash.HashMap).HSet(field, value)
n.signalModifiedKey(key, meta)
n.notify(pb.NewOp(pb.OpType_HSet, key).Fields(field).Value(value))
n.notify(func() []*pb.Op {
return []*pb.Op{pb.NewOp(pb.OpType_HSet, key).Fields(field).Value(value)}
})
return nil
})
return v
Expand All @@ -155,14 +165,18 @@ func (n *Nodis) HMSet(key string, fields map[string][]byte) int64 {
var v int64
_ = n.exec(func(tx *Tx) error {
meta := tx.writeKey(key, n.newHash)
var ops = make([]*pb.Op, 0, len(fields))
var v int64 = 0
for field, value := range fields {
v += meta.value.(*hash.HashMap).HSet(field, value)
ops = append(ops, pb.NewOp(pb.OpType_HSet, key).Fields(field).Value(value))
}
n.signalModifiedKey(key, meta)
n.notify(ops...)
n.notify(func() []*pb.Op {
var ops = make([]*pb.Op, 0, len(fields))
for field, value := range fields {
ops = append(ops, pb.NewOp(pb.OpType_HSet, key).Fields(field).Value(value))
}
return ops
})
meta.value.(*hash.HashMap).HMSet(fields)
return nil
})
Expand Down
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion utils/utils.go → internal/strings/strings.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package utils
package strings

import "unsafe"

Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Loading

0 comments on commit c5d4a20

Please sign in to comment.