cleanup: use a better algo for collapsing
densumesh authored and cdxker committed Aug 7, 2024
1 parent 4f5b638 commit 5cdcd38
Showing 3 changed files with 116 additions and 82 deletions.
189 changes: 112 additions & 77 deletions docker/collapse-query-script/collapse_queries.py
@@ -1,4 +1,3 @@
-import clickhouse_connect.driver
import clickhouse_connect
import os
import dotenv
@@ -8,72 +7,55 @@


def get_search_queries(
-client,
-dataset_id: uuid.UUID,
-limit=5000,
-offset=Optional[uuid.UUID],
+client, dataset_id: uuid.UUID, limit=5000, offset=Optional[datetime.datetime]
):
query = """
-SELECT id, query, top_score, created_at
+SELECT id, query, top_score, created_at, search_type, request_params, latency, results, query_vector, is_duplicate, query_rating, dataset_id
FROM default.search_queries
WHERE dataset_id = '{}' AND is_duplicate = 0
ORDER BY created_at, length(query)
LIMIT {}
""".format(
-str(dataset_id),
-limit,
+str(dataset_id), limit
)
if offset is not None:
query = """
-SELECT id, query, top_score, created_at
+SELECT id, query, top_score, created_at, search_type, request_params, latency, results, query_vector, is_duplicate, query_rating, dataset_id
FROM default.search_queries
WHERE dataset_id = '{}'
AND created_at >= '{}' AND is_duplicate = 0
AND created_at >= '{}'
ORDER BY created_at, length(query)
LIMIT {}
""".format(
str(dataset_id),
-str(offset),
+datetime.datetime.utcfromtimestamp(offset).isoformat(),
limit,
)
-vector_result = client.query(query)
-rows = vector_result.result_rows
-return rows
+vector_result = client.query(query, query_formats={"Date*": "int"})
+return vector_result.result_rows


def get_datasets(client):
-query = """
-SELECT DISTINCT dataset_id
-FROM default.search_queries
-"""
+query = "SELECT DISTINCT dataset_id FROM default.search_queries"
dataset_result = client.query(query)
-rows = dataset_result.result_rows
-return rows
+return dataset_result.result_rows


-def get_dataset_last_collapsed(
-client, dataset_id: uuid.UUID
-):
+def get_dataset_last_collapsed(client, dataset_id: uuid.UUID):
query = """
SELECT last_collapsed
FROM default.last_collapsed_dataset
WHERE dataset_id = '{}'
""".format(
str(dataset_id)
)
-dataset_result = client.query(query, query_formats={"DateTime": "int"})
+dataset_result = client.query(query, query_formats={"Date*": "int"})
row = dataset_result.result_rows
-if len(row) == 1:
-return datetime.datetime.fromtimestamp(row[0][0])
-return None


+return row[0][0] if row else None


def set_dataset_last_collapsed(
-client,
-dataset_id: uuid.UUID,
-last_collapsed: datetime.datetime,
+client, dataset_id: uuid.UUID, last_collapsed: datetime.datetime
):
client.insert(
"last_collapsed_dataset",
@@ -94,84 +76,137 @@ def set_dataset_last_collapsed(
)


-def collapse_queries(rows):
+def collapse_queries(rows, look_range=10, time_window=10):
rows_to_be_deleted = []
-cur_row = None
-for row in rows:
-if cur_row is None:
-cur_row = row
-elif row[1].startswith(cur_row[1]):
-# Check if the current row's timestamp is within 10 seconds of the previous row
-time_difference = (row[3] - cur_row[3]).total_seconds()
-if time_difference <= 10:
-rows_to_be_deleted.append(cur_row)
-cur_row = row
+sorted_rows = sorted(rows, key=lambda x: x[3])  # Sort by timestamp

+for i, current_row in enumerate(sorted_rows):
+current_query = current_row[1].strip().lower()
+is_duplicate = False
+longest_query = current_query
+longest_row = current_row

+# Look behind
+start = max(0, i - look_range)
+for j in range(start, i):
+prev_row = sorted_rows[j]
+prev_query = prev_row[1].strip().lower()
+time_difference = current_row[3] - prev_row[3]

+if time_difference > time_window:
+continue

+if current_query.startswith(prev_query) or prev_query.startswith(
+current_query
+):
+is_duplicate = True
+if len(prev_query) > len(longest_query):
+longest_query = prev_query
+longest_row = prev_row

+# Look ahead
+end = min(len(sorted_rows), i + look_range + 1)
+for j in range(i + 1, end):
+next_row = sorted_rows[j]
+next_query = next_row[1].strip().lower()
+time_difference = next_row[3] - current_row[3]

+if time_difference > time_window:
+break

+if current_query.startswith(next_query) or next_query.startswith(
+current_query
+):
+is_duplicate = True
+if len(next_query) > len(longest_query):
+longest_query = next_query
+longest_row = next_row

+if is_duplicate:
+if longest_row != current_row:
+rows_to_be_deleted.append(current_row)
else:
-cur_row = row
-else:
-cur_row = row
-return rows_to_be_deleted
+# If current row is the longest, mark others for deletion
+for j in range(start, end):
+if j != i:
+other_row = sorted_rows[j]
+other_query = other_row[1].strip().lower()
+if current_query.startswith(
+other_query
+) or other_query.startswith(current_query):
+rows_to_be_deleted.append(other_row)

+return rows_to_be_deleted

-def delete_queries(client, rows):
-query = """
-ALTER TABLE default.search_queries
-UPDATE is_duplicate = 1
-WHERE id = '{}'
-""".format(
-str(rows[0][0])
-)
-for row in rows:
-query += " OR id = '{}'".format(str(row[0]))

-client.command(query)
+def insert_duplicate_rows(client, rows):
+if rows:
+duplicate_rows = [list(row) for row in rows]
+for row in duplicate_rows:
+row[9] = 1  # Set is_duplicate to 1
+client.insert(
+"search_queries",
+duplicate_rows,
+column_names=[
+"id",
+"query",
+"top_score",
+"created_at",
+"search_type",
+"request_params",
+"latency",
+"results",
+"query_vector",
+"is_duplicate",
+"query_rating",
+"dataset_id",
+],
+)


def main():
dotenv.load_dotenv()

client = clickhouse_connect.get_client(dsn=os.getenv("CLICKHOUSE_DSN"))

try:

datasets = get_datasets(client)

last_collapsed: Optional[str] = None

for dataset in datasets:

dataset_id = dataset[0]

last_collapsed = get_dataset_last_collapsed(client, dataset_id)

-print("Collapsing dataset ", dataset_id, "from ", last_collapsed)

-num_deleted = 0
+print("Collapsing dataset", dataset_id, "from", last_collapsed)

+num_duplicates = 0
rows = get_search_queries(client, dataset_id, 5000, last_collapsed)

-while len(rows) > 0:
+# offset is timestamp
+while rows:
last_collapsed = rows[-1][3]

-to_be_deleted = collapse_queries(rows)
+duplicates = collapse_queries(rows)
+num_duplicates += len(duplicates)

-num_deleted += len(to_be_deleted)

-delete_queries(client, to_be_deleted)
+insert_duplicate_rows(client, duplicates)

new_rows = get_search_queries(client, dataset_id, 5000, last_collapsed)
-if len(new_rows) > 0 and new_rows[-1][0] == rows[-1][0]:
+if new_rows and new_rows[-1][0] == rows[-1][0]:
break
rows = new_rows

-if last_collapsed is not None:
+if last_collapsed:
set_dataset_last_collapsed(client, dataset_id, last_collapsed)

-print(f"Processed dataset {dataset_id}, deleted {num_deleted} rows")
+print(f"Processed dataset {dataset_id}, marked {num_duplicates} duplicates")
print()

except Exception as e:
print(f"Error: {e}")

finally:
# Optionally, you can force a merge to ensure all duplicates are removed
client.command("OPTIMIZE TABLE default.search_queries FINAL")
client.command("OPTIMIZE TABLE default.last_collapsed_dataset FINAL")


-main()
+if __name__ == "__main__":
+main()
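
The new collapse pass sorts a batch by timestamp and, for each query, scans up to look_range neighbours on either side; whenever two queries land within time_window seconds of each other and one is a prefix of the other, only the longest query survives and the shorter ones are reported as duplicates. The sketch below is a simplified restatement of that idea on fabricated rows shaped like (id, query, top_score, created_at-as-epoch-seconds); it is illustrative only, not the exact function above:

    # Fabricated rows that mimic a user typing "harry potter" over a few seconds.
    rows = [
        ("a1", "har", 0.2, 1722988800),
        ("a2", "harry", 0.4, 1722988803),
        ("a3", "harry potter", 0.9, 1722988806),
        ("b1", "lord of the rings", 0.8, 1722988900),  # far outside the 10 s window
    ]

    def collapse(rows, look_range=10, time_window=10):
        """Keep the longest query among mutual prefixes landing within time_window seconds."""
        duplicates = []
        ordered = sorted(rows, key=lambda r: r[3])  # sort by created_at
        for i, cur in enumerate(ordered):
            cur_q = cur[1].strip().lower()
            for j in range(max(0, i - look_range), min(len(ordered), i + look_range + 1)):
                if j == i:
                    continue
                other_q = ordered[j][1].strip().lower()
                if abs(cur[3] - ordered[j][3]) > time_window:
                    continue
                if (cur_q.startswith(other_q) or other_q.startswith(cur_q)) and len(cur_q) < len(other_q):
                    duplicates.append(cur)  # a longer variant exists nearby, so this row is a duplicate
                    break
        return duplicates

    print([r[1] for r in collapse(rows)])  # expected output: ['har', 'harry']

Running it marks "har" and "harry" as duplicates of "harry potter", while "lord of the rings" is untouched because it falls outside the time window.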
5 changes: 3 additions & 2 deletions server/ch_migrations/1720668303_create_inital_tables/up.sql
@@ -22,8 +22,9 @@ CREATE TABLE IF NOT EXISTS search_queries
results Array(String),
query_vector Array(Float32),
dataset_id UUID,
-created_at DateTime
-) ENGINE = MergeTree()
+created_at DateTime,
+is_duplicate UInt8 DEFAULT 0
+) ENGINE = ReplacingMergeTree(is_duplicate)
ORDER BY (dataset_id, created_at, top_score, latency, id)
PARTITION BY
(toYYYYMM(created_at),
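
Switching search_queries from MergeTree to ReplacingMergeTree(is_duplicate) is what lets the script above re-insert duplicate rows with is_duplicate = 1 instead of issuing ALTER TABLE ... UPDATE mutations: when parts merge (or when a query uses FINAL), ClickHouse keeps only the copy with the largest version column for each ORDER BY key, which is the duplicate-marked copy. A hedged sketch of inspecting that behaviour with clickhouse_connect; the table, columns, and CLICKHOUSE_DSN variable come from the script and migration above, while the count queries themselves are only illustrative:

    import os

    import clickhouse_connect
    import dotenv

    dotenv.load_dotenv()
    client = clickhouse_connect.get_client(dsn=os.getenv("CLICKHOUSE_DSN"))

    # Until a background merge runs, both the original row (is_duplicate = 0) and the
    # re-inserted copy (is_duplicate = 1) can be visible in plain SELECTs.
    marked = client.query(
        "SELECT count() FROM default.search_queries WHERE is_duplicate = 1"
    ).result_rows[0][0]

    # FINAL collapses rows sharing the ORDER BY key, keeping the highest is_duplicate
    # value, so duplicate-marked searches drop out of the is_duplicate = 0 slice.
    remaining = client.query(
        "SELECT count() FROM default.search_queries FINAL WHERE is_duplicate = 0"
    ).result_rows[0][0]

    print(f"marked duplicates: {marked}, unique searches after collapse: {remaining}")

    # Forcing the merge makes the replacement permanent, as the script's finally block does.
    client.command("OPTIMIZE TABLE default.search_queries FINAL")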
@@ -7,7 +7,5 @@ CREATE TABLE IF NOT EXISTS last_collapsed_dataset
)
ENGINE = ReplacingMergeTree(created_at)
ORDER BY (dataset_id)
-PARTITION BY (toYYYYMM(created_at), dataset_id)
-TTL created_at + INTERVAL 30 DAY;

+PARTITION BY (toYYYYMM(created_at), dataset_id);
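
For last_collapsed_dataset, the change drops the 30-day TTL so a dataset's collapse checkpoint is never silently expired. A small sketch of reading the checkpoints back (same CLICKHOUSE_DSN assumption as above; aggregating with max() returns the newest checkpoint per dataset regardless of how many older rows the ReplacingMergeTree has merged away so far):

    import os

    import clickhouse_connect

    client = clickhouse_connect.get_client(dsn=os.getenv("CLICKHOUSE_DSN"))

    # One row per dataset: the most recent collapse checkpoint recorded by the script.
    checkpoints = client.query(
        """
        SELECT dataset_id, max(last_collapsed) AS last_collapsed
        FROM default.last_collapsed_dataset
        GROUP BY dataset_id
        """
    ).result_rows

    for dataset_id, last_collapsed in checkpoints:
        print(dataset_id, "collapsed through", last_collapsed)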
