generated from rajrajhans/phoenix-elixir-nix-starter
-
Notifications
You must be signed in to change notification settings - Fork 3
/
clip_index.ex
155 lines (129 loc) · 5.59 KB
/
clip_index.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
defmodule MediaSearchDemo.Clip.Index do
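@moduledoc """
Builds and searches the clip index.

Building the index: given some images, we create their clip embeddings and build an
Annoy index from those embeddings.

Searching the index: a text query, an image, or an already-computed query tensor is
matched against the index, and the nearest images are returned as `t:search_result/0` maps.
"""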
require Logger
alias MediaSearchDemo.Vectorizer
alias MediaSearchDemo.ANN
alias MediaSearchDemo.Clip.ClipIndexAgent
@type search_result :: %{
filename: String.t(),
distance: float(),
url: String.t()
}
@doc """
Builds the clip index from the images in the image directory, and saves the index and
the list of filenames to disk. (The filenames list is used to map an ANN result back to a
filename while searching.)

Directory entries whose name starts with `"."` are ignored. An image that cannot be read
or vectorized, or whose vectorization task exceeds the 2 minute timeout, is logged and
left out of the index instead of aborting the whole build.

## Arguments

  - `ann_index_save_path` - where to save the index; defaults to the
    `:ann_index_save_path` application env of `:media_search_demo`
    (`priv/clip_index.ann` according to the original docs - confirm against config)
  - `filenames_save_path` - where to save the list of filenames; defaults to the
    `:filenames_save_path` application env
    (`priv/clip_index_filenames.json` according to the original docs - confirm against config)
  - `image_directory` - directory containing the images; defaults to the
    `:image_directory` application env (`priv/images` according to the original docs -
    confirm against config)

## Returns

  - `:ok` when the index was built and both files were written
  - the value returned by `ANN.build_index/2` when it is not `{:ok, index}`
  - `{:error, :build_index_failed}` when an exception is raised along the way
"""
@dialyzer {:nowarn_function, build_index: 3}
def build_index(
      ann_index_save_path \\ Application.get_env(:media_search_demo, :ann_index_save_path),
      filenames_save_path \\ Application.get_env(:media_search_demo, :filenames_save_path),
      image_directory \\ Application.get_env(:media_search_demo, :image_directory)
    ) do
  # list images in image directory, skipping dot-entries
  all_images =
    image_directory
    |> File.ls!()
    |> Enum.reject(&String.starts_with?(&1, "."))

  # Vectorize each image and collect {vector, file_name} pairs. Because `ordered: false`
  # is used, vectors and filenames are kept paired until the final unzip so that their
  # positions always agree.
  {vectors, filenames} =
    all_images
    |> Enum.with_index()
    # task async stream is used to vectorize images in parallel (note: since we are using
    # Axon.predict and not Nx.Serving inside vectorizer, we are still limited to actually
    # vectorizing just one image at a time)
    |> Task.async_stream(
      fn {image_file_name, i} -> vectorize_image_file(image_directory, image_file_name, i) end,
      max_concurrency: 5,
      ordered: false,
      timeout: :timer.minutes(2),
      on_timeout: :kill_task
    )
    |> Enum.flat_map(fn
      {:ok, nil} ->
        []

      {:ok, {_vector, _file_name} = entry} ->
        [entry]

      # With `on_timeout: :kill_task` a timed-out task is reported as `{:exit, :timeout}`
      # instead of `{:ok, _}`; skip that image rather than failing the whole build.
      {:exit, reason} ->
        Logger.error("[CLIP_INDEX] Image vectorization task exited: #{inspect(reason)}")
        []
    end)
    |> Enum.unzip()

  dimension = Application.get_env(:media_search_demo, :clip_embedding_dimension)

  with {:ok, ann_index} <- ANN.build_index(dimension, vectors) do
    # NOTE(review): the return value of ANN.save_index/2 is ignored here - confirm
    # whether it can report a failure that should be surfaced.
    ANN.save_index(ann_index, ann_index_save_path)
    File.write!(filenames_save_path, Jason.encode!(filenames))
    Logger.info("[CLIP_INDEX] Successfully built index")
    :ok
  end
rescue
  e ->
    Logger.error("[CLIP_INDEX] Failed to build index: #{inspect(e)}")
    {:error, :build_index_failed}
end

# Reads and vectorizes a single image file. Returns `{vector, image_file_name}` on
# success, or `nil` (after logging the reason) when the file cannot be read or vectorized.
# `File.read/1` is used instead of `File.read!/1` so that an unreadable entry (for example
# a sub-directory) does not raise inside the task process.
@dialyzer {:nowarn_function, vectorize_image_file: 3}
defp vectorize_image_file(image_directory, image_file_name, i) do
  Logger.debug("[CLIP_INDEX] Indexing image #{i} #{image_file_name}")
  image_path = Path.join(image_directory, image_file_name)

  case File.read(image_path) do
    {:ok, image_data} ->
      case Vectorizer.vectorize_image(image_data) do
        {:ok, vector} ->
          Logger.debug("[CLIP_INDEX] Done indexing image #{i} #{image_file_name}")
          {vector, image_file_name}

        {:error, reason} ->
          Logger.error(
            "[CLIP_INDEX] Failed to vectorize image #{image_path}: #{inspect(reason)}"
          )

          nil
      end

    {:error, reason} ->
      Logger.error("[CLIP_INDEX] Failed to read image #{image_path}: #{inspect(reason)}")
      nil
  end
end
@dialyzer {:nowarn_function, search_index_with_text: 1}
@doc """
Searches the clip index with a text query.

The query is turned into a vector with `Vectorizer.vectorize_text/1` and that vector is
handed to `search_index_with_tensor/1`.

Returns `{:ok, results}` with a list of `t:search_result/0` maps, or the `{:error, reason}`
produced by the vectorizer or by the index search.
"""
@spec search_index_with_text(String.t()) :: {:ok, list(search_result)} | {:error, any()}
def search_index_with_text(query) do
  Logger.debug("[CLIP_INDEX] Searching index for query #{query}")

  # No `else` is needed: a failed vectorization falls through `with` unchanged, and the
  # search result is already in the `{:ok, _} | {:error, _}` shape this function returns.
  with {:ok, query_vector} <- Vectorizer.vectorize_text(query) do
    search_index_with_tensor(query_vector)
  end
end
@dialyzer {:nowarn_function, search_index_with_image: 1}
@doc """
Searches the clip index with an image.

`query_image` is the image's binary data; it is turned into a vector with
`Vectorizer.vectorize_image/1` and that vector is handed to `search_index_with_tensor/1`.

Returns `{:ok, results}` with a list of `t:search_result/0` maps, or the `{:error, reason}`
produced by the vectorizer or by the index search.
"""
@spec search_index_with_image(binary()) :: {:ok, list(search_result)} | {:error, any()}
def search_index_with_image(query_image) do
  Logger.debug("[CLIP_INDEX] Searching index with image")

  # No `else` is needed: a failed vectorization falls through `with` unchanged, and the
  # search result is already in the `{:ok, _} | {:error, _}` shape this function returns.
  with {:ok, query_vector} <- Vectorizer.vectorize_image(query_image) do
    search_index_with_tensor(query_vector)
  end
end
@dialyzer {:nowarn_function, search_index_with_tensor: 1, search_index_with_tensor: 2}
@doc """
Searches the clip index with an already-computed query vector.

The ANN index and the filenames list are fetched from `ClipIndexAgent`, the nearest
neighbours of `query_vector` are looked up, and every hit is mapped to a
`t:search_result/0` (filename, URL and distance).

## Arguments

  - `query_vector` - the tensor to search with
  - `num_neighbors` - how many nearest neighbours to request; defaults to `15`

## Returns

  - `{:ok, results}` - one `t:search_result/0` per neighbour, in the order returned by the
    ANN lookup
  - `{:error, :search_index_failed}` - when the ANN lookup returns `{:error, reason}`
    (the reason is logged)
"""
@spec search_index_with_tensor(Nx.Tensor.t()) :: {:ok, list(search_result)} | {:error, any()}
@spec search_index_with_tensor(Nx.Tensor.t(), pos_integer()) ::
        {:ok, list(search_result)} | {:error, any()}
def search_index_with_tensor(query_vector, num_neighbors \\ 15)
    when is_integer(num_neighbors) and num_neighbors > 0 do
  ann_index_reference = ClipIndexAgent.get_ann_index()
  filenames = ClipIndexAgent.get_filenames()

  with {:ok, labels, dists} <-
         ANN.get_nearest_neighbors(ann_index_reference, query_vector, num_neighbors) do
    search_results =
      labels
      |> Nx.to_flat_list()
      # pair every label with its distance instead of indexing into the distance list
      |> Enum.zip(Nx.to_flat_list(dists))
      |> Enum.map(fn {result_index, distance} ->
        # result_index is the position of the image in the filenames list
        filename = Enum.at(filenames, result_index)

        %{
          filename: filename,
          url: get_url_from_file_name(filename),
          distance: distance
        }
      end)

    {:ok, search_results}
  else
    {:error, reason} ->
      Logger.error("[CLIP_INDEX] Failed to search index: #{inspect(reason)}")
      {:error, :search_index_failed}
  end
end
@dialyzer {:nowarn_function, get_url_from_file_name: 1}
# Returns the URL path for an image file name by prefixing it with "/static/".
# `to_string/1` performs the same conversion string interpolation would, so a
# non-binary value (e.g. `nil`) is handled the same way as before.
defp get_url_from_file_name(file_name), do: "/static/" <> to_string(file_name)
end