diff --git a/docs/benchmark.md b/docs/benchmark.md new file mode 100644 index 0000000000..a1f66ea231 --- /dev/null +++ b/docs/benchmark.md @@ -0,0 +1,272 @@ +# Valkey Benchmark + +Benchmark utility for measuring Valkey server performance. + +```bash +valkey-benchmark [OPTIONS] [--] [COMMAND ARGS...] +``` + +## Connection Options + +| Option | Description | +|--------|-------------| +| `-h <hostname>` | Server hostname (default: 127.0.0.1) | +| `-p <port>` | Server port (default: 6379) | +| `-s <socket>` | Server socket (overrides host and port) | +| `-u <uri>` | Server URI: `valkey://user:password@host:port/dbnum` | +| `-a <password>` | Password for Valkey Auth | +| `--user <username>` | Used to send ACL style 'AUTH username pass'. Needs `-a` | +| `--dbnum <db>` | SELECT the specified db number (default: 0) | +| `-3` | Start session in RESP3 protocol mode | + +## Performance Options + +| Option | Description | +|--------|-------------| +| `-c <clients>` | Number of parallel connections (default: 50) | +| `-n <requests>` | Total number of requests (default: 100000) | +| `-d <size>` | Data size of SET/GET value in bytes (default: 3) | +| `-P <numreq>` | Pipeline `<numreq>` requests (default: 1, no pipeline) | +| `-k <boolean>` | Keep alive: 1=keep alive, 0=reconnect (default: 1) | +| `--threads <num>` | Enable multi-thread mode | +| `--rps <limit>` | Limit requests per second (default: 0, no limit) | + +## Test Selection + +| Option | Description | +|--------|-------------| +| `-t <tests>` | Comma-separated list of tests to run | +| `-l` | Loop mode: run tests forever | +| `-I` | Idle mode: just open `-c` idle connections and wait | + +Available tests: `ping`, `ping_inline`, `ping_mbulk`, `set`, `get`, `incr`, `lpush`, `rpush`, `lpop`, `rpop`, `sadd`, `hset`, `spop`, `zadd`, `zpopmin`, `lrange`, `lrange_100`, `lrange_300`, `lrange_500`, `lrange_600`, `mset`, `mget`, `xadd`, `function_load`, `fcall` + +## Output Options + +| Option | Description | +|--------|-------------| +| `-q` | Quiet mode: show only query/sec values | +| `--csv` | Output in CSV format | +| `--precision` | Number of decimal places in latency output (default: 0) | + +## Cluster Options + +| Option | Description | +|--------|-------------| +| `--cluster` | Enable cluster mode | +| `--rfr <mode>` | Read from replicas: `no`/`yes`/`all` (default: `no`) | + +## Randomization Options + +| Option | Description | +|--------|-------------| +| `-r <keyspacelen>` | Use random keys in range [0, keyspacelen-1] | +| `--sequential` | Use sequential numbers instead of random | +| `--seed <num>` | Set random number generator seed | + +## Dataset Support + +| Option | Description | +|--------|-------------| +| `--dataset <file>` | Dataset file (CSV/TSV/XML) for field placeholder replacement | +| `--maxdocs <count>` | Maximum number of documents to load from the dataset (default: unlimited) | +| `--xml-root-element <name>` | Element that delimits one document in an XML dataset (required for XML) | + +### File Formats + +**CSV** +```csv +term,category +anarchism,politics +democracy,politics +``` +Header row required, comma-delimited; header names become `__field:name__` placeholders. + +**TSV** +Tab-delimited with header row. + +**XML** +```xml +<page> + <title>Anarchism</title> + <id>12</id> + <revision> + <timestamp>1317806107</timestamp> + <text>Article content...</text> + </revision> +</page> +``` +Requires the `--xml-root-element` parameter. The root element choice affects which fields are discovered; deeper elements include nested content (see the sketch below).
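+ +For example, with the sample document above, `--xml-root-element page` discovers `title`, `id`, `revision`, and the nested `timestamp` and `text`, while `--xml-root-element revision` discovers only `timestamp` and `text`. A minimal sketch (the `wiki.xml` file name and key names are illustrative): + +```bash +# Page-level fields +valkey-benchmark --dataset wiki.xml --xml-root-element page \ + -n 1000 -r 100000 HSET "doc:__rand_int__" title "__field:title__" text "__field:text__" + +# Revision-level fields only +valkey-benchmark --dataset wiki.xml --xml-root-element revision \ + -n 1000 -r 100000 HSET "rev:__rand_int__" ts "__field:timestamp__" text "__field:text__" +```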
+ +### Dataset Behavior + +- One dataset row is consumed per command +- Rows are iterated sequentially, with wraparound +- Row selection is thread-safe (atomic counter) +- Duplicate XML field names: first occurrence wins + +### Usage + +```bash +# CSV dataset +valkey-benchmark --dataset terms.csv \ + -n 50000 FT.SEARCH myindex "__field:term__" + +# Wikipedia XML +valkey-benchmark --dataset wiki.xml --xml-root-element page \ + -n 10000 -r 100000 HSET "doc:__rand_int__" title "__field:title__" body "__field:text__" +``` + +**Memory:** Large datasets may require GB-scale RAM. + +## Additional Options + +| Option | Description | +|--------|-------------| +| `--enable-tracking` | Send CLIENT TRACKING on | +| `--num-functions <num>` | Number of functions in the loaded Lua library (default: 10) | +| `--num-keys-in-fcall <num>` | Number of keys passed to FCALL (default: 1) | +| `-x` | Read last argument from STDIN | +| `--mptcp` | Enable an MPTCP connection | +| `--help` | Show help | +| `--version` | Show version | + +## Placeholder System + +### Random Placeholders + +| Placeholder | Behavior | +|-------------|----------| +| `__rand_int__` | Different random value per occurrence | +| `__rand_1st__` | Same random value for all occurrences in command | +| `__rand_2nd__` | Same random value for all occurrences in command | +| ... | ... | +| `__rand_9th__` | Same random value for all occurrences in command | + +Random values are 12-digit zero-padded numbers in range [0, keyspacelen-1]; set the range with `-r`. + +### Data Placeholders + +| Placeholder | Description | +|-------------|-------------| +| `__data__` | Random data of the size specified by the `-d` option | + +### Cluster Placeholders + +| Placeholder | Description | +|-------------|-------------| +| `{tag}` | Cluster slot hashtag for proper key distribution | + +Required in cluster mode to ensure commands route to the correct nodes. + +## Command Sequences + +Commands can be chained using semicolon separators: + +```bash +valkey-benchmark -r 100000 -- multi ';' set key:__rand_int__ __data__ ';' incr counter ';' exec +``` + +### Repetition Syntax + +Prefix a command with a number to repeat it: + +```bash +valkey-benchmark -r 100000 -- 5 set key:__rand_int__ value ';' get key:__rand_int__ +``` + +This executes 5 SET commands followed by 1 GET command per pipeline iteration.
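+ +Repetition composes with semicolon sequences. A minimal sketch of a read-heavy mix (key names and counts are illustrative): + +```bash +# Per pipeline iteration: one SET followed by nine GETs on random keys +valkey-benchmark -r 100000 -n 100000 \ + -- set key:__rand_int__ __data__ ';' 9 get key:__rand_int__ +```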
+ + +## Examples + +### Basic Benchmarking + +```bash +# Default benchmark suite +valkey-benchmark + +# Specific tests +valkey-benchmark -t ping,set,get -n 100000 + +# Custom data size +valkey-benchmark -t set -d 1024 -n 50000 +``` + +### Random Key Distribution + +```bash +# Random keys in range [0, 999999] +valkey-benchmark -t set,get -r 1000000 -n 100000 + +# Sequential keys +valkey-benchmark -t set --sequential -r 1000000 -n 100000 +``` + +### Dataset-Driven Benchmarking + +```bash +# CSV dataset +valkey-benchmark --dataset terms.csv \ + -n 50000 FT.SEARCH myindex "__field:term__" + +# Wikipedia XML dataset (page-level) +valkey-benchmark --dataset wiki_sample.xml --xml-root-element page \ + -n 10000 -r 100000 HSET "doc:__rand_int__" title "__field:title__" content "__field:text__" id "__field:id__" + +# Wikipedia XML dataset (revision-level) +valkey-benchmark --dataset wiki_sample.xml --xml-root-element revision \ + -n 10000 -r 100000 HSET "doc:__rand_int__" content "__field:text__" timestamp "__field:timestamp__" + +# Multiple field usage +valkey-benchmark --dataset products.csv \ + -- HSET product:__field:id__ name "__field:name__" price __field:price__ +``` + +### Cluster Benchmarking + +```bash +# Cluster mode with proper key distribution +valkey-benchmark --cluster -r 1000000 \ + -- SET key:{tag}:__rand_int__ __data__ + +# Read from replicas +valkey-benchmark --cluster --rfr yes -r 1000000 \ + -- GET key:{tag}:__rand_int__ +``` + +### Pipelining + +```bash +# Pipeline 10 requests +valkey-benchmark -P 10 -t set -n 100000 + +# Pipeline with datasets +valkey-benchmark --dataset terms.csv -P 5 \ + -n 50000 FT.SEARCH index "__field:term__" +``` + +### Complex Command Sequences + +```bash +# Transaction benchmark +valkey-benchmark -r 100000 -n 10000 \ + -- multi ';' set key:__rand_int__ __data__ ';' \ + incr counter:__rand_int__ ';' exec + +# Mixed operations with repetition +valkey-benchmark -r 100000 \ + -- 3 set key:__rand_int__ __data__ ';' \ + 2 get key:__rand_int__ ';' \ + del key:__rand_int__ +``` + +### Rate Limiting + +```bash +# Limit to 1000 requests/second +valkey-benchmark --rps 1000 -t set -n 50000 + +# Dataset with rate limiting +valkey-benchmark --dataset search_terms.csv --rps 500 \ + -n 10000 FT.SEARCH index "__field:term__" +``` diff --git a/src/valkey-benchmark.c b/src/valkey-benchmark.c index 282faa6d95..5924c0deb2 100644 --- a/src/valkey-benchmark.c +++ b/src/valkey-benchmark.c @@ -81,6 +81,32 @@ static const char *PLACEHOLDERS[PLACEHOLDER_COUNT] = { "__rand_int__", "__rand_1st__", "__rand_2nd__", "__rand_3rd__", "__rand_4th__", "__rand_5th__", "__rand_6th__", "__rand_7th__", "__rand_8th__", "__rand_9th__"}; +/* Dataset support structures */ +#define MAX_DATASET_FIELDS 64 +#define FIELD_PREFIX "__field:" +#define FIELD_PREFIX_LEN 8 +#define FIELD_SUFFIX "__" +#define FIELD_SUFFIX_LEN 2 + +typedef enum datasetFormat { + DATASET_FORMAT_CSV = 0, + DATASET_FORMAT_TSV, + DATASET_FORMAT_XML +} datasetFormat; + +typedef struct datasetRecord { + sds *fields; +} datasetRecord; + +typedef struct dataset { + datasetFormat format; /* Dataset file format */ + char delimiter; /* Field delimiter for CSV/TSV */ + sds *field_names; /* Field name lookup table */ + int field_count; /* Number of fields */ + datasetRecord *records; /* Structured field data */ + size_t record_count; /* Number of records */ +} dataset; + struct benchmarkThread; struct clusterNode; struct serverConfig; @@ -153,6 +179,15 @@ static struct config { atomic_uint_fast64_t last_time_ns; uint64_t time_per_token; uint64_t time_per_burst;
+ /* Dataset support */ + sds dataset_file; + int max_documents; /* Maximum documents to load from dataset */ + sds xml_root_element; /* XML root element name */ + dataset *current_dataset; /* Current loaded dataset */ + /* Command template for dataset mode */ + int template_argc; + sds *template_argv; + int has_field_placeholders; } config; /* Locations of the placeholders __rand_int__, __rand_1st__, @@ -229,6 +264,14 @@ static void freeServerConfig(serverConfig *cfg); static int fetchClusterSlotsConfiguration(client c); static void updateClusterSlotsConfiguration(void); static long long showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData); +static sds getFieldValue(const char *row, int column_index, char delimiter); +static sds getXmlFieldValue(const char *xml_doc, const char *field_name); +static sds generateCompleteCommand(int record_index); +static sds formatBytes(size_t bytes); +static int parseCommandTemplate(int argc, char **argv); +static dataset *initDataset(void); +static void freeDataset(dataset *ds); +static void reportDatasetMemory(dataset *ds); /* Dict callbacks */ static uint64_t dictSdsHash(const void *key); @@ -858,8 +901,25 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) { return; } - /* Really initialize: replace keys and set start time. */ - if (config.replace_placeholders) replacePlaceholders(c->obuf + c->prefixlen, config.pipeline); + /* Dataset field access mode - completely independent command generation */ + if (config.has_field_placeholders && config.current_dataset && config.current_dataset->record_count > 0) { + static _Atomic uint64_t record_counter = 0; + + /* Generate complete pipeline commands for dataset placeholders */ + sdssetlen(c->obuf, c->prefixlen); + for (int p = 0; p < config.pipeline; p++) { + uint64_t record_index = atomic_fetch_add_explicit(&record_counter, 1, memory_order_relaxed) % config.current_dataset->record_count; + sds complete_cmd = generateCompleteCommand(record_index); + c->obuf = sdscatlen(c->obuf, complete_cmd, sdslen(complete_cmd)); + sdsfree(complete_cmd); + } + } else { + /* Standard mode */ + if (config.replace_placeholders) { + replacePlaceholders(c->obuf + c->prefixlen, config.pipeline); + } + } + if (config.cluster_mode && c->staglen > 0) setClusterKeyHashTag(c); c->slots_last_update = atomic_load_explicit(&config.slots_last_update, memory_order_relaxed); c->start = ustime(); @@ -1606,6 +1666,683 @@ static void updateClusterSlotsConfiguration(void) { pthread_mutex_unlock(&config.is_updating_slots_mutex); } +/* Validate field placeholders in command arguments */ +static void validateFieldPlaceholders(sds *template_argv, int template_argc) { + for (int arg_idx = 0; arg_idx < template_argc; arg_idx++) { + const char *arg = template_argv[arg_idx]; + const char *field_pos = strstr(arg, FIELD_PREFIX); + while (field_pos) { + const char *field_start = field_pos + FIELD_PREFIX_LEN; + const char *field_end = strstr(field_start, FIELD_SUFFIX); + if (!field_end) break; + + /* Extract and validate field name */ + size_t field_name_len = field_end - field_start; + sds field_name = sdsnewlen(field_start, field_name_len); + + int field_found = 0; + for (int k = 0; k < config.current_dataset->field_count; k++) { + if (!strcmp(field_name, config.current_dataset->field_names[k])) { + field_found = 1; + break; + } + } + + if (!field_found) { + fprintf(stderr, "Error: Field placeholder '__field:%s__' not found in dataset fields\n", field_name); + fprintf(stderr, "Available fields: "); + for 
(int j = 0; j < config.current_dataset->field_count; j++) { + fprintf(stderr, "%s%s", config.current_dataset->field_names[j], + (j < config.current_dataset->field_count - 1) ? ", " : "\n"); + } + sdsfree(field_name); + exit(1); + } + + sdsfree(field_name); + field_pos = strstr(field_end + FIELD_SUFFIX_LEN, FIELD_PREFIX); + } + } +} + +/* Format bytes into human-readable string */ +static sds formatBytes(size_t bytes) { + if (bytes < 1024) { + return sdscatprintf(sdsempty(), "%zu bytes", bytes); + } else if (bytes < 1024 * 1024) { + return sdscatprintf(sdsempty(), "%.2f KB", bytes / 1024.0); + } else if (bytes < 1024 * 1024 * 1024) { + return sdscatprintf(sdsempty(), "%.2f MB", bytes / (1024.0 * 1024.0)); + } else { + return sdscatprintf(sdsempty(), "%.2f GB", bytes / (1024.0 * 1024.0 * 1024.0)); + } +} + +/* CSV field discovery - parse header line */ +static int csvDiscoverFields(dataset *ds) { + FILE *fp = fopen(config.dataset_file, "r"); + if (!fp) { + fprintf(stderr, "Cannot open dataset file: %s\n", config.dataset_file); + return 0; + } + + char *line = NULL; + size_t len = 0; + if (getline(&line, &len, fp) == -1) { + fprintf(stderr, "Cannot read header from dataset file\n"); + free(line); + fclose(fp); + return 0; + } + + /* Remove trailing newlines */ + len = strlen(line); + if (len > 0 && line[len - 1] == '\n') line[len - 1] = '\0'; + if (len > 1 && line[len - 2] == '\r') line[len - 2] = '\0'; + + /* Parse header using delimiter */ + int count; + char delim_str[2] = {ds->delimiter, '\0'}; + ds->field_names = sdssplitlen(line, strlen(line), delim_str, 1, &count); + ds->field_count = count; + + free(line); + fclose(fp); + return 1; +} + +/* Generic document limit check */ +static int shouldStopLoading(size_t current_count) { + if (config.max_documents > 0 && (int)current_count >= config.max_documents) { + return 1; + } + return 0; +} + + +static int scanXmlFields(const char *doc_start, const char *doc_end, dataset *ds, const char *start_root_tag, const char *end_root_tag) { + char field_names[MAX_DATASET_FIELDS][64]; + int field_count = 0; + int root_start_tag_len = strlen(start_root_tag); + int root_end_tag_len = strlen(end_root_tag); + + const char *current_pos = doc_start; + while ((current_pos = strchr(current_pos, '<')) != NULL && current_pos < doc_end) { + if (current_pos[1] == '/' || current_pos[1] == '!' 
|| + !strncmp(current_pos, start_root_tag, root_start_tag_len) || + !strncmp(current_pos, end_root_tag, root_end_tag_len)) { + current_pos++; + continue; + } + + const char *tag_end = strchr(current_pos, '>'); + if (!tag_end || tag_end >= doc_end) break; + + const char *field_start = current_pos + 1; + const char *field_name_end = field_start; + + /* Find end of tag name (either space for attributes or '>' for tag end) */ + while (field_name_end < tag_end && *field_name_end != ' ' && *field_name_end != '\t') { + field_name_end++; + } + + size_t field_name_len = field_name_end - field_start; + + if (field_name_len == 0 || field_name_len >= 64) { + current_pos = tag_end + 1; + continue; + } + + int is_duplicate = 0; + for (int i = 0; i < field_count; i++) { + if (strlen(field_names[i]) == field_name_len && + !memcmp(field_names[i], field_start, field_name_len)) { + is_duplicate = 1; + break; + } + } + + if (!is_duplicate && field_count < MAX_DATASET_FIELDS) { + memcpy(field_names[field_count], field_start, field_name_len); + field_names[field_count][field_name_len] = '\0'; + field_count++; + } + + current_pos = tag_end + 1; + } + + if (field_count == 0) return 0; + + ds->field_names = zmalloc(field_count * sizeof(sds)); + for (int i = 0; i < field_count; i++) { + ds->field_names[i] = sdsnew(field_names[i]); + } + ds->field_count = field_count; + + return 1; +} + +/* Discover XML fields from first document */ +static int scanXmlFieldsFromFile(dataset *ds, const char *xml_root_element) { + FILE *fp = fopen(config.dataset_file, "r"); + if (!fp) return 0; + + char start_tag[64], end_tag[64]; + snprintf(start_tag, sizeof(start_tag), "<%s>", xml_root_element); + snprintf(end_tag, sizeof(end_tag), "</%s>", xml_root_element); + + char buffer[1024]; + sds current_doc = sdsempty(); + + /* Read until we find first complete document for field discovery */ + while (fgets(buffer, sizeof(buffer), fp)) { + current_doc = sdscat(current_doc, buffer); + + const char *doc_start = strstr(current_doc, start_tag); + if (!doc_start) continue; + + const char *doc_end = strstr(doc_start, end_tag); + if (!doc_end) continue; + + doc_end += strlen(end_tag); + + /* Scan fields from first document */ + int result = scanXmlFields(doc_start, doc_end, ds, start_tag, end_tag); + sdsfree(current_doc); + fclose(fp); + return result; + } + + sdsfree(current_doc); + fclose(fp); + return 0; +} + +static int loadXmlDataset(dataset *ds) { + FILE *fp = fopen(config.dataset_file, "r"); + if (!fp) return 0; + + char start_tag[64], end_tag[64]; + snprintf(start_tag, sizeof(start_tag), "<%s>", config.xml_root_element); + snprintf(end_tag, sizeof(end_tag), "</%s>", config.xml_root_element); + + char buffer[1024]; + sds current_doc = sdsempty(); + int fields_discovered = 0; + size_t capacity = 1000; + + ds->records = zmalloc(sizeof(datasetRecord) * capacity); + + if (!config.quiet) { + printf("Loading XML dataset from %s...\n", config.dataset_file); + } + + while (fgets(buffer, sizeof(buffer), fp) && !shouldStopLoading(ds->record_count)) { + current_doc = sdscat(current_doc, buffer); + + const char *doc_start = strstr(current_doc, start_tag); + if (!doc_start) continue; + + const char *doc_end = strstr(doc_start, end_tag); + if (!doc_end) continue; + + doc_end += strlen(end_tag); + + if (!fields_discovered) { + /* Check if fields are already discovered from scanXmlFieldsFromFile() */ + if (ds->field_names && ds->field_count > 0) { + fields_discovered = 1; + } else { + if (!scanXmlFields(doc_start, doc_end, ds, start_tag, end_tag)) { +
fprintf(stderr, "No XML fields discovered\n"); + sdsfree(current_doc); + fclose(fp); + return 0; + } + fields_discovered = 1; + + if (!config.quiet) { + printf("Discovered %d fields: ", ds->field_count); + for (int i = 0; i < ds->field_count; i++) { + printf("%s%s", ds->field_names[i], (i < ds->field_count - 1) ? ", " : "\n"); + } + } + } + } + + if (ds->record_count >= capacity) { + capacity *= 2; + ds->records = zrealloc(ds->records, sizeof(datasetRecord) * capacity); + } + + datasetRecord *record = &ds->records[ds->record_count]; + record->fields = zmalloc(sizeof(sds) * ds->field_count); + + sds doc_str = sdsnewlen(doc_start, doc_end - doc_start); + for (int i = 0; i < ds->field_count; i++) { + record->fields[i] = getXmlFieldValue(doc_str, ds->field_names[i]); + } + sdsfree(doc_str); + + ds->record_count++; + + if (!config.quiet && ds->record_count % 10000 == 0) { + printf("\rLoaded %zu documents...", ds->record_count); + fflush(stdout); + } + + sdsclear(current_doc); + } + + sdsfree(current_doc); + fclose(fp); + return 1; +} + +/* CSV/TSV structured record loader */ +static int csvLoadDocuments(dataset *ds) { + FILE *fp = fopen(config.dataset_file, "r"); + if (!fp) return 0; + + /* Skip header */ + char *line = NULL; + size_t len = 0; + if (getline(&line, &len, fp) == -1) { + fprintf(stderr, "Cannot read header from dataset file\n"); + free(line); + fclose(fp); + return 0; + } + + size_t capacity = 1000; + ds->records = zmalloc(sizeof(datasetRecord) * capacity); + + const char *format_name = (ds->format == DATASET_FORMAT_CSV) ? "csv" : (ds->format == DATASET_FORMAT_TSV) ? "tsv" + : "xml"; + if (!config.quiet) { + printf("Loading %s dataset from %s...\n", format_name, config.dataset_file); + } + + while (getline(&line, &len, fp) != -1 && !shouldStopLoading(ds->record_count)) { + if (line[0] == '\0' || line[0] == '\n') continue; + + /* Clean line endings */ + size_t line_len = strlen(line); + if (line_len > 0 && line[line_len - 1] == '\n') line[line_len - 1] = '\0'; + if (line_len > 1 && line[line_len - 2] == '\r') line[line_len - 2] = '\0'; + + if (ds->record_count >= capacity) { + capacity *= 2; + ds->records = zrealloc(ds->records, sizeof(datasetRecord) * capacity); + } + + /* Extract field values into structured record */ + datasetRecord *record = &ds->records[ds->record_count]; + record->fields = zmalloc(sizeof(sds) * ds->field_count); + + for (int i = 0; i < ds->field_count; i++) { + record->fields[i] = getFieldValue(line, i, ds->delimiter); + } + + ds->record_count++; + } + + free(line); + fclose(fp); + return 1; +} + +/* Dataset initialization */ +static dataset *initDataset(void) { + dataset *ds = zcalloc(sizeof(dataset)); + if (!ds) return NULL; + + /* Validate XML parameters early */ + if (strstr(config.dataset_file, ".xml") && !config.xml_root_element) { + fprintf(stderr, "Error: XML dataset requires --xml-root-element parameter\n"); + fprintf(stderr, "Example: --xml-root-element doc\n"); + zfree(ds); + return NULL; + } + + /* Detect file format from extension */ + const char *filename = config.dataset_file; + if (strstr(filename, ".csv")) { + ds->format = DATASET_FORMAT_CSV; + ds->delimiter = ','; + } else if (strstr(filename, ".tsv")) { + ds->format = DATASET_FORMAT_TSV; + ds->delimiter = '\t'; + } else if (strstr(filename, ".xml")) { + ds->format = DATASET_FORMAT_XML; + ds->delimiter = 0; + } else { + ds->format = DATASET_FORMAT_CSV; + ds->delimiter = ','; + } + + /* Step 1: Discover fields (lightweight operation) */ + if (ds->format == DATASET_FORMAT_XML) { + /* XML: Need 
to scan first document for field discovery */ + if (!scanXmlFieldsFromFile(ds, config.xml_root_element)) goto error; + } else { + /* CSV/TSV: Discover fields from header line */ + if (!csvDiscoverFields(ds)) goto error; + } + + /* Step 2: Early validation (before bulk loading) */ + if (config.has_field_placeholders && config.template_argv && config.template_argc > 0) { + config.current_dataset = ds; /* Temporarily set for validation */ + validateFieldPlaceholders(config.template_argv, config.template_argc); + config.current_dataset = NULL; /* Reset until fully loaded */ + } + + /* Step 3: Load all data (only after validation passes) */ + if (ds->format == DATASET_FORMAT_XML) { + if (!loadXmlDataset(ds)) goto error; + } else { + if (!csvLoadDocuments(ds)) goto error; + } + + return ds; + +error: + freeDataset(ds); + return NULL; +} + +/* Free dataset */ +static void freeDataset(dataset *ds) { + if (!ds) return; + + /* Free field names */ + if (ds->field_names) { + sdsfreesplitres(ds->field_names, ds->field_count); + } + + /* Free structured records */ + if (ds->records) { + for (size_t i = 0; i < ds->record_count; i++) { + if (ds->records[i].fields) { + for (int j = 0; j < ds->field_count; j++) { + sdsfree(ds->records[i].fields[j]); + } + zfree(ds->records[i].fields); + } + } + zfree(ds->records); + } + + zfree(ds); +} + +/* Extract field value from CSV/TSV row */ +static sds getFieldValue(const char *row, int column_index, char delimiter) { + int current_col = 0; + const char *start = row; + const char *p = row; + int in_quotes = 0; + + while (*p) { + if (*p == '"') { + in_quotes = !in_quotes; + } else if (*p == delimiter && !in_quotes) { + if (current_col == column_index) { + /* Found our column */ + size_t len = p - start; + /* Remove surrounding quotes if present (len > 1 so a lone quote cannot underflow) */ + if (len > 1 && start[0] == '"' && p[-1] == '"') { + start++; + len -= 2; + } + return sdsnewlen(start, len); + } + current_col++; + start = p + 1; + } + p++; + } + + /* Last column or only column */ + if (current_col == column_index) { + size_t len = p - start; + if (len > 1 && start[0] == '"' && p[-1] == '"') { + start++; + len -= 2; + } + return sdsnewlen(start, len); + } + + return sdsempty(); +} + +/* Extract field value from XML document */ +static sds getXmlFieldValue(const char *xml_doc, const char *field_name) { + char start_tag_prefix[128], end_tag[128]; + snprintf(start_tag_prefix, sizeof(start_tag_prefix), "<%s", field_name); + snprintf(end_tag, sizeof(end_tag), "</%s>", field_name); + + /* Find opening tag (with or without attributes) */ + const char *tag_start = strstr(xml_doc, start_tag_prefix); + if (!tag_start) return sdsempty(); + + /* Find the end of the opening tag */ + const char *tag_end = strchr(tag_start, '>'); + if (!tag_end) return sdsempty(); + + const char *content_start = tag_end + 1; + + /* Find closing tag */ + const char *closing_tag = strstr(content_start, end_tag); + if (!closing_tag) return sdsempty(); + + size_t content_len = closing_tag - content_start; + return sdsnewlen(content_start, content_len); +} + +/* Report dataset memory usage */ +static void reportDatasetMemory(dataset *ds) { + if (!config.quiet) { + /* Calculate total memory from structured records */ + size_t total_memory = 0; + for (size_t i = 0; i < ds->record_count; i++) { + for (int j = 0; j < ds->field_count; j++) { + total_memory += sdslen(ds->records[i].fields[j]); + } + } + sds size_str = formatBytes(total_memory); + printf("Dataset: %zu documents (%s)\n", ds->record_count, size_str); + sdsfree(size_str); + } +} + + +/* 
Find field index in dataset by name */ +static int findFieldIndex(const char *field_name, size_t field_name_len) { + for (int k = 0; k < config.current_dataset->field_count; k++) { + if (strlen(config.current_dataset->field_names[k]) == field_name_len && + !memcmp(config.current_dataset->field_names[k], field_name, field_name_len)) { + return k; + } + } + return -1; +} + +/* Extract field value from dataset record */ +static const char *extractDatasetFieldValue(int field_idx, int record_index) { + return config.current_dataset->records[record_index].fields[field_idx]; +} + +static sds replaceOccurrence(sds processed_arg, const char *pos, const char *replacement) { + size_t offset = pos - processed_arg; + size_t replacement_len = strlen(replacement); + size_t total_len = offset + replacement_len + (sdslen(processed_arg) - offset - PLACEHOLDER_LEN); + + /* Single allocation for final result */ + sds result = sdsnewlen(NULL, total_len); + char *p = result; + + memcpy(p, processed_arg, offset); + p += offset; + + memcpy(p, replacement, replacement_len); + p += replacement_len; + + const char *after_start = pos + PLACEHOLDER_LEN; + size_t after_len = sdslen(processed_arg) - offset - PLACEHOLDER_LEN; + memcpy(p, after_start, after_len); + + sdsfree(processed_arg); + return result; +} + +/* Process field placeholders in a single argument */ +static sds processFieldsInArg(sds arg, int record_index) { + if (!strstr(arg, FIELD_PREFIX)) return arg; + + const char *field_pos = strstr(arg, FIELD_PREFIX); + const char *field_start = field_pos + FIELD_PREFIX_LEN; + const char *field_end = strstr(field_start, FIELD_SUFFIX); + if (!field_end) return arg; + + size_t field_name_len = field_end - field_start; + int field_idx = findFieldIndex(field_start, field_name_len); + if (field_idx == -1) return arg; + + const char *field_value = extractDatasetFieldValue(field_idx, record_index); + size_t before_len = field_pos - arg; + const char *after_start = field_end + FIELD_SUFFIX_LEN; + + sds result = sdsnewlen(arg, before_len); + result = sdscat(result, field_value); + result = sdscat(result, after_start); + + sdsfree(arg); + return result; +} + + +static sds processRandPlaceholdersForDataSet(sds cmd, _Atomic uint64_t *seq_key) { + if (!config.replace_placeholders || config.keyspacelen == 0) return cmd; + + for (int ph = 0; ph < PLACEHOLDER_COUNT; ph++) { + if (!strstr(cmd, PLACEHOLDERS[ph])) continue; + + uint64_t shared_key = 0; + int generate_shared_key = (ph != 0); + + if (generate_shared_key) { + /* Generate shared key for __rand_1st__ - __rand_9th__ */ + if (config.sequential_replacement) { + shared_key = atomic_fetch_add_explicit(&seq_key[ph], 1, memory_order_relaxed); + } else { + shared_key = random(); + } + shared_key %= config.keyspacelen; + } + + /* Process all occurrences */ + size_t search_offset = 0; + char *pos; + while ((pos = strstr(cmd + search_offset, PLACEHOLDERS[ph])) != NULL) { + uint64_t key = generate_shared_key ? 
shared_key : 0; + + if (!generate_shared_key) { + /* __rand_int__: Generate different key per occurrence */ + if (config.sequential_replacement) { + key = atomic_fetch_add_explicit(&seq_key[ph], 1, memory_order_relaxed); + } else { + key = random(); + } + key %= config.keyspacelen; + } + + char key_str[24]; + snprintf(key_str, sizeof(key_str), "%012llu", (unsigned long long)key); + + size_t offset = pos - cmd; + cmd = replaceOccurrence(cmd, pos, key_str); + search_offset = offset + PLACEHOLDER_LEN; + } + } + + return cmd; +} + +/* Generate complete command with field placeholders replaced before RESP encoding */ +static sds generateCompleteCommand(int record_index) { + static _Atomic uint64_t seq_key[PLACEHOLDER_COUNT] = {0}; + + sds *processed_argv = zmalloc(config.template_argc * sizeof(sds)); + for (int i = 0; i < config.template_argc; i++) { + processed_argv[i] = processFieldsInArg(sdsdup(config.template_argv[i]), record_index); + } + + char *cmd; + int len = valkeyFormatCommandArgv(&cmd, config.template_argc, (const char **)processed_argv, NULL); + sds result = sdsnewlen(cmd, len); + free(cmd); + + result = processRandPlaceholdersForDataSet(result, seq_key); + + /* Clean up processed arguments */ + for (int i = 0; i < config.template_argc; i++) { + sdsfree(processed_argv[i]); + } + zfree(processed_argv); + + return result; +} + +/* Free dataset memory */ +static void cleanupDataset(void) { + if (config.current_dataset) { + freeDataset(config.current_dataset); + config.current_dataset = NULL; + } +} + +/* Add RESP command to sequence with repeat count */ +static void addRespCommandToSequence(sds *sds_args, size_t *argvlen, int start, int end, int repeat, sds *cmd_seq, int *seq_len) { + char *cmd = NULL; + int len = valkeyFormatCommandArgv(&cmd, end - start, (const char **)sds_args + start, argvlen + start); + for (int j = 0; j < repeat; j++) { + *cmd_seq = sdscatlen(*cmd_seq, cmd, len); + } + *seq_len += repeat; + free(cmd); +} + +/* Parse and setup command template for dataset field validation */ +static int parseCommandTemplate(int argc, char **argv) { + sds *sds_args = getSdsArrayFromArgv(argc, argv, 0); + if (!sds_args) { + fprintf(stderr, "Invalid quoted string\n"); + return 0; + } + + /* Detect field placeholders */ + config.has_field_placeholders = 0; + for (int i = 0; i < argc; i++) { + if (strstr(sds_args[i], FIELD_PREFIX)) { + config.has_field_placeholders = 1; + break; + } + } + + if (config.has_field_placeholders) { + config.template_argc = argc; + config.template_argv = zmalloc(argc * sizeof(sds)); + for (int i = 0; i < argc; i++) { + config.template_argv[i] = sdsdup(sds_args[i]); + } + } + + sdsfreesplitres(sds_args, argc); + return 1; +} + + /* Generate random data for the benchmark. See #7196. 
*/ static void genBenchmarkRandomData(char *data, int count) { static uint32_t state = 1234; @@ -1779,6 +2516,16 @@ int parseOptions(int argc, char **argv) { config.num_functions = atoi(argv[++i]); } else if (!strcmp(argv[i], "--num-keys-in-fcall")) { config.num_keys_in_fcall = atoi(argv[++i]); + } else if (!strcmp(argv[i], "--dataset")) { + if (lastarg) goto invalid; + config.dataset_file = sdsnew(argv[++i]); + } else if (!strcmp(argv[i], "--maxdocs")) { + if (lastarg) goto invalid; + config.max_documents = atoi(argv[++i]); + if (config.max_documents <= 0) config.max_documents = -1; + } else if (!strcmp(argv[i], "--xml-root-element")) { + if (lastarg) goto invalid; + config.xml_root_element = sdsnew(argv[++i]); } else if (!strcmp(argv[i], "--help")) { exit_status = 0; goto usage; @@ -1887,6 +2634,8 @@ int parseOptions(int argc, char **argv) { "__rand_1st__ Like __rand_int__ but multiple occurrences will have the same\n" " value. __rand_2nd__ through __rand_9th__ are also available.\n" " __data__ Replaced with data of the size specified by the -d option.\n" + " __field:name__ Replaced with data from the specified field/column in the\n" + " dataset. Requires --dataset option.\n" " {tag} Replaced with a tag that routes the command to each node in\n" " a cluster. Include this in key names when running in cluster\n" " mode.\n" @@ -1972,7 +2721,9 @@ int parseOptions(int argc, char **argv) { " loaded when running the 'function_load' test. (default 10).\n" " --num-keys-in-fcall \n" " Sets the number of keys passed to FCALL command when running\n" - " the 'fcall' test. (default 1)\n", + " the 'fcall' test. (default 1)\n" + " --dataset Path to CSV/TSV/XML dataset file for field placeholder replacement.\n" + " All fields auto-detected with natural content lengths.\n", tls_usage, rdma_usage, " --mptcp Enable an MPTCP connection.\n" @@ -2174,12 +2925,41 @@ int main(int argc, char **argv) { config.num_functions = 10; config.num_keys_in_fcall = 1; config.resp3 = 0; + config.dataset_file = NULL; + config.max_documents = -1; /* -1 = unlimited */ + config.xml_root_element = NULL; + config.current_dataset = NULL; + config.template_argc = 0; + config.template_argv = NULL; + config.has_field_placeholders = 0; resetPlaceholders(); i = parseOptions(argc, argv); argc -= i; argv += i; + /* Setup dataset if specified */ + if (config.dataset_file) { + if (argc == 0) { + fprintf(stderr, "Error: Dataset mode requires a command with field placeholders\n"); + fprintf(stderr, "Example: SET doc:__rand_int__ \"__field:content__\"\n"); + exit(1); + } + + /* Parse command template and setup field placeholder detection */ + if (!parseCommandTemplate(argc, argv)) { + exit(1); + } + + /* Initialize dataset with validation */ + config.current_dataset = initDataset(); + if (!config.current_dataset) { + fprintf(stderr, "Failed to initialize dataset\n"); + exit(1); + } + + reportDatasetMemory(config.current_dataset); + } /* Set default for requests if not specified */ if (config.requests < 0) config.requests = 100000; @@ -2324,18 +3104,15 @@ int main(int argc, char **argv) { } else if (i == argc || strcmp(";", sds_args[i]) == 0) { cmd = NULL; if (i == start) continue; - /* End of command. RESP-encode and append to sequence. 
*/ - len = valkeyFormatCommandArgv(&cmd, i - start, - (const char **)sds_args + start, - argvlen + start); - for (int j = 0; j < repeat; j++) { - cmd_seq = sdscatlen(cmd_seq, cmd, len); - } - seq_len += repeat; - free(cmd); + + addRespCommandToSequence(sds_args, argvlen, start, i, repeat, &cmd_seq, &seq_len); start = i + 1; repeat = 1; } else if (strstr(sds_args[i], "__data__")) { + if (config.current_dataset) { + fprintf(stderr, "Error: __data__ placeholders cannot be used with --dataset option\n"); + exit(1); + } /* Replace data placeholders with data of length given by -d. */ int num_parts; sds *parts = sdssplitlen(sds_args[i], sdslen(sds_args[i]), @@ -2354,6 +3131,7 @@ int main(int argc, char **argv) { sds_args[i] = newarg; argvlen[i] = sdslen(sds_args[i]); } + /* NOTE: Field placeholder processing is handled above in the command-level loop to ensure row consistency */ } len = sdslen(cmd_seq); /* adjust the datasize to the parsed command */ @@ -2574,6 +3352,15 @@ int main(int argc, char **argv) { freeCliConnInfo(config.conn_info); if (config.server_config != NULL) freeServerConfig(config.server_config); resetPlaceholders(); + cleanupDataset(); + + /* Clean up command template */ + if (config.template_argv) { + for (int i = 0; i < config.template_argc; i++) { + sdsfree(config.template_argv[i]); + } + zfree(config.template_argv); + } return 0; } diff --git a/tests/integration/valkey-benchmark.tcl b/tests/integration/valkey-benchmark.tcl index 8b36a00d44..c53acfd786 100644 --- a/tests/integration/valkey-benchmark.tcl +++ b/tests/integration/valkey-benchmark.tcl @@ -199,6 +199,224 @@ tags {"benchmark network external:skip logreqres:skip"} { assert {$different_count > 0} } + + test {benchmark: dataset CSV with field placeholders} { + # Create test CSV dataset + set csv_data "title,content,author\nTest Title 1,Test Content 1,Author 1\nTest Title 2,Test Content 2,Author 2" + set csv_file [tmpfile "dataset.csv"] + set fd [open $csv_file w] + puts $fd $csv_data + close $fd + + set cmd [valkeybenchmark $master_host $master_port "--dataset $csv_file -n 4 -r 10 -- HSET doc:__rand_int__ title \"__field:title__\" content \"__field:content__\""] + common_bench_setup $cmd + assert_match {*calls=4,*} [cmdstat hset] + + # Verify field data was inserted correctly + set keys [r keys "doc:*"] + assert {[llength $keys] > 0} + set sample_key [lindex $keys 0] + set title [r hget $sample_key title] + set content [r hget $sample_key content] + assert {$title eq "Test Title 1" || $title eq "Test Title 2"} + assert {$content eq "Test Content 1" || $content eq "Test Content 2"} + + file delete $csv_file + } + + test {benchmark: dataset XML with field placeholders} { + # Create test XML dataset matching Wikipedia structure + set xml_data "<doc><title>XML Title 1</title><abstract>XML Abstract 1</abstract><url>http://example1.com</url><anchor>test1</anchor><link>http://test1.com</link></doc>\n<doc><title>XML Title 2</title><abstract>XML Abstract 2</abstract><url>http://example2.com</url><anchor>test2</anchor><link>http://test2.com</link></doc>" + set xml_file [tmpfile "dataset.xml"] + set fd [open $xml_file w] + puts $fd $xml_data + close $fd + + set cmd [valkeybenchmark $master_host $master_port "--dataset $xml_file --xml-root-element doc -n 4 -r 10 -- HSET xml_doc:__rand_int__ title \"__field:title__\" abstract \"__field:abstract__\""] + common_bench_setup $cmd + assert_match {*calls=4,*} [cmdstat hset] + + # Verify XML field data was inserted correctly + set keys [r keys "xml_doc:*"] + assert {[llength $keys] > 0} + set sample_key [lindex $keys 0] + set title [r hget $sample_key title] + set abstract [r hget $sample_key abstract] + assert {$title eq "XML Title 1" || $title eq "XML Title 
2"} + assert {$abstract eq "XML Abstract 1" || $abstract eq "XML Abstract 2"} + + file delete $xml_file + } + + test {benchmark: dataset with maxdocs limit} { + # Create test dataset with multiple rows + set csv_data "name,value\nitem1,value1\nitem2,value2\nitem3,value3\nitem4,value4" + set csv_file [tmpfile "dataset.csv"] + set fd [open $csv_file w] + puts $fd $csv_data + close $fd + + set cmd [valkeybenchmark $master_host $master_port "--dataset $csv_file --maxdocs 2 -n 4 -r 10 -- SET item:__rand_int__ \"__field:value__\""] + common_bench_setup $cmd + assert_match {*calls=4,*} [cmdstat set] + + # Should only use first 2 documents due to maxdocs limit + set keys [r keys "item:*"] + assert {[llength $keys] > 0} + + # Verify ALL keys only contain values from first 2 documents + set unique_values {} + foreach key $keys { + set value [r get $key] + assert {$value eq "value1" || $value eq "value2"} + if {[lsearch $unique_values $value] == -1} { + lappend unique_values $value + } + } + + file delete $csv_file + } + + test {benchmark: dataset error handling - invalid field} { + set csv_data "name,value\nitem1,value1" + set csv_file [tmpfile "dataset.csv"] + set fd [open $csv_file w] + puts $fd $csv_data + close $fd + + set cmd [valkeybenchmark $master_host $master_port "--dataset $csv_file -n 1 -- SET item:__rand_int__ \"__field:invalid_field__\""] + + # Should fail with invalid field error + if {[catch { exec {*}$cmd } error]} { + assert_match "*not found in dataset fields*" $error + } else { + fail "Expected error for invalid field placeholder" + } + + file delete $csv_file + } + + test {benchmark: dataset TSV with field placeholders} { + # Create test TSV dataset (tab-separated values) + set tsv_data "name\tvalue\tcount\nitem1\tvalue1\t100\nitem2\tvalue2\t200" + set tsv_file [tmpfile "dataset.tsv"] + set fd [open $tsv_file w] + puts $fd $tsv_data + close $fd + + set cmd [valkeybenchmark $master_host $master_port "--dataset $tsv_file -n 4 -r 10 -- HSET tsv_doc:__rand_int__ name \"__field:name__\" value \"__field:value__\" count __field:count__"] + common_bench_setup $cmd + assert_match {*calls=4,*} [cmdstat hset] + + # Verify TSV field data was inserted correctly + set keys [r keys "tsv_doc:*"] + assert {[llength $keys] > 0} + set sample_key [lindex $keys 0] + set name [r hget $sample_key name] + set value [r hget $sample_key value] + set count [r hget $sample_key count] + assert {$name eq "item1" || $name eq "item2"} + assert {$value eq "value1" || $value eq "value2"} + assert {$count eq "100" || $count eq "200"} + + file delete $tsv_file + } + + test {benchmark: XML dataset missing root element error} { + # Create test XML dataset + set xml_data "<doc><title>XML Title 1</title><abstract>XML Abstract 1</abstract></doc>" + set xml_file [tmpfile "dataset.xml"] + set fd [open $xml_file w] + puts $fd $xml_data + close $fd + + # Should fail without --xml-root-element parameter + set cmd [valkeybenchmark $master_host $master_port "--dataset $xml_file -n 1 -- SET xml:__rand_int__ \"__field:title__\""] + + if {[catch { exec {*}$cmd } error]} { + assert_match "*XML dataset requires --xml-root-element parameter*" $error + } else { + fail "Expected error for XML dataset without --xml-root-element" + } + + file delete $xml_file + } + + test {benchmark: dataset with maxdocs larger than available documents} { + # Create test dataset with only 2 rows but request maxdocs=5 + set csv_data "name,value\nitem1,value1\nitem2,value2" + set csv_file [tmpfile "dataset.csv"] + set fd [open $csv_file w] + puts $fd $csv_data + close $fd + + set cmd [valkeybenchmark 
$master_host $master_port "--dataset $csv_file --maxdocs 5 -n 4 -r 10 -- SET item:__rand_int__ \"__field:value__\""] + common_bench_setup $cmd + assert_match {*calls=4,*} [cmdstat set] + + # Should gracefully use all available documents (2), cycling through them + set keys [r keys "item:*"] + assert {[llength $keys] > 0} + + # All values should still be only from available documents + foreach key $keys { + set value [r get $key] + assert {$value eq "value1" || $value eq "value2"} + } + + file delete $csv_file + } + + test {benchmark: mixed placeholders - dataset fields and rand placeholders} { + # Test combining __field:name__ with __rand_int__ placeholders + set csv_data "category,description\nuser,User Management\norder,Order Processing" + set csv_file [tmpfile "dataset.csv"] + set fd [open $csv_file w] + puts $fd $csv_data + close $fd + + set cmd [valkeybenchmark $master_host $master_port "--dataset $csv_file -n 6 -r 100 -- HSET mixed:__rand_int__ category \"__field:category__\" desc \"__field:description__\" score __rand_1st__"] + common_bench_setup $cmd + assert_match {*calls=6,*} [cmdstat hset] + + # Verify both field and random placeholders work together + set keys [r keys "mixed:*"] + assert {[llength $keys] > 0} + set sample_key [lindex $keys 0] + set category [r hget $sample_key category] + set desc [r hget $sample_key desc] + set score [r hget $sample_key score] + + # Field placeholders should contain dataset values + assert {$category eq "user" || $category eq "order"} + assert {$desc eq "User Management" || $desc eq "Order Processing"} + + # Random placeholder should be a 12-digit number + assert {[string length $score] == 12} + assert {[string is digit $score]} + + file delete $csv_file + } + + test {benchmark: dataset mode requires field placeholders} { + set csv_data "name,value\nitem1,value1\nitem2,value2" + set csv_file [tmpfile "dataset.csv"] + set fd [open $csv_file w] + puts $fd $csv_data + close $fd + + # Dataset mode should require field placeholders in the command + set cmd [valkeybenchmark $master_host $master_port "--dataset $csv_file -n 10 -r 10 -t set"] + + # Should fail with error about missing field placeholders + if {[catch { exec {*}$cmd } error]} { + assert_match "*Dataset mode requires a command with field placeholders*" $error + } else { + fail "Expected error for dataset mode without field placeholders" + } + + file delete $csv_file + } + test {benchmark: sequential zadd results in expected number of keys} { set cmd [valkeybenchmark $master_host $master_port "-r 50 -n 50 --sequential -t zadd"] common_bench_setup $cmd