feat: Add sparse vectors benchmark support for Qdrant (#114)
* feat: Add sparse vectors benchmark support in Qdrant

* fix: Self review

* feat: Add sparse dataset for CI benchmarks

* feat: Introduce SparseVector class

* feat: Disallow running sparse vector datasets with non-sparse vector engine configs

* feat: use different engine config to run sparse vector benchmarks

* fix: use different engine config to run sparse vector benchmarks

* feat: Optimize CI benchmarks workflow

* feat: Add 1M sparse dataset

* fix: remove scipy, read csr matrix manually (#117)

* fix: remove scipy, read csr matrix manually

* fix: Dataset query reader should have sparse_vector=None by default

* refactor: Changes based on feedback

* refactoring: refactor sparse vector support (#118)

* refactoring: refactor sparse vector support

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>

* feat: Use pydantic construct

* refactor: Update all engines to use Query and Record dataclasses (#116)

* refactor: Update all engines to use Query and Record dataclasses

* feat: Add ruff in pre-commit hooks

* fix: Type mismatches

* fix: Redis search client types and var names

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix: Type issues detected by linter

* fix: iter_batches func type

* refactor: knn_conditions should be class level constant

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix: Type issue

* fix: Allow python 3.8 since scipy is now removed

* fix: Add missing redis-m-16-ef-128 config

* fix: redis container port

* fix linter

---------

Co-authored-by: George <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: generall <[email protected]>
4 people authored Apr 17, 2024
1 parent b7ec57e commit 5343849
Showing 49 changed files with 556 additions and 234 deletions.
33 changes: 22 additions & 11 deletions .github/workflows/continuous-benchmark.yaml
@@ -15,18 +15,29 @@ jobs:
- uses: webfactory/[email protected]
with:
ssh-private-key: ${{ secrets.SSH_PRIVATE_KEY }}
- name: Setup CI
run: bash -x tools/setup_ci.sh
- name: Benches
run: |
export HCLOUD_TOKEN=${{ secrets.HCLOUD_TOKEN }}
export GCS_KEY=${{ secrets.GCS_KEY }}
export GCS_SECRET=${{ secrets.GCS_SECRET }}
export POSTGRES_PASSWORD=${{ secrets.POSTGRES_PASSWORD }}
export POSTGRES_HOST=${{ secrets.POSTGRES_HOST }}
export HCLOUD_TOKEN=${{ secrets.HCLOUD_TOKEN }}
export GCS_KEY=${{ secrets.GCS_KEY }}
export GCS_SECRET=${{ secrets.GCS_SECRET }}
export POSTGRES_PASSWORD=${{ secrets.POSTGRES_PASSWORD }}
export POSTGRES_HOST=${{ secrets.POSTGRES_HOST }}
# Benchmark the dev branch:
export QDRANT_VERSION=ghcr/dev
bash -x tools/run_ci.sh
declare -A DATASET_TO_ENGINE
DATASET_TO_ENGINE["laion-small-clip"]="qdrant-continuous-benchmark"
DATASET_TO_ENGINE["msmarco-sparse-1M"]="qdrant-sparse-vector"
# Benchmark the master branch:
export QDRANT_VERSION=docker/master
bash -x tools/run_ci.sh
for dataset in "${!DATASET_TO_ENGINE[@]}"; do
export ENGINE_NAME=${DATASET_TO_ENGINE[$dataset]}
export DATASETS=$dataset
# Benchmark the dev branch:
export QDRANT_VERSION=ghcr/dev
bash -x tools/run_ci.sh
# Benchmark the master branch:
export QDRANT_VERSION=docker/master
bash -x tools/run_ci.sh
done
7 changes: 7 additions & 0 deletions .pre-commit-config.yaml
@@ -28,3 +28,10 @@ repos:
- id: isort
name: "Sort Imports"
args: ["--profile", "black"]

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.3.5
hooks:
# Run the linter.
- id: ruff
args: [ --fix ]
14 changes: 11 additions & 3 deletions benchmark/dataset.py
@@ -10,20 +10,28 @@
from dataset_reader.ann_h5_reader import AnnH5Reader
from dataset_reader.base_reader import BaseReader
from dataset_reader.json_reader import JSONReader
from dataset_reader.sparse_reader import SparseReader


@dataclass
class DatasetConfig:
vector_size: int
distance: str
name: str
type: str
path: str

link: Optional[str] = None
schema: Optional[Dict[str, str]] = field(default_factory=dict)
# None in case of sparse vectors:
vector_size: Optional[int] = None
distance: Optional[str] = None


READER_TYPE = {"h5": AnnH5Reader, "jsonl": JSONReader, "tar": AnnCompoundReader}
READER_TYPE = {
"h5": AnnH5Reader,
"jsonl": JSONReader,
"tar": AnnCompoundReader,
"sparse": SparseReader,
}


class Dataset:
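As a side note (not part of the diff), the new `"sparse"` reader type plugs into the existing config-to-reader lookup. A minimal sketch of how a sparse dataset entry would resolve to `SparseReader`, assuming the collapsed `Dataset` class looks up `READER_TYPE` by the config's `type` and that downloaded data lives under a local `datasets/` directory:

```python
from pathlib import Path

from benchmark.dataset import READER_TYPE, DatasetConfig

# Mirrors the msmarco-sparse-1M entry added to datasets/datasets.json below.
config = DatasetConfig(
    name="msmarco-sparse-1M",
    type="sparse",
    path="msmarco-sparse/1M",
    # vector_size and distance stay None for sparse datasets
)

reader_cls = READER_TYPE[config.type]  # -> SparseReader
reader = reader_cls(Path("datasets") / config.path, normalize=False)

# Requires the dataset to have been downloaded locally.
first_record = next(iter(reader.read_data()))
print(first_record.id, first_record.sparse_vector)
```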
1 change: 1 addition & 0 deletions dataset_reader/ann_compound_reader.py
@@ -33,6 +33,7 @@ def read_queries(self) -> Iterator[Query]:
vector /= np.linalg.norm(vector)
yield Query(
vector=vector.tolist(),
sparse_vector=None,
meta_conditions=row_json["conditions"],
expected_result=row_json["closest_ids"],
expected_scores=row_json["closest_scores"],
5 changes: 4 additions & 1 deletion dataset_reader/ann_h5_reader.py
@@ -22,6 +22,7 @@ def read_queries(self) -> Iterator[Query]:
vector /= np.linalg.norm(vector)
yield Query(
vector=vector.tolist(),
sparse_vector=None,
meta_conditions=None,
expected_result=expected_result.tolist(),
expected_scores=expected_scores.tolist(),
@@ -33,7 +34,9 @@ def read_data(self) -> Iterator[Record]:
for idx, vector in enumerate(data["train"]):
if self.normalize:
vector /= np.linalg.norm(vector)
yield Record(id=idx, vector=vector.tolist(), metadata=None)
yield Record(
id=idx, vector=vector.tolist(), sparse_vector=None, metadata=None
)


if __name__ == "__main__":
12 changes: 10 additions & 2 deletions dataset_reader/base_reader.py
@@ -2,16 +2,24 @@
from typing import Iterator, List, Optional


@dataclass
class SparseVector:
indices: List[int]
values: List[float]


@dataclass
class Record:
id: int
vector: List[float]
vector: Optional[List[float]]
sparse_vector: Optional[SparseVector]
metadata: Optional[dict]


@dataclass
class Query:
vector: List[float]
vector: Optional[List[float]]
sparse_vector: Optional[SparseVector]
meta_conditions: Optional[dict]
expected_result: Optional[List[int]]
expected_scores: Optional[List[float]] = None
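To illustrate the new dataclasses (an example added here, not part of the diff): a dense record leaves `sparse_vector=None`, while a sparse record leaves `vector=None` and carries a `SparseVector` instead:

```python
from dataset_reader.base_reader import Query, Record, SparseVector

dense = Record(id=0, vector=[0.1, 0.2, 0.3], sparse_vector=None, metadata=None)

sparse = Record(
    id=1,
    vector=None,
    sparse_vector=SparseVector(indices=[4, 17, 42], values=[0.5, 1.2, 0.7]),
    metadata=None,
)

query = Query(
    vector=None,
    sparse_vector=SparseVector(indices=[4, 42], values=[0.9, 0.3]),
    meta_conditions=None,
    expected_result=[1],
)
```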
9 changes: 7 additions & 2 deletions dataset_reader/json_reader.py
@@ -58,13 +58,18 @@ def read_queries(self) -> Iterator[Query]:
):
# ToDo: add meta_conditions

yield Query(vector=vector, meta_conditions=None, expected_result=neighbours)
yield Query(
vector=vector,
sparse_vector=None,
meta_conditions=None,
expected_result=neighbours,
)

def read_data(self) -> Iterator[Record]:
for idx, (vector, payload) in enumerate(
zip(self.read_vectors(), self.read_payloads())
):
yield Record(id=idx, vector=vector, metadata=payload)
yield Record(id=idx, vector=vector, sparse_vector=None, metadata=payload)


if __name__ == "__main__":
100 changes: 100 additions & 0 deletions dataset_reader/sparse_reader.py
@@ -0,0 +1,100 @@
import os
from pathlib import Path
from typing import Iterator, List, Tuple, Union

import numpy as np

from dataset_reader.base_reader import BaseReader, Query, Record, SparseVector


def read_sparse_matrix_fields(
filename: Union[Path, str]
) -> Tuple[np.array, np.array, np.array]:
"""Read the fields of a CSR matrix without instantiating it"""

with open(filename, "rb") as f:
sizes = np.fromfile(f, dtype="int64", count=3)
n_row, n_col, n_non_zero = sizes
index_pointer = np.fromfile(f, dtype="int64", count=n_row + 1)
assert n_non_zero == index_pointer[-1]
columns = np.fromfile(f, dtype="int32", count=n_non_zero)
assert np.all(columns >= 0) and np.all(columns < n_col)
values = np.fromfile(f, dtype="float32", count=n_non_zero)
return values, columns, index_pointer


def csr_to_sparse_vectors(
values: List[float], columns: List[int], index_pointer: List[int]
) -> Iterator[SparseVector]:
num_rows = len(index_pointer) - 1

for i in range(num_rows):
start = index_pointer[i]
end = index_pointer[i + 1]
row_values, row_indices = [], []
for j in range(start, end):
row_values.append(values[j])
row_indices.append(columns[j])
yield SparseVector(indices=row_indices, values=row_values)


def read_csr_matrix(filename: Union[Path, str]) -> Iterator[SparseVector]:
"""Read a CSR matrix in spmat format"""
values, columns, index_pointer = read_sparse_matrix_fields(filename)
values = values.tolist()
columns = columns.tolist()
index_pointer = index_pointer.tolist()

yield from csr_to_sparse_vectors(values, columns, index_pointer)


def knn_result_read(
filename: Union[Path, str]
) -> Tuple[List[List[int]], List[List[float]]]:
n, d = map(int, np.fromfile(filename, dtype="uint32", count=2))
assert os.stat(filename).st_size == 8 + n * d * (4 + 4)
with open(filename, "rb") as f:
f.seek(4 + 4)
ids = np.fromfile(f, dtype="int32", count=n * d).reshape(n, d).tolist()
scores = np.fromfile(f, dtype="float32", count=n * d).reshape(n, d).tolist()
return ids, scores


class SparseReader(BaseReader):
def __init__(self, path, normalize=False):
self.path = path
self.normalize = normalize

def read_queries(self) -> Iterator[Query]:
queries_path = self.path / "queries.csr"
X = read_csr_matrix(queries_path)

gt_path = self.path / "results.gt"
gt_indices, _ = knn_result_read(gt_path)

for i, sparse_vector in enumerate(X):
yield Query(
vector=None,
sparse_vector=sparse_vector,
meta_conditions=None,
expected_result=gt_indices[i],
)

def read_data(self) -> Iterator[Record]:
data_path = self.path / "data.csr"
X = read_csr_matrix(data_path)

for i, sparse_vector in enumerate(X):
yield Record(id=i, vector=None, sparse_vector=sparse_vector, metadata=None)


if __name__ == "__main__":
vals = [1, 3, 2, 3, 6, 4, 5]
cols = [0, 2, 2, 1, 3, 0, 2]
pointers = [0, 2, 3, 5, 7]
vecs = [vec for vec in csr_to_sparse_vectors(vals, cols, pointers)]

assert vecs[0] == SparseVector(indices=[0, 2], values=[1, 3])
assert vecs[1] == SparseVector(indices=[2], values=[2])
assert vecs[2] == SparseVector(indices=[1, 3], values=[3, 6])
assert vecs[3] == SparseVector(indices=[0, 2], values=[4, 5])
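From the reads above, the expected `.csr` binary layout is: an int64 header `[n_rows, n_cols, nnz]`, an int64 index pointer of length `n_rows + 1`, then int32 column indices and float32 values. A small sketch (not part of the repository) that writes a compatible file for local testing; the `/tmp/data.csr` path is only an example:

```python
import numpy as np

from dataset_reader.sparse_reader import read_csr_matrix


def write_csr(filename, values, columns, index_pointer, n_cols):
    """Write a CSR matrix in the spmat layout that read_sparse_matrix_fields expects."""
    n_rows = len(index_pointer) - 1
    with open(filename, "wb") as f:
        # int64 header: [n_rows, n_cols, nnz]
        np.array([n_rows, n_cols, len(values)], dtype="int64").tofile(f)
        np.array(index_pointer, dtype="int64").tofile(f)  # int64 row pointers
        np.array(columns, dtype="int32").tofile(f)         # int32 column indices
        np.array(values, dtype="float32").tofile(f)        # float32 values


# Round-trip the toy matrix from the __main__ block above:
write_csr("/tmp/data.csr", [1, 3, 2, 3, 6, 4, 5], [0, 2, 2, 1, 3, 0, 2], [0, 2, 3, 5, 7], n_cols=4)
first = next(read_csr_matrix("/tmp/data.csr"))
assert first.indices == [0, 2] and first.values == [1.0, 3.0]
```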
12 changes: 12 additions & 0 deletions datasets/datasets.json
@@ -66,6 +66,18 @@
"path": "dbpedia-openai-1M-1536-angular/dbpedia_openai_1M",
"link": "https://storage.googleapis.com/ann-filtered-benchmark/datasets/dbpedia_openai_1M.tgz"
},
{
"name": "msmarco-sparse-100K",
"type": "sparse",
"path": "msmarco-sparse/100K",
"link": "https://storage.googleapis.com/ann-filtered-benchmark/datasets/msmacro-sparse-100K.tar.gz"
},
{
"name": "msmarco-sparse-1M",
"type": "sparse",
"path": "msmarco-sparse/1M",
"link": "https://storage.googleapis.com/ann-filtered-benchmark/datasets/msmacro-sparse-1M.tar.gz"
},
{
"name": "h-and-m-2048-angular-filters",
"vector_size": 2048,
9 changes: 9 additions & 0 deletions engine/base_client/__init__.py
@@ -6,3 +6,12 @@

class IncompatibilityError(Exception):
pass


__all__ = [
"BaseClient",
"BaseConfigurator",
"BaseSearcher",
"BaseUploader",
"IncompatibilityError",
]
5 changes: 4 additions & 1 deletion engine/base_client/client.py
@@ -1,7 +1,6 @@
import json
import os
from datetime import datetime
from pathlib import Path
from typing import List

from benchmark import ROOT_DIR
@@ -31,6 +30,10 @@ def __init__(
self.searchers = searchers
self.engine = engine

@property
def sparse_vector_support(self):
return self.configurator.SPARSE_VECTOR_SUPPORT

def save_search_results(
self, dataset_name: str, results: dict, search_id: int, search_params: dict
):
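The commit message mentions disallowing sparse datasets on engines whose configs lack sparse support; the new `sparse_vector_support` property presumably backs a check along these lines. A hedged sketch only, since the actual call site is not shown in this diff, and `client` / `dataset_config` are assumed to be in scope in the runner:

```python
from engine.base_client import IncompatibilityError

# Hypothetical guard in the benchmark runner (call site not shown in this diff).
if dataset_config.type == "sparse" and not client.sparse_vector_support:
    raise IncompatibilityError(
        f"engine '{client.engine}' is not configured for sparse vectors, "
        f"but dataset '{dataset_config.name}' is sparse"
    )
```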
1 change: 1 addition & 0 deletions engine/base_client/configure.py
@@ -4,6 +4,7 @@


class BaseConfigurator:
SPARSE_VECTOR_SUPPORT: bool = False
DISTANCE_MAPPING = {}

def __init__(self, host, collection_params: dict, connection_params: dict):
8 changes: 3 additions & 5 deletions engine/base_client/search.py
@@ -30,13 +30,11 @@ def get_mp_start_method(cls):
return None

@classmethod
def search_one(
cls, vector: List[float], meta_conditions, top: Optional[int]
) -> List[Tuple[int, float]]:
def search_one(cls, query: Query, top: Optional[int]) -> List[Tuple[int, float]]:
raise NotImplementedError()

@classmethod
def _search_one(cls, query, top: Optional[int] = None):
def _search_one(cls, query: Query, top: Optional[int] = None):
if top is None:
top = (
len(query.expected_result)
@@ -45,7 +43,7 @@
)

start = time.perf_counter()
search_res = cls.search_one(query.vector, query.meta_conditions, top)
search_res = cls.search_one(query, top)
end = time.perf_counter()

precision = 1.0
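With `search_one` now receiving the full `Query`, an engine-side searcher can choose between dense and sparse search from the query itself. A generic sketch, where `cls.client` and its `dense_search` / `sparse_search` methods are placeholders rather than any real engine API:

```python
from typing import List, Optional, Tuple

from dataset_reader.base_reader import Query
from engine.base_client import BaseSearcher


class ExampleSearcher(BaseSearcher):
    @classmethod
    def search_one(cls, query: Query, top: Optional[int]) -> List[Tuple[int, float]]:
        if query.sparse_vector is not None:
            # Placeholder sparse search call.
            hits = cls.client.sparse_search(
                indices=query.sparse_vector.indices,
                values=query.sparse_vector.values,
                limit=top,
            )
        else:
            # Placeholder dense search call.
            hits = cls.client.dense_search(vector=query.vector, limit=top)
        return [(hit.id, hit.score) for hit in hits]
```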
13 changes: 4 additions & 9 deletions engine/base_client/upload.py
@@ -1,6 +1,6 @@
import time
from multiprocessing import get_context
from typing import Iterable, List, Optional, Tuple
from typing import Iterable, List

import tqdm

@@ -80,22 +80,17 @@ def upload(
}

@classmethod
def _upload_batch(
cls, batch: Tuple[List[int], List[list], List[Optional[dict]]]
) -> float:
ids, vectors, metadata = batch
def _upload_batch(cls, batch: List[Record]) -> float:
start = time.perf_counter()
cls.upload_batch(ids, vectors, metadata)
cls.upload_batch(batch)
return time.perf_counter() - start

@classmethod
def post_upload(cls, distance):
return {}

@classmethod
def upload_batch(
cls, ids: List[int], vectors: List[list], metadata: List[Optional[dict]]
):
def upload_batch(cls, batch: List[Record]):
raise NotImplementedError()

@classmethod
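`upload_batch` now receives a list of `Record`s instead of parallel id/vector/metadata lists, so each engine unpacks the records itself. A hedged sketch with a placeholder `cls.client.upsert` call (not any specific engine's API):

```python
from typing import List

from dataset_reader.base_reader import Record
from engine.base_client import BaseUploader


class ExampleUploader(BaseUploader):
    @classmethod
    def upload_batch(cls, batch: List[Record]):
        # Placeholder client call; real engines map records to their own point format.
        cls.client.upsert(
            points=[
                {
                    "id": record.id,
                    "vector": record.vector,
                    "sparse_vector": record.sparse_vector,
                    "payload": record.metadata,
                }
                for record in batch
            ]
        )
```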