Skip to content

Commit f2e0e22

Browse files
joeingenerall
andauthored
new: expose cluster operations (#1114)
* new: expose cluster operations * fix: fix cluster collection update return result, add tests * fix test --------- Co-authored-by: generall <andrey@vasnetsov.com>
1 parent 27acfd0 commit f2e0e22

12 files changed

Lines changed: 190 additions & 120 deletions

qdrant_client/async_client_base.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,3 +367,8 @@ async def delete_shard_key(
367367

368368
async def info(self) -> types.VersionInfo:
369369
raise NotImplementedError()
370+
371+
async def cluster_collection_update(
372+
self, collection_name: str, cluster_operation: types.ClusterOperations, **kwargs: Any
373+
) -> bool:
374+
raise NotImplementedError()

qdrant_client/async_qdrant_client.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2286,3 +2286,18 @@ async def info(self) -> types.VersionInfo:
22862286
22872287
"""
22882288
return await self._client.info()
2289+
2290+
async def cluster_collection_update(
2291+
self,
2292+
collection_name: str,
2293+
cluster_operation: types.ClusterOperations,
2294+
timeout: Optional[int] = None,
2295+
**kwargs: Any,
2296+
) -> bool:
2297+
assert len(kwargs) == 0, f"Unknown arguments: {list(kwargs.keys())}"
2298+
return await self._client.cluster_collection_update(
2299+
collection_name=collection_name,
2300+
cluster_operation=cluster_operation,
2301+
timeout=timeout,
2302+
**kwargs,
2303+
)

qdrant_client/async_qdrant_remote.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2394,3 +2394,20 @@ async def info(self) -> types.VersionInfo:
23942394
version_info = await self.rest.service_api.root()
23952395
assert version_info is not None, "Healthcheck returned None"
23962396
return version_info
2397+
2398+
async def cluster_collection_update(
2399+
self,
2400+
collection_name: str,
2401+
cluster_operation: types.ClusterOperations,
2402+
timeout: Optional[int] = None,
2403+
**kwargs: Any,
2404+
) -> bool:
2405+
update_result = (
2406+
await self.rest.distributed_api.update_collection_cluster(
2407+
collection_name=collection_name,
2408+
cluster_operations=cluster_operation,
2409+
timeout=timeout,
2410+
)
2411+
).result
2412+
assert update_result is not None, "Cluster collection update returned None"
2413+
return update_result

qdrant_client/client_base.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,3 +394,11 @@ def delete_shard_key(
394394

395395
def info(self) -> types.VersionInfo:
396396
raise NotImplementedError()
397+
398+
def cluster_collection_update(
399+
self,
400+
collection_name: str,
401+
cluster_operation: types.ClusterOperations,
402+
**kwargs: Any,
403+
) -> bool:
404+
raise NotImplementedError()

qdrant_client/conversions/common_types.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,8 @@ def get_args_subscribed(tp): # type: ignore
136136
VersionInfo: TypeAlias = rest.VersionInfo
137137

138138
ReplicaState: TypeAlias = rest.ReplicaState
139+
ClusterOperations: TypeAlias = rest.ClusterOperations
140+
139141
# we can't use `nptyping` package due to numpy/python-version incompatibilities
140142
# thus we need to define precise type annotations while we support python3.7
141143
_np_numeric = Union[

qdrant_client/embed/_inspection_cache.py

Lines changed: 12 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -3150,28 +3150,10 @@
31503150
"title": "Fusion",
31513151
"type": "string",
31523152
},
3153-
"ShardTransferMethodOneOf": {
3154-
"description": "Stream all shard records in batches until the whole shard is transferred.",
3155-
"enum": ["stream_records"],
3156-
"title": "ShardTransferMethodOneOf",
3157-
"type": "string",
3158-
},
3159-
"ShardTransferMethodOneOf1": {
3160-
"description": "Snapshot the shard, transfer and restore it on the receiver.",
3161-
"enum": ["snapshot"],
3162-
"title": "ShardTransferMethodOneOf1",
3163-
"type": "string",
3164-
},
3165-
"ShardTransferMethodOneOf2": {
3166-
"description": "Attempt to transfer shard difference by WAL delta.",
3167-
"enum": ["wal_delta"],
3168-
"title": "ShardTransferMethodOneOf2",
3169-
"type": "string",
3170-
},
3171-
"ShardTransferMethodOneOf3": {
3172-
"description": "Shard transfer for resharding: stream all records in batches until all points are transferred.",
3173-
"enum": ["resharding_stream_records"],
3174-
"title": "ShardTransferMethodOneOf3",
3153+
"ShardTransferMethod": {
3154+
"description": "Methods for transferring a shard from one node to another. - `stream_records` - Stream all shard records in batches until the whole shard is transferred. - `snapshot` - Snapshot the shard, transfer and restore it on the receiver. - `wal_delta` - Attempt to transfer shard difference by WAL delta. - `resharding_stream_records` - Shard transfer for resharding: stream all records in batches until all points are transferred.",
3155+
"enum": ["stream_records", "snapshot", "wal_delta", "resharding_stream_records"],
3156+
"title": "ShardTransferMethod",
31753157
"type": "string",
31763158
},
31773159
"MoveShard": {
@@ -3181,16 +3163,9 @@
31813163
"to_peer_id": {"description": "", "title": "To Peer Id", "type": "integer"},
31823164
"from_peer_id": {"description": "", "title": "From Peer Id", "type": "integer"},
31833165
"method": {
3184-
"anyOf": [
3185-
{"$ref": "#/$defs/ShardTransferMethodOneOf"},
3186-
{"$ref": "#/$defs/ShardTransferMethodOneOf1"},
3187-
{"$ref": "#/$defs/ShardTransferMethodOneOf2"},
3188-
{"$ref": "#/$defs/ShardTransferMethodOneOf3"},
3189-
{"type": "null"},
3190-
],
3166+
"anyOf": [{"$ref": "#/$defs/ShardTransferMethod"}, {"type": "null"}],
31913167
"default": None,
31923168
"description": "Method for transferring the shard from one node to another",
3193-
"title": "Method",
31943169
},
31953170
},
31963171
"required": ["shard_id", "to_peer_id", "from_peer_id"],
@@ -4031,16 +4006,9 @@
40314006
"to_peer_id": {"description": "", "title": "To Peer Id", "type": "integer"},
40324007
"from_peer_id": {"description": "", "title": "From Peer Id", "type": "integer"},
40334008
"method": {
4034-
"anyOf": [
4035-
{"$ref": "#/$defs/ShardTransferMethodOneOf"},
4036-
{"$ref": "#/$defs/ShardTransferMethodOneOf1"},
4037-
{"$ref": "#/$defs/ShardTransferMethodOneOf2"},
4038-
{"$ref": "#/$defs/ShardTransferMethodOneOf3"},
4039-
{"type": "null"},
4040-
],
4009+
"anyOf": [{"$ref": "#/$defs/ShardTransferMethod"}, {"type": "null"}],
40414010
"default": None,
40424011
"description": "Method for transferring the shard from one node to another",
4043-
"title": "Method",
40444012
},
40454013
},
40464014
"required": ["shard_id", "to_peer_id", "from_peer_id"],
@@ -4053,16 +4021,7 @@
40534021
"shard_id": {"description": "", "title": "Shard Id", "type": "integer"},
40544022
"from_peer_id": {"description": "", "title": "From Peer Id", "type": "integer"},
40554023
"to_peer_id": {"description": "", "title": "To Peer Id", "type": "integer"},
4056-
"method": {
4057-
"anyOf": [
4058-
{"$ref": "#/$defs/ShardTransferMethodOneOf"},
4059-
{"$ref": "#/$defs/ShardTransferMethodOneOf1"},
4060-
{"$ref": "#/$defs/ShardTransferMethodOneOf2"},
4061-
{"$ref": "#/$defs/ShardTransferMethodOneOf3"},
4062-
],
4063-
"description": "",
4064-
"title": "Method",
4065-
},
4024+
"method": {"$ref": "#/$defs/ShardTransferMethod", "description": ""},
40664025
},
40674026
"required": ["shard_id", "from_peer_id", "to_peer_id", "method"],
40684027
"title": "RestartTransfer",
@@ -4186,29 +4145,16 @@
41864145
"title": "SnapshotPriority",
41874146
"type": "string",
41884147
},
4189-
"ReshardingDirectionOneOf": {
4190-
"description": "Scale up, add a new shard",
4191-
"enum": ["up"],
4192-
"title": "ReshardingDirectionOneOf",
4193-
"type": "string",
4194-
},
4195-
"ReshardingDirectionOneOf1": {
4196-
"description": "Scale down, remove a shard",
4197-
"enum": ["down"],
4198-
"title": "ReshardingDirectionOneOf1",
4148+
"ReshardingDirection": {
4149+
"description": "Resharding direction, scale up or down in number of shards - `up` - Scale up, add a new shard - `down` - Scale down, remove a shard",
4150+
"enum": ["up", "down"],
4151+
"title": "ReshardingDirection",
41994152
"type": "string",
42004153
},
42014154
"StartResharding": {
42024155
"additionalProperties": False,
42034156
"properties": {
4204-
"direction": {
4205-
"anyOf": [
4206-
{"$ref": "#/$defs/ReshardingDirectionOneOf"},
4207-
{"$ref": "#/$defs/ReshardingDirectionOneOf1"},
4208-
],
4209-
"description": "",
4210-
"title": "Direction",
4211-
},
4157+
"direction": {"$ref": "#/$defs/ReshardingDirection", "description": ""},
42124158
"peer_id": {
42134159
"anyOf": [{"type": "integer"}, {"type": "null"}],
42144160
"default": None,

qdrant_client/http/models/models.py

Lines changed: 4 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -2425,25 +2425,15 @@ class RequestsTelemetry(BaseModel):
24252425
grpc: "GrpcTelemetry" = Field(..., description="")
24262426

24272427

2428-
class ReshardingDirectionOneOf(str, Enum):
2428+
class ReshardingDirection(str, Enum):
24292429
"""
2430-
Scale up, add a new shard
2430+
Resharding direction, scale up or down in number of shards - `up` - Scale up, add a new shard - `down` - Scale down, remove a shard
24312431
"""
24322432

24332433
def __str__(self) -> str:
24342434
return str(self.value)
24352435

24362436
UP = "up"
2437-
2438-
2439-
class ReshardingDirectionOneOf1(str, Enum):
2440-
"""
2441-
Scale down, remove a shard
2442-
"""
2443-
2444-
def __str__(self) -> str:
2445-
return str(self.value)
2446-
24472437
DOWN = "down"
24482438

24492439

@@ -2834,47 +2824,17 @@ class ShardTransferInfo(BaseModel):
28342824
)
28352825

28362826

2837-
class ShardTransferMethodOneOf(str, Enum):
2827+
class ShardTransferMethod(str, Enum):
28382828
"""
2839-
Stream all shard records in batches until the whole shard is transferred.
2829+
Methods for transferring a shard from one node to another. - `stream_records` - Stream all shard records in batches until the whole shard is transferred. - `snapshot` - Snapshot the shard, transfer and restore it on the receiver. - `wal_delta` - Attempt to transfer shard difference by WAL delta. - `resharding_stream_records` - Shard transfer for resharding: stream all records in batches until all points are transferred.
28402830
"""
28412831

28422832
def __str__(self) -> str:
28432833
return str(self.value)
28442834

28452835
STREAM_RECORDS = "stream_records"
2846-
2847-
2848-
class ShardTransferMethodOneOf1(str, Enum):
2849-
"""
2850-
Snapshot the shard, transfer and restore it on the receiver.
2851-
"""
2852-
2853-
def __str__(self) -> str:
2854-
return str(self.value)
2855-
28562836
SNAPSHOT = "snapshot"
2857-
2858-
2859-
class ShardTransferMethodOneOf2(str, Enum):
2860-
"""
2861-
Attempt to transfer shard difference by WAL delta.
2862-
"""
2863-
2864-
def __str__(self) -> str:
2865-
return str(self.value)
2866-
28672837
WAL_DELTA = "wal_delta"
2868-
2869-
2870-
class ShardTransferMethodOneOf3(str, Enum):
2871-
"""
2872-
Shard transfer for resharding: stream all records in batches until all points are transferred.
2873-
"""
2874-
2875-
def __str__(self) -> str:
2876-
return str(self.value)
2877-
28782838
RESHARDING_STREAM_RECORDS = "resharding_stream_records"
28792839

28802840

@@ -3745,10 +3705,6 @@ def __str__(self) -> str:
37453705
StrictInt,
37463706
ReadConsistencyType,
37473707
]
3748-
ReshardingDirection = Union[
3749-
ReshardingDirectionOneOf,
3750-
ReshardingDirectionOneOf1,
3751-
]
37523708
ShardCleanStatusTelemetry = Union[
37533709
ShardCleanStatusTelemetryOneOf,
37543710
ShardCleanStatusTelemetryOneOf1,
@@ -3761,12 +3717,6 @@ def __str__(self) -> str:
37613717
ShardSnapshotLocation = Union[
37623718
StrictStr,
37633719
]
3764-
ShardTransferMethod = Union[
3765-
ShardTransferMethodOneOf,
3766-
ShardTransferMethodOneOf1,
3767-
ShardTransferMethodOneOf2,
3768-
ShardTransferMethodOneOf3,
3769-
]
37703720
SparseIndexType = Union[
37713721
SparseIndexTypeOneOf,
37723722
SparseIndexTypeOneOf1,

qdrant_client/local/async_qdrant_local.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -941,3 +941,14 @@ async def info(self) -> types.VersionInfo:
941941
return rest_models.VersionInfo(
942942
title="qdrant - vector search engine", version=version, commit=None
943943
)
944+
945+
async def cluster_collection_update(
946+
self,
947+
collection_name: str,
948+
cluster_operation: types.ClusterOperations,
949+
timeout: Optional[int] = None,
950+
**kwargs: Any,
951+
) -> bool:
952+
raise NotImplementedError(
953+
"Cluster collection updates is not supported in the local Qdrant. Please use server Qdrant if you need a cluster"
954+
)

qdrant_client/local/qdrant_local.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1009,3 +1009,15 @@ def info(self) -> types.VersionInfo:
10091009
return rest_models.VersionInfo(
10101010
title="qdrant - vector search engine", version=version, commit=None
10111011
)
1012+
1013+
def cluster_collection_update(
1014+
self,
1015+
collection_name: str,
1016+
cluster_operation: types.ClusterOperations,
1017+
timeout: Optional[int] = None,
1018+
**kwargs: Any,
1019+
) -> bool:
1020+
raise NotImplementedError(
1021+
"Cluster collection updates is not supported in the local Qdrant. "
1022+
"Please use server Qdrant if you need a cluster"
1023+
)

qdrant_client/qdrant_client.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2415,3 +2415,19 @@ def info(self) -> types.VersionInfo:
24152415
24162416
"""
24172417
return self._client.info()
2418+
2419+
def cluster_collection_update(
2420+
self,
2421+
collection_name: str,
2422+
cluster_operation: types.ClusterOperations,
2423+
timeout: Optional[int] = None,
2424+
**kwargs: Any,
2425+
) -> bool:
2426+
assert len(kwargs) == 0, f"Unknown arguments: {list(kwargs.keys())}"
2427+
2428+
return self._client.cluster_collection_update(
2429+
collection_name=collection_name,
2430+
cluster_operation=cluster_operation,
2431+
timeout=timeout,
2432+
**kwargs,
2433+
)

0 commit comments

Comments
 (0)