Support doing a full sync without deleting the existing index #81

Open · wants to merge 3 commits into base: `dev`
README.md (2 additions, 2 deletions)

@@ -169,7 +169,7 @@ created only once, otherwise, the plugin instance will be created for each event

The progress is used to record the last sync position, such as binlog position for MySQL.

- - `type`: `file` or `redis`, if set to file, another option `path` is required.
+ - `type`: `file` or `redis`. If set to `file`, the optional `path` option specifies where the progress file is stored.
- `path`: the file path to store the progress, default is `progress.json`.
- `key`: the redis key to store the progress, default is `meilisync:progress`.
- `dsn`: the redis dsn, default is `redis://localhost:6379/0`.
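For context, a file-backed progress section under the new wording would look roughly like the sketch below. The YAML layout is assumed from meilisync's config format; only `type` and `path` come from this hunk:

```yaml
progress:
  type: file            # or "redis"
  path: progress.json   # optional for type "file"; defaults to progress.json
```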
@@ -204,7 +204,7 @@ The sync configuration, you can add multiple sync tasks.

- `table`: the database table name or collection name.
- `index`: the Meilisearch index name, if not set, it will use the table name.
- - `full`: whether to do a full sync, default is `false`.
+ - `full`: whether to do a full sync, default is `false`. If the index already exists, the full sync is skipped.
- `fields`: the fields to sync, if not set, it will sync all fields. The key is table field name, the value is the
Meilisearch field name, if not set, it will use the table field name.
- `plugins`: the table level plugins, optional.
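To illustrate the `full` flag this hunk documents, a sync entry might look as follows. The table and index names are hypothetical, and the layout assumes meilisync's YAML config format:

```yaml
sync:
  - table: users        # hypothetical source table
    index: users_index  # hypothetical Meilisearch index
    full: true          # one-time full sync; skipped if the index already exists
```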
meilisync/main.py (7 additions, 0 deletions)

@@ -149,6 +149,12 @@ def refresh(
    size: int = typer.Option(
        10000, "-s", "--size", help="Size of data for each insert to be inserted into MeiliSearch"
    ),
+    keep_index: bool = typer.Option(
+        False,
+        "-d",
+        "--keep-index",
+        help="Flag to avoid deleting the existing index before doing a full sync.",
+    ),
):
    async def _():
        settings = context.obj["settings"]
@@ -162,6 +168,7 @@ async def _():
            count = await meili.refresh_data(
                sync,
                source.get_full_data(sync, size),
+                keep_index,
            )
            if count:
                logger.info(
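With these additions, a full refresh that reuses the live index would presumably be run as `meilisync refresh --keep-index` (the `meilisync` entry-point name is assumed here; only the `refresh` command and the flag itself appear in this diff). Omitting the flag keeps the previous behavior of rebuilding into a temporary index and swapping it in.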
meilisync/meili.py (30 additions, 22 deletions)

@@ -28,25 +28,30 @@ def __init__(
        self.wait_for_task_timeout = wait_for_task_timeout

    async def add_data(self, sync: Sync, data: list):
-        events = [Event(type=EventType.create, data=item) for item in data]
+        events = [Event(type=EventType.create, data=item, table=sync.table) for item in data]
        return await self.handle_events_by_type(sync, events, EventType.create)

-    async def refresh_data(self, sync: Sync, data: AsyncGenerator):
+    async def refresh_data(self, sync: Sync, data: AsyncGenerator, keep_index: bool = False):
        index = sync.index_name
        pk = sync.pk
-        sync.index = index_name_tmp = f"{index}_tmp"
-        try:
-            await self.client.index(index_name_tmp).delete()
-        except MeilisearchApiError as e:
-            if e.code != "MeilisearchApiError.index_not_found":
-                raise
-        settings = await self.client.index(index).get_settings()
-        index_tmp = await self.client.create_index(index_name_tmp, primary_key=pk)
-        task = await index_tmp.update_settings(settings)
-        logger.info(f"Waiting for update tmp index {index_name_tmp} settings to complete...")
-        await self.client.wait_for_task(
-            task_id=task.task_uid, timeout_in_ms=self.wait_for_task_timeout
-        )
+        if not keep_index:
+            sync.index = index_name_tmp = f"{index}_tmp"
+            try:
+                await self.client.index(index_name_tmp).delete()
+            except MeilisearchApiError as e:
+                if e.code != "MeilisearchApiError.index_not_found":
+                    raise
+            settings = await self.client.index(index).get_settings()
+            index_tmp = await self.client.create_index(index_name_tmp, primary_key=pk)
+            task = await index_tmp.update_settings(settings)
+            logger.info(f"Waiting for update tmp index {index_name_tmp} settings to complete...")
+            await self.client.wait_for_task(
+                task_id=task.task_uid, timeout_in_ms=self.wait_for_task_timeout
+            )
+        else:
+            logger.info("Not deleting index when refreshing data")
+            index_name_tmp = index

        tasks = []
        count = 0
        async for items in data:
@@ -61,13 +66,16 @@ async def refresh_data(self, sync: Sync, data: AsyncGenerator):
        ]
        logger.info(f"Waiting for insert tmp index {index_name_tmp} to complete...")
        await asyncio.gather(*wait_tasks)
-        task = await self.client.swap_indexes([(index, index_name_tmp)])
-        logger.info(f"Waiting for swap index {index} to complete...")
-        await self.client.wait_for_task(
-            task_id=task.task_uid, timeout_in_ms=self.wait_for_task_timeout
-        )
-        await self.client.index(index_name_tmp).delete()
-        logger.success(f"Swap index {index} complete")

+        if not keep_index:
+            task = await self.client.swap_indexes([(index, index_name_tmp)])
+            logger.info(f"Waiting for swap index {index} to complete...")
+            await self.client.wait_for_task(
+                task_id=task.task_uid, timeout_in_ms=self.wait_for_task_timeout
+            )
+            await self.client.index(index_name_tmp).delete()
+            logger.success(f"Swap index {index} complete")

        return count

    async def get_count(self, index: str):
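For readers skimming the hunks above, the reshaped control flow condenses to the sketch below. It is illustrative rather than a drop-in copy of the module: `client` stands in for the async Meilisearch client used above, and the `add_documents` call is assumed for the batch-insert step that the diff elides.

```python
# Condensed sketch of refresh_data after this PR (illustrative, not the real module).
async def refresh(client, index: str, pk: str, batches, keep_index: bool = False):
    if not keep_index:
        # Default path: stage all documents in a temporary index so searches
        # keep hitting the old data until the atomic swap below.
        target = f"{index}_tmp"
        settings = await client.index(index).get_settings()
        tmp = await client.create_index(target, primary_key=pk)
        task = await tmp.update_settings(settings)
        await client.wait_for_task(task_id=task.task_uid)
    else:
        # keep_index path: write straight into the live index; existing
        # documents are preserved and no swap or deletion happens.
        target = index

    async for docs in batches:
        await client.index(target).add_documents(docs, primary_key=pk)

    if not keep_index:
        # Swap the staged index into place atomically, then drop the leftover.
        task = await client.swap_indexes([(index, target)])
        await client.wait_for_task(task_id=task.task_uid)
        await client.index(target).delete()
```

The tradeoff is visible here: the default swap path is atomic from a searcher's point of view but discards documents absent from the source, while `keep_index` preserves whatever is already in the index at the cost of serving partially refreshed results while the sync runs.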