-
Notifications
You must be signed in to change notification settings - Fork 90
/
client_factory.py
112 lines (100 loc) · 3.41 KB
/
client_factory.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from abc import ABC
from typing import List, Type
from engine.base_client.client import (
BaseClient,
BaseConfigurator,
BaseSearcher,
BaseUploader,
)
from engine.clients.elasticsearch import (
ElasticConfigurator,
ElasticSearcher,
ElasticUploader,
)
from engine.clients.milvus import MilvusConfigurator, MilvusSearcher, MilvusUploader
from engine.clients.opensearch import (
OpenSearchConfigurator,
OpenSearchSearcher,
OpenSearchUploader,
)
from engine.clients.pgvector import (
PgVectorConfigurator,
PgVectorSearcher,
PgVectorUploader,
)
from engine.clients.qdrant import QdrantConfigurator, QdrantSearcher, QdrantUploader
from engine.clients.redis import RedisConfigurator, RedisSearcher, RedisUploader
from engine.clients.weaviate import (
WeaviateConfigurator,
WeaviateSearcher,
WeaviateUploader,
)
ENGINE_CONFIGURATORS = {
"qdrant": QdrantConfigurator,
"weaviate": WeaviateConfigurator,
"milvus": MilvusConfigurator,
"elasticsearch": ElasticConfigurator,
"opensearch": OpenSearchConfigurator,
"redis": RedisConfigurator,
"pgvector": PgVectorConfigurator,
}
ENGINE_UPLOADERS = {
"qdrant": QdrantUploader,
"weaviate": WeaviateUploader,
"milvus": MilvusUploader,
"elasticsearch": ElasticUploader,
"opensearch": OpenSearchUploader,
"redis": RedisUploader,
"pgvector": PgVectorUploader,
}
ENGINE_SEARCHERS = {
"qdrant": QdrantSearcher,
"weaviate": WeaviateSearcher,
"milvus": MilvusSearcher,
"elasticsearch": ElasticSearcher,
"opensearch": OpenSearchSearcher,
"redis": RedisSearcher,
"pgvector": PgVectorSearcher,
}
class ClientFactory(ABC):
def __init__(self, host):
self.host = host
self.engine = None
def _create_configurator(self, experiment) -> BaseConfigurator:
self.engine = experiment["engine"]
engine_configurator_class = ENGINE_CONFIGURATORS[experiment["engine"]]
engine_configurator = engine_configurator_class(
self.host,
collection_params={**experiment.get("collection_params", {})},
connection_params={**experiment.get("connection_params", {})},
)
return engine_configurator
def _create_uploader(self, experiment) -> BaseUploader:
engine_uploader_class = ENGINE_UPLOADERS[experiment["engine"]]
engine_uploader = engine_uploader_class(
self.host,
connection_params={**experiment.get("connection_params", {})},
upload_params={**experiment.get("upload_params", {})},
)
return engine_uploader
def _create_searchers(self, experiment) -> List[BaseSearcher]:
engine_searcher_class: Type[BaseSearcher] = ENGINE_SEARCHERS[
experiment["engine"]
]
engine_searchers = [
engine_searcher_class(
self.host,
connection_params={**experiment.get("connection_params", {})},
search_params=search_params,
)
for search_params in experiment.get("search_params", [{}])
]
return engine_searchers
def build_client(self, experiment):
return BaseClient(
name=experiment["name"],
engine=experiment["engine"],
configurator=self._create_configurator(experiment),
uploader=self._create_uploader(experiment),
searchers=self._create_searchers(experiment),
)