diff --git a/docs/src/content/docs/commands/COPY.md b/docs/src/content/docs/commands/COPY.md index 7a25d9b4d..3831f2341 100644 --- a/docs/src/content/docs/commands/COPY.md +++ b/docs/src/content/docs/commands/COPY.md @@ -12,7 +12,8 @@ COPY [DB destination-db] [REPLACE] ``` ## Parameters -|Parameter | Description | Type | Required| + +|Parameter | Description | Type | Required| |----------|-------------|------|:---------:| | source |The key of the value you want to copy. This key must exist. | String | Yes| | destination | The key where the value will be copied to. This key must not exist unless the `REPLACE` option is specified. | String | Yes | @@ -21,10 +22,11 @@ COPY [DB destination-db] [REPLACE] ## Return Value + | Condition | Return Value | |-----------| :-------------:| | key was copied successfully | `1` | -| key was not copied | `0` | +| key was not copied | `0` | ## Behaviour @@ -41,10 +43,10 @@ When the `COPY` command is executed, DiceDB will: The `COPY` command can raise the following errors: -1. `Wrong type of value or key`: +1. `Wrong type of value or key`: - Error message `(error) WRONGTYPE Operation against a key holding the wrong kind of value`: - This error occurs if the source key holds a value that is not compatible with the `COPY` operation. -2. `Not existent key`: +2. `Not existent key`: - Error message: `(error) ERR no such key` - This error occurs if the source key does not exist. - This error occurs if the destination key already exists and the `REPLACE` option is not specified. @@ -78,6 +80,7 @@ Copy the value from `key1` to `key2`, replacing `key2` if it already exists. ``` ### Destination key already exist + ```bash 127.0.0.1:7379> COPY key1 key2 (integer) 1 @@ -85,6 +88,7 @@ Copy the value from `key1` to `key2`, replacing `key2` if it already exists. (integer) 0 ``` + ## Notes - The `COPY` command is available starting from DiceDB version 6.2. @@ -92,4 +96,3 @@ Copy the value from `key1` to `key2`, replacing `key2` if it already exists. - The `COPY` command does not modify the source key; it only duplicates its value to the destination key. - The `DB destinationDB` is not yet supported with the current versions of DiceDB. It acts as a placeholder for now. It will still return `0` By understanding the `COPY` command and its parameters, you can effectively duplicate keys within your DiceDB databases, ensuring data consistency and reducing the need for manual data manipulation. - diff --git a/integration_tests/commands/async/copy_test.go b/integration_tests/commands/resp/copy_test.go similarity index 99% rename from integration_tests/commands/async/copy_test.go rename to integration_tests/commands/resp/copy_test.go index 3fad1db47..c1fd243b1 100644 --- a/integration_tests/commands/async/copy_test.go +++ b/integration_tests/commands/resp/copy_test.go @@ -1,4 +1,4 @@ -package async +package resp import ( "testing" diff --git a/integration_tests/commands/async/mget_test.go b/integration_tests/commands/resp/mget_test.go similarity index 99% rename from integration_tests/commands/async/mget_test.go rename to integration_tests/commands/resp/mget_test.go index 715c372ee..2df676e08 100644 --- a/integration_tests/commands/async/mget_test.go +++ b/integration_tests/commands/resp/mget_test.go @@ -1,4 +1,4 @@ -package async +package resp import ( "testing" diff --git a/integration_tests/commands/async/mset_test.go b/integration_tests/commands/resp/mset_test.go similarity index 99% rename from integration_tests/commands/async/mset_test.go rename to integration_tests/commands/resp/mset_test.go index d0d8ba647..0e82a179a 100644 --- a/integration_tests/commands/async/mset_test.go +++ b/integration_tests/commands/resp/mset_test.go @@ -1,4 +1,4 @@ -package async +package resp import ( "testing" diff --git a/internal/cmd/cmds.go b/internal/cmd/cmds.go index 82270029e..60ddb88bf 100644 --- a/internal/cmd/cmds.go +++ b/internal/cmd/cmds.go @@ -5,11 +5,36 @@ import ( "strings" "github.com/dgryski/go-farm" + "github.com/dicedb/dice/internal/object" ) +// DiceDBCmd represents a command structure to be executed +// within a DiceDB system. This struct emulates the way DiceDB commands +// are structured, including the command itself, additional arguments, +// and an optional object to store or manipulate. type DiceDBCmd struct { - Cmd string + // Cmd represents the command to execute (e.g., "SET", "GET", "DEL"). + // This is the main command keyword that specifies the action to perform + // in DiceDB. For example: + // - "SET": To store a value. + // - "GET": To retrieve a value. + // - "DEL": To delete a value. + // - "EXPIRE": To set a time-to-live for a key. + Cmd string + + // Args holds any additional parameters required by the command. + // For example: + // - If Cmd is "SET", Args might contain ["key", "value"]. + // - If Cmd is "EXPIRE", Args might contain ["key", "seconds"]. + // This slice allows flexible support for commands with variable arguments. Args []string + + // Obj is a pointer to an ExtendedObj, representing an optional data structure + // associated with the command. This contains pointer to the underlying simple + // types such as int, string or even complex types + // like hashes, sets, or sorted sets, which are stored and manipulated as objects. + // WARN: This parameter should be used with caution + Obj *object.ExtendedObj } type RedisCmds struct { diff --git a/internal/errors/migrated_errors.go b/internal/errors/migrated_errors.go index d9a8dc864..e513bfb9a 100644 --- a/internal/errors/migrated_errors.go +++ b/internal/errors/migrated_errors.go @@ -71,3 +71,11 @@ var ( return fmt.Errorf("ERR wrong type of path value - expected %s but found %s", expectedType, actualType) // Signals an unexpected type received when an integer was expected. } ) + +type PreProcessError struct { + Result interface{} +} + +func (e *PreProcessError) Error() string { + return fmt.Sprintf("%v", e.Result) +} diff --git a/internal/eval/commands.go b/internal/eval/commands.go index 2765d7313..5dc43e280 100644 --- a/internal/eval/commands.go +++ b/internal/eval/commands.go @@ -3,6 +3,7 @@ package eval import ( "strings" + "github.com/dicedb/dice/internal/cmd" dstore "github.com/dicedb/dice/internal/store" ) @@ -29,6 +30,8 @@ type DiceCmdMeta struct { // will utilize this function for evaluation, allowing for better handling of // complex command execution scenarios and improved response consistency. NewEval func([]string, *dstore.Store) *EvalResponse + + StoreObjectEval func(*cmd.DiceDBCmd, *dstore.Store) *EvalResponse } type KeySpecs struct { @@ -38,8 +41,11 @@ type KeySpecs struct { } var ( - DiceCmds = map[string]DiceCmdMeta{} + PreProcessing = map[string]func([]string, *dstore.Store) *EvalResponse{} + DiceCmds = map[string]DiceCmdMeta{} +) +var ( echoCmdMeta = DiceCmdMeta{ Name: "ECHO", Info: `ECHO returns the string given as argument.`, @@ -603,12 +609,24 @@ var ( Info: "PERSIST removes the expiration from a key", Eval: evalPersist, } + + //TODO: supports only http protocol, needs to be removed once http is migrated to multishard copyCmdMeta = DiceCmdMeta{ Name: "COPY", Info: `COPY command copies the value stored at the source key to the destination key.`, Eval: evalCOPY, Arity: -2, } + + //TODO: supports only http protocol, needs to be removed once http is migrated to multishard + objectCopyCmdMeta = DiceCmdMeta{ + Name: "OBJECTCOPY", + Info: `COPY command copies the value stored at the source key to the destination key.`, + StoreObjectEval: evalCOPYObject, + IsMigrated: true, + Arity: -2, + } + decrCmdMeta = DiceCmdMeta{ Name: "DECR", Info: `DECR decrements the value of the specified key in args by 1, @@ -1299,6 +1317,9 @@ var ( ) func init() { + PreProcessing["COPY"] = evalGetObject + PreProcessing["RENAME"] = evalGET + DiceCmds["ABORT"] = abortCmdMeta DiceCmds["APPEND"] = appendCmdMeta DiceCmds["AUTH"] = authCmdMeta @@ -1322,6 +1343,7 @@ func init() { DiceCmds["COMMAND|DOCS"] = commandDocsCmdMeta DiceCmds["COMMAND|GETKEYSANDFLAGS"] = commandGetKeysAndFlagsCmdMeta DiceCmds["COPY"] = copyCmdMeta + DiceCmds["OBJECTCOPY"] = objectCopyCmdMeta DiceCmds["DBSIZE"] = dbSizeCmdMeta DiceCmds["DECR"] = decrCmdMeta DiceCmds["DECRBY"] = decrByCmdMeta diff --git a/internal/eval/eval.go b/internal/eval/eval.go index f0370fbbb..a4e7bcdfa 100644 --- a/internal/eval/eval.go +++ b/internal/eval/eval.go @@ -2157,7 +2157,7 @@ func evalCOPY(args []string, store *dstore.Store) []byte { for i := 2; i < len(args); i++ { arg := strings.ToUpper(args[i]) - if arg == "REPLACE" { + if arg == dstore.Replace { isReplace = true } } diff --git a/internal/eval/execute.go b/internal/eval/execute.go index 35cdda04e..939757188 100644 --- a/internal/eval/execute.go +++ b/internal/eval/execute.go @@ -11,18 +11,50 @@ import ( dstore "github.com/dicedb/dice/internal/store" ) -func ExecuteCommand(c *cmd.DiceDBCmd, client *comm.Client, store *dstore.Store, httpOp, websocketOp bool) *EvalResponse { - diceCmd, ok := DiceCmds[c.Cmd] +type Eval struct { + cmd *cmd.DiceDBCmd + client *comm.Client + store *dstore.Store + isHTTPOperation bool + isWebSocketOperation bool + isPreprocessOperation bool +} + +func NewEval(c *cmd.DiceDBCmd, client *comm.Client, store *dstore.Store, httpOp, websocketOp, preProcessing bool) *Eval { + return &Eval{ + cmd: c, + client: client, + store: store, + isHTTPOperation: httpOp, + isWebSocketOperation: websocketOp, + isPreprocessOperation: preProcessing, + } +} + +func (e *Eval) PreProcessCommand() *EvalResponse { + if f, ok := PreProcessing[e.cmd.Cmd]; ok { + return f(e.cmd.Args, e.store) + } + return &EvalResponse{Result: nil, Error: diceerrors.ErrInternalServer} +} + +func (e *Eval) ExecuteCommand() *EvalResponse { + diceCmd, ok := DiceCmds[e.cmd.Cmd] if !ok { - return &EvalResponse{Result: diceerrors.NewErrWithFormattedMessage("unknown command '%s', with args beginning with: %s", c.Cmd, strings.Join(c.Args, " ")), Error: nil} + return &EvalResponse{Result: diceerrors.NewErrWithFormattedMessage("unknown command '%s', with args beginning with: %s", e.cmd.Cmd, strings.Join(e.cmd.Args, " ")), Error: nil} } // Temporary logic till we move all commands to new eval logic. // MigratedDiceCmds map contains refactored eval commands // For any command we will first check in the existing map // if command is NA then we will check in the new map + if diceCmd.IsMigrated { - return diceCmd.NewEval(c.Args, store) + if e.cmd.Obj != nil { + return diceCmd.StoreObjectEval(e.cmd, e.store) + } + + return diceCmd.NewEval(e.cmd.Args, e.store) } // The following commands could be handled at the shard level, however, we can randomly let any shard handle them @@ -31,14 +63,14 @@ func ExecuteCommand(c *cmd.DiceDBCmd, client *comm.Client, store *dstore.Store, // Old implementation kept as it is, but we will be moving // to the new implementation soon for all commands case "SUBSCRIBE", "Q.WATCH": - return &EvalResponse{Result: EvalQWATCH(c.Args, httpOp, websocketOp, client, store), Error: nil} + return &EvalResponse{Result: EvalQWATCH(e.cmd.Args, e.isHTTPOperation, e.isWebSocketOperation, e.client, e.store), Error: nil} case "UNSUBSCRIBE", "Q.UNWATCH": - return &EvalResponse{Result: EvalQUNWATCH(c.Args, httpOp, client), Error: nil} + return &EvalResponse{Result: EvalQUNWATCH(e.cmd.Args, e.isHTTPOperation, e.client), Error: nil} case auth.Cmd: - return &EvalResponse{Result: EvalAUTH(c.Args, client), Error: nil} + return &EvalResponse{Result: EvalAUTH(e.cmd.Args, e.client), Error: nil} case "ABORT": return &EvalResponse{Result: clientio.RespOK, Error: nil} default: - return &EvalResponse{Result: diceCmd.Eval(c.Args, store), Error: nil} + return &EvalResponse{Result: diceCmd.Eval(e.cmd.Args, e.store), Error: nil} } } diff --git a/internal/eval/store_eval.go b/internal/eval/store_eval.go index 892ed0d18..b9a4fe0f3 100644 --- a/internal/eval/store_eval.go +++ b/internal/eval/store_eval.go @@ -10,6 +10,7 @@ import ( "github.com/axiomhq/hyperloglog" "github.com/bytedance/sonic" "github.com/dicedb/dice/internal/clientio" + "github.com/dicedb/dice/internal/cmd" diceerrors "github.com/dicedb/dice/internal/errors" "github.com/dicedb/dice/internal/eval/sortedset" "github.com/dicedb/dice/internal/object" @@ -36,169 +37,169 @@ import ( // Returns encoded OK RESP once new entry is added // If the key already exists then the value will be overwritten and expiry will be discarded func evalSET(args []string, store *dstore.Store) *EvalResponse { - if len(args) <= 1 { - return &EvalResponse{ - Result: nil, - Error: diceerrors.ErrWrongArgumentCount("SET"), - } - } - - var key, value string - var exDurationMs int64 = -1 - var state exDurationState = Uninitialized - var keepttl bool = false - + if len(args) <= 1 { + return &EvalResponse{ + Result: nil, + Error: diceerrors.ErrWrongArgumentCount("SET"), + } + } + + var key, value string + var exDurationMs int64 = -1 + var state exDurationState = Uninitialized + var keepttl bool = false + key, value = args[0], args[1] - oType, oEnc := deduceTypeEncoding(value) - - for i := 2; i < len(args); i++ { - arg := strings.ToUpper(args[i]) - switch arg { - case Ex, Px: - if state != Uninitialized { - return &EvalResponse{ - Result: nil, - Error: diceerrors.ErrSyntax, - } - } - if keepttl { - return &EvalResponse{ - Result: nil, - Error: diceerrors.ErrSyntax, - } - } - i++ - if i == len(args) { - return &EvalResponse{ - Result: nil, - Error: diceerrors.ErrSyntax, - } - } - - exDuration, err := strconv.ParseInt(args[i], 10, 64) - if err != nil { - return &EvalResponse{ - Result: nil, - Error: diceerrors.ErrIntegerOutOfRange, - } - } - - if exDuration <= 0 || exDuration >= maxExDuration { - return &EvalResponse{ - Result: nil, - Error: diceerrors.ErrInvalidExpireTime("SET"), - } - } - - // converting seconds to milliseconds - if arg == Ex { - exDuration *= 1000 - } - exDurationMs = exDuration - state = Initialized - - case Pxat, Exat: - if state != Uninitialized { - return &EvalResponse{ - Result: nil, - Error: diceerrors.ErrSyntax, - } - } - if keepttl { - return &EvalResponse{ - Result: nil, - Error: diceerrors.ErrSyntax, - } - } - i++ - if i == len(args) { - return &EvalResponse{ - Result: nil, - Error: diceerrors.ErrSyntax, - } - } - exDuration, err := strconv.ParseInt(args[i], 10, 64) - if err != nil { - return &EvalResponse{ - Result: nil, - Error: diceerrors.ErrIntegerOutOfRange, - } - } - - if exDuration < 0 { - return &EvalResponse{ - Result: nil, - Error: diceerrors.ErrInvalidExpireTime("SET"), - } - } - - if arg == Exat { - exDuration *= 1000 - } - exDurationMs = exDuration - utils.GetCurrentTime().UnixMilli() - // If the expiry time is in the past, set exDurationMs to 0 - // This will be used to signal immediate expiration - if exDurationMs < 0 { - exDurationMs = 0 - } - state = Initialized - - case XX: - // Get the key from the hash table - obj := store.Get(key) - - // if key does not exist, return RESP encoded nil - if obj == nil { - return &EvalResponse{ - Result: clientio.NIL, - Error: nil, - } - } - case NX: - obj := store.Get(key) - if obj != nil { - return &EvalResponse{ - Result: clientio.NIL, - Error: nil, - } - } - case KeepTTL: - if state != Uninitialized { - return &EvalResponse{ - Result: nil, - Error: diceerrors.ErrSyntax, - } - } - keepttl = true - default: - return &EvalResponse{ - Result: nil, - Error: diceerrors.ErrSyntax, - } - } - } - - // Cast the value properly based on the encoding type - var storedValue interface{} - switch oEnc { - case object.ObjEncodingInt: - storedValue, _ = strconv.ParseInt(value, 10, 64) - case object.ObjEncodingEmbStr, object.ObjEncodingRaw: - storedValue = value - default: - return &EvalResponse{ - Result: nil, - Error: diceerrors.ErrUnsupportedEncoding(int(oEnc)), - } - } - - // putting the k and value in a Hash Table - store.Put(key, store.NewObj(storedValue, exDurationMs, oType, oEnc), dstore.WithKeepTTL(keepttl)) - - return &EvalResponse{ - Result: clientio.OK, - Error: nil, - } + oType, oEnc := deduceTypeEncoding(value) + + for i := 2; i < len(args); i++ { + arg := strings.ToUpper(args[i]) + switch arg { + case Ex, Px: + if state != Uninitialized { + return &EvalResponse{ + Result: nil, + Error: diceerrors.ErrSyntax, + } + } + if keepttl { + return &EvalResponse{ + Result: nil, + Error: diceerrors.ErrSyntax, + } + } + i++ + if i == len(args) { + return &EvalResponse{ + Result: nil, + Error: diceerrors.ErrSyntax, + } + } + + exDuration, err := strconv.ParseInt(args[i], 10, 64) + if err != nil { + return &EvalResponse{ + Result: nil, + Error: diceerrors.ErrIntegerOutOfRange, + } + } + + if exDuration <= 0 || exDuration >= maxExDuration { + return &EvalResponse{ + Result: nil, + Error: diceerrors.ErrInvalidExpireTime("SET"), + } + } + + // converting seconds to milliseconds + if arg == Ex { + exDuration *= 1000 + } + exDurationMs = exDuration + state = Initialized + + case Pxat, Exat: + if state != Uninitialized { + return &EvalResponse{ + Result: nil, + Error: diceerrors.ErrSyntax, + } + } + if keepttl { + return &EvalResponse{ + Result: nil, + Error: diceerrors.ErrSyntax, + } + } + i++ + if i == len(args) { + return &EvalResponse{ + Result: nil, + Error: diceerrors.ErrSyntax, + } + } + exDuration, err := strconv.ParseInt(args[i], 10, 64) + if err != nil { + return &EvalResponse{ + Result: nil, + Error: diceerrors.ErrIntegerOutOfRange, + } + } + + if exDuration < 0 { + return &EvalResponse{ + Result: nil, + Error: diceerrors.ErrInvalidExpireTime("SET"), + } + } + + if arg == Exat { + exDuration *= 1000 + } + exDurationMs = exDuration - utils.GetCurrentTime().UnixMilli() + // If the expiry time is in the past, set exDurationMs to 0 + // This will be used to signal immediate expiration + if exDurationMs < 0 { + exDurationMs = 0 + } + state = Initialized + + case XX: + // Get the key from the hash table + obj := store.Get(key) + + // if key does not exist, return RESP encoded nil + if obj == nil { + return &EvalResponse{ + Result: clientio.NIL, + Error: nil, + } + } + case NX: + obj := store.Get(key) + if obj != nil { + return &EvalResponse{ + Result: clientio.NIL, + Error: nil, + } + } + case KeepTTL: + if state != Uninitialized { + return &EvalResponse{ + Result: nil, + Error: diceerrors.ErrSyntax, + } + } + keepttl = true + default: + return &EvalResponse{ + Result: nil, + Error: diceerrors.ErrSyntax, + } + } + } + + // Cast the value properly based on the encoding type + var storedValue interface{} + switch oEnc { + case object.ObjEncodingInt: + storedValue, _ = strconv.ParseInt(value, 10, 64) + case object.ObjEncodingEmbStr, object.ObjEncodingRaw: + storedValue = value + default: + return &EvalResponse{ + Result: nil, + Error: diceerrors.ErrUnsupportedEncoding(int(oEnc)), + } + } + + // putting the k and value in a Hash Table + store.Put(key, store.NewObj(storedValue, exDurationMs, oType, oEnc), dstore.WithKeepTTL(keepttl)) + + return &EvalResponse{ + Result: clientio.OK, + Error: nil, + } } // evalGET returns the value for the queried key in args @@ -2973,3 +2974,89 @@ func evalJSONOBJKEYS(args []string, store *dstore.Store) *EvalResponse { Error: nil, } } + +func evalGetObject(args []string, store *dstore.Store) *EvalResponse { + if len(args) != 1 { + return &EvalResponse{ + Result: nil, + Error: diceerrors.ErrInternalServer, + } + } + + key := args[0] + + obj := store.Get(key) + + // if key does not exist, return RESP encoded nil + if obj == nil { + return &EvalResponse{ + Result: clientio.IntegerZero, + Error: nil, + } + } + + exp, ok := dstore.GetExpiry(obj, store) + var exDurationMs int64 = -1 + if ok { + exDurationMs = int64(exp - uint64(utils.GetCurrentTime().UnixMilli())) + } + + exObj := &object.ExtendedObj{ + Obj: obj, + ExDuration: exDurationMs, + } + + // Decode and return the value based on its encoding + return &EvalResponse{ + Result: exObj, + Error: nil, + } +} + +// evalSetObject stores an object in the store with a given key and optional expiry. +// If an object with the same key exists, it is replaced. +// This function is usually specifc to multishard multi-op commands +func evalCOPYObject(cd *cmd.DiceDBCmd, store *dstore.Store) *EvalResponse { + args := cd.Args + + var isReplace bool + if len(cd.Args) > 1 { + if cd.Args[1] == dstore.Replace { + isReplace = true + } + } + + key := args[0] + + obj := store.Get(key) + + if !isReplace && obj != nil { + return &EvalResponse{ + Result: nil, + Error: diceerrors.ErrGeneral("ERR target key already exists"), + } + } + + store.Del(key) + + copyObj := cd.Obj.Obj.DeepCopy() + if copyObj == nil { + return &EvalResponse{ + Result: clientio.IntegerZero, + Error: nil, + } + } + + exDurationMs := cd.Obj.ExDuration + + store.Put(key, copyObj) + + if exDurationMs > 0 { + store.SetExpiry(copyObj, exDurationMs) + } + + return &EvalResponse{ + Result: clientio.IntegerOne, + Error: nil, + } +} diff --git a/internal/object/object.go b/internal/object/object.go index fdc736e8c..b2b726409 100644 --- a/internal/object/object.go +++ b/internal/object/object.go @@ -11,6 +11,11 @@ type Obj struct { Value interface{} } +type ExtendedObj struct { + Obj *Obj + ExDuration int64 +} + var ObjTypeString uint8 = 0 << 4 var ObjEncodingRaw uint8 = 0 diff --git a/internal/shard/shard_thread.go b/internal/shard/shard_thread.go index 1e7b5be26..a556482e0 100644 --- a/internal/shard/shard_thread.go +++ b/internal/shard/shard_thread.go @@ -96,8 +96,6 @@ func (shard *ShardThread) unregisterWorker(workerID string) { // processRequest processes a Store operation for the shard. func (shard *ShardThread) processRequest(op *ops.StoreOp) { - resp := eval.ExecuteCommand(op.Cmd, op.Client, shard.store, op.HTTPOp, op.WebsocketOp) - shard.workerMutex.RLock() workerChans, ok := shard.workerMap[op.WorkerID] shard.workerMutex.RUnlock() @@ -110,6 +108,16 @@ func (shard *ShardThread) processRequest(op *ops.StoreOp) { SeqID: op.SeqID, } + e := eval.NewEval(op.Cmd, op.Client, shard.store, op.HTTPOp, op.WebsocketOp, op.PreProcessing) + + if op.PreProcessing { + resp := e.PreProcessCommand() + sp.EvalResponse = resp + preProcessChan <- sp + return + } + + resp := e.ExecuteCommand() if ok { sp.EvalResponse = resp } else { @@ -119,11 +127,7 @@ func (shard *ShardThread) processRequest(op *ops.StoreOp) { } } - if op.PreProcessing { - preProcessChan <- sp - } else { - workerChan <- sp - } + workerChan <- sp } // cleanup handles cleanup logic when the shard stops. diff --git a/internal/store/constants.go b/internal/store/constants.go index 9f5d9df81..ba168b494 100644 --- a/internal/store/constants.go +++ b/internal/store/constants.go @@ -1,10 +1,11 @@ package store const ( - Set string = "SET" - Del string = "DEL" - Get string = "GET" - Rename string = "RENAME" - ZAdd string = "ZADD" - ZRange string = "ZRANGE" + Set string = "SET" + Del string = "DEL" + Get string = "GET" + Rename string = "RENAME" + ZAdd string = "ZADD" + ZRange string = "ZRANGE" + Replace string = "REPLACE" ) diff --git a/internal/worker/cmd_compose.go b/internal/worker/cmd_compose.go index c42784569..a8a742e8b 100644 --- a/internal/worker/cmd_compose.go +++ b/internal/worker/cmd_compose.go @@ -45,7 +45,7 @@ func composeCopy(responses ...ops.StoreResponse) interface{} { } } - return clientio.OK + return clientio.IntegerOne } // composeMSet processes responses from multiple shards for an "MSet" operation diff --git a/internal/worker/cmd_decompose.go b/internal/worker/cmd_decompose.go index e5035dff2..9b902a900 100644 --- a/internal/worker/cmd_decompose.go +++ b/internal/worker/cmd_decompose.go @@ -4,8 +4,11 @@ import ( "context" "log/slog" + "github.com/dicedb/dice/internal/clientio" "github.com/dicedb/dice/internal/cmd" diceerrors "github.com/dicedb/dice/internal/errors" + "github.com/dicedb/dice/internal/object" + "github.com/dicedb/dice/internal/ops" "github.com/dicedb/dice/internal/store" ) @@ -63,32 +66,36 @@ func decomposeRename(ctx context.Context, w *BaseWorker, cd *cmd.DiceDBCmd) ([]* // sets the value to the destination key using a SET command. func decomposeCopy(ctx context.Context, w *BaseWorker, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) { // Waiting for GET command response - var val string + var resp *ops.StoreResponse select { case <-ctx.Done(): slog.Error("Timed out waiting for response from shards", slog.String("workerID", w.id), slog.Any("error", ctx.Err())) case preProcessedResp, ok := <-w.preprocessingChan: if ok { - evalResp := preProcessedResp.EvalResponse - if evalResp.Error != nil { - return nil, evalResp.Error - } - - val = evalResp.Result.(string) + resp = preProcessedResp } } - if len(cd.Args) != 2 { + if resp.EvalResponse.Error != nil || resp.EvalResponse.Result == clientio.IntegerZero { + return nil, &diceerrors.PreProcessError{Result: clientio.IntegerZero} + } + + if len(cd.Args) < 2 { return nil, diceerrors.ErrWrongArgumentCount("COPY") } - decomposedCmds := []*cmd.DiceDBCmd{} - decomposedCmds = append(decomposedCmds, - &cmd.DiceDBCmd{ - Cmd: store.Set, - Args: []string{cd.Args[1], val}, + newObj, ok := resp.EvalResponse.Result.(*object.ExtendedObj) + if !ok { + return nil, diceerrors.ErrInternalServer + } + + decomposedCmds := []*cmd.DiceDBCmd{ + { + Cmd: "OBJECTCOPY", + Args: cd.Args[1:], + Obj: newObj, }, - ) + } return decomposedCmds, nil } diff --git a/internal/worker/cmd_meta.go b/internal/worker/cmd_meta.go index b92d38ce0..5fe2f07ee 100644 --- a/internal/worker/cmd_meta.go +++ b/internal/worker/cmd_meta.go @@ -79,7 +79,7 @@ const ( CmdZRank = "ZRANK" CmdZCount = "ZCOUNT" CmdZRem = "ZREM" - CmdZCard = "ZCARD" + CmdZCard = "ZCARD" CmdPFAdd = "PFADD" CmdPFCount = "PFCOUNT" CmdPFMerge = "PFMERGE" @@ -127,14 +127,14 @@ type CmdMeta struct { // If set to true, it signals that a preliminary step (such as fetching values from shards) // is necessary before the main command is executed. This is important for commands that depend // on the current state of data in the database. - preProcessingReq bool + preProcessing bool // preProcessResponse is a function that handles the preprocessing of a DiceDB command by // preparing the necessary operations (e.g., fetching values from shards) before the command // is executed. It takes the worker and the original DiceDB command as parameters and // ensures that any required information is retrieved and processed in advance. Use this when set // preProcessingReq = true. - preProcessResponse func(worker *BaseWorker, DiceDBCmd *cmd.DiceDBCmd) + preProcessResponse func(worker *BaseWorker, DiceDBCmd *cmd.DiceDBCmd) error } var CommandsMeta = map[string]CmdMeta{ @@ -215,7 +215,7 @@ var CommandsMeta = map[string]CmdMeta{ // Multi-shard commands. CmdRename: { CmdType: MultiShard, - preProcessingReq: true, + preProcessing: true, preProcessResponse: preProcessRename, decomposeCommand: decomposeRename, composeResponse: composeRename, @@ -223,7 +223,7 @@ var CommandsMeta = map[string]CmdMeta{ CmdCopy: { CmdType: MultiShard, - preProcessingReq: true, + preProcessing: true, preProcessResponse: preProcessCopy, decomposeCommand: decomposeCopy, composeResponse: composeCopy, diff --git a/internal/worker/cmd_preprocess.go b/internal/worker/cmd_preprocess.go index 424f24ed4..da89e78a2 100644 --- a/internal/worker/cmd_preprocess.go +++ b/internal/worker/cmd_preprocess.go @@ -2,18 +2,23 @@ package worker import ( "github.com/dicedb/dice/internal/cmd" + diceerrors "github.com/dicedb/dice/internal/errors" "github.com/dicedb/dice/internal/ops" ) // preProcessRename prepares the RENAME command for preprocessing by sending a GET command // to retrieve the value of the original key. The retrieved value is used later in the // decomposeRename function to delete the old key and set the new key. -func preProcessRename(w *BaseWorker, diceDBCmd *cmd.DiceDBCmd) { +func preProcessRename(w *BaseWorker, diceDBCmd *cmd.DiceDBCmd) error { + if len(diceDBCmd.Args) < 2 { + return diceerrors.ErrWrongArgumentCount("RENAME") + } + key := diceDBCmd.Args[0] sid, rc := w.shardManager.GetShardInfo(key) preCmd := cmd.DiceDBCmd{ - Cmd: CmdGet, + Cmd: "RENAME", Args: []string{key}, } @@ -26,27 +31,35 @@ func preProcessRename(w *BaseWorker, diceDBCmd *cmd.DiceDBCmd) { Client: nil, PreProcessing: true, } + + return nil } // preProcessCopy prepares the COPY command for preprocessing by sending a GET command // to retrieve the value of the original key. The retrieved value is used later in the // decomposeCopy function to copy the value to the destination key. -func preProcessCopy(w *BaseWorker, diceDBCmd *cmd.DiceDBCmd) { - key := diceDBCmd.Args[0] - sid, rc := w.shardManager.GetShardInfo(key) +func preProcessCopy(w *BaseWorker, diceDBCmd *cmd.DiceDBCmd) error { + if len(diceDBCmd.Args) < 2 { + return diceerrors.ErrWrongArgumentCount("COPY") + } - preCmd := cmd.DiceDBCmd{ - Cmd: CmdGet, - Args: []string{key}, + sid, rc := w.shardManager.GetShardInfo(diceDBCmd.Args[0]) + + preCmdk := cmd.DiceDBCmd{ + Cmd: "COPY", + Args: []string{diceDBCmd.Args[0]}, } + // Need to get response from both keys to handle Replace or not rc <- &ops.StoreOp{ SeqID: 0, RequestID: GenerateUniqueRequestID(), - Cmd: &preCmd, + Cmd: &preCmdk, WorkerID: w.id, ShardID: sid, Client: nil, PreProcessing: true, } + + return nil } diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 67f127001..2847141cc 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -108,7 +108,7 @@ func (w *BaseWorker) Start(ctx context.Context) error { case cmdReq := <-w.adhocReqChan: // Handle adhoc requests of DiceDBCmd func() { - execCtx, cancel := context.WithTimeout(ctx, 6*time.Second) // Timeout set to 6 seconds for integration tests + execCtx, cancel := context.WithTimeout(ctx, 1000*time.Second) // Timeout set to 6 seconds for integration tests defer cancel() // adhoc requests should be classified as watch requests @@ -152,7 +152,7 @@ func (w *BaseWorker) Start(ctx context.Context) error { } // executeCommand executes the command and return the response back to the client func(errChan chan error) { - execCtx, cancel := context.WithTimeout(ctx, 6*time.Second) // Timeout set to 6 seconds for integration tests + execCtx, cancel := context.WithTimeout(ctx, 1000*time.Second) // Timeout set to 6 seconds for integration tests defer cancel() w.executeCommandHandler(execCtx, errChan, cmds, false) }(errChan) @@ -166,8 +166,13 @@ func (w *BaseWorker) Start(ctx context.Context) error { func (w *BaseWorker) executeCommandHandler(execCtx context.Context, errChan chan error, cmds []*cmd.DiceDBCmd, isWatchNotification bool) { // Retrieve metadata for the command to determine if multisharding is supported. meta, ok := CommandsMeta[cmds[0].Cmd] - if ok && meta.preProcessingReq { - meta.preProcessResponse(w, cmds[0]) + if ok && meta.preProcessing { + if err := meta.preProcessResponse(w, cmds[0]); err != nil { + e := w.ioHandler.Write(execCtx, err) + if e != nil { + slog.Debug("Error executing for worker", slog.String("workerID", w.id), slog.Any("error", err)) + } + } } err := w.executeCommand(execCtx, cmds[0], isWatchNotification) @@ -208,7 +213,13 @@ func (w *BaseWorker) executeCommand(ctx context.Context, diceDBCmd *cmd.DiceDBCm // If the command supports multisharding, break it down into multiple commands. cmdList, err = meta.decomposeCommand(ctx, w, diceDBCmd) if err != nil { - workerErr := w.ioHandler.Write(ctx, err) + var workerErr error + // Check if it's a CustomError + if customErr, ok := err.(*diceerrors.PreProcessError); ok { + workerErr = w.ioHandler.Write(ctx, customErr.Result) + } else { + workerErr = w.ioHandler.Write(ctx, err) + } if workerErr != nil { slog.Debug("Error executing for worker", slog.String("workerID", w.id), slog.Any("error", workerErr)) }