Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 65 additions & 2 deletions src/borg/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -695,8 +695,7 @@ def store_list(namespace):
logger.error("Repository index is corrupted and must be repaired; skipping the pack check.")
objs_errors = index_errors + pack_errors
logger.info(
f"Checked {index_files} index files ({index_errors} errors) "
f"and {pack_files} packs ({pack_errors} errors)."
f"Checked {index_files} index files ({index_errors} errors) and {pack_files} packs ({pack_errors} errors)."
)
if objs_errors == 0:
logger.info(f"Finished {mode} repository check, no problems found.")
Expand Down Expand Up @@ -811,6 +810,70 @@ def delete(self, id):
raise self.ObjectNotFound(id, str(self._location))
logger.warning("ignoring deletion of %s in %s", bin_to_hex(id), bin_to_hex(entry.pack_id))

def compact_pack(self, pack_id, *, keep_ids: set, drop_ids: set):
    """Rewrite pack <pack_id>, keeping <keep_ids> and dropping <drop_ids>, then delete the old pack.

    keep_ids and drop_ids are sets of chunk ids that must together cover the whole pack: their
    ranges tile it with no gap or overlap, and their intersection is empty. Kept objects are
    copied into a new pack via store.defrag and repointed in the chunk index; dropped objects'
    index entries are removed.

    Returns the new pack_id, None if nothing is kept (pack dropped), or <pack_id> unchanged if the
    kept objects reproduce the old pack (same sha256 name, nothing to delete).

    Raises AssertionError if an id is in both sets, if an id is not located in <pack_id>, or if
    the given objects do not tile the pack exactly. These checks guard a destructive operation,
    so they are raised explicitly rather than written as assert statements (which `python -O`
    would strip). All of them run before the index or the store is modified.

    Updates the in-memory chunk index only. The caller holds the exclusive lock and owns index
    durability: invalidate the cached index before calling, write it back after, as compact does.
    """
    self._lock_refresh()
    pack_hex = bin_to_hex(pack_id)
    pack_key = "packs/" + pack_hex

    if keep_ids & drop_ids:
        raise AssertionError("an id cannot appear in both keep_ids and drop_ids")

    # collect every object's range, tagged with whether it is kept, ordered by offset.
    located = []  # (obj_offset, obj_id, obj_size, keep)
    for obj_id in keep_ids | drop_ids:
        entry = self.chunks[obj_id]
        if entry.pack_id != pack_id:
            raise AssertionError(f"{bin_to_hex(obj_id)} is not in pack {pack_hex}")
        located.append((entry.obj_offset, obj_id, entry.obj_size, obj_id in keep_ids))
    located.sort()

    # keep + drop must tile the whole pack; collect the objects to keep in the same pass.
    kept = []  # (obj_offset, obj_id, obj_size), offset-ordered
    covered = 0
    for offset, obj_id, size, keep in located:
        if offset != covered:
            raise AssertionError(f"gap or overlap in pack {pack_hex} at offset {covered}")
        covered += size
        if keep:
            kept.append((offset, obj_id, size))
    if covered != self.store.info(pack_key).size:
        raise AssertionError(f"pack {pack_hex} not fully covered")

    # validation passed: from here on the index and the store are modified.
    for drop_id in drop_ids:  # remove dropped objects from the index; their bytes are not copied forward
        del self.chunks[drop_id]

    if not kept:  # nothing kept: drop the pack, no replacement
        self.store_delete(pack_key)
        return None

    # copy kept objects into a new pack (named sha256 of its content)
    sources = [(pack_hex, offset, size) for offset, _, size in kept]
    new_pack_id = hex_to_bin(self.store.defrag(sources, algorithm="sha256", namespace="packs"))

    # repoint kept objects at the new pack; new offset is the running sum of kept sizes
    new_locations = []
    offset = 0
    for _, keep_id, size in kept:
        new_locations.append((keep_id, new_pack_id, offset, size))
        offset += size
    self.chunks.update_pack_info(new_locations)

    # delete the old pack last, after the new one is stored and indexed, so kept bytes are never the
    # only copy. if every object was kept in order, defrag reproduced the pack (new_pack_id == pack_id)
    # and deleting it would drop what we kept, so skip.
    if new_pack_id != pack_id:
        self.store_delete(pack_key)
    return new_pack_id

def break_lock(self):
    """Break the lock on this repository's store by delegating to Lock.break_lock()."""
    store_lock = Lock(self.store)
    store_lock.break_lock()

Expand Down
65 changes: 65 additions & 0 deletions src/borg/testsuite/repository_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,71 @@ def test_consistency(repo_fixtures, request):
assert pdchunk(repository.get(H(0))) == b"bar"


def build_one_pack(repository, objects):
    """Store all (chunk_id, chunk) pairs from <objects> in the repository as a single pack.

    The pack writer's max_count is raised above the number of objects so that no put() triggers
    a flush on its own; the one explicit flush() at the end then writes everything as one pack.
    """
    with repository:
        writer = repository._pack_writer
        writer.max_count = len(objects) + 1  # one more than we put, so only flush() below writes a pack
        for object_id, payload in objects:
            repository.put(object_id, payload)
        repository.flush()


def test_compact_pack_copy_forward(repo_fixtures, request):
    """Keeping a subset of a multi-object pack: survivors read back, the dropped object and its bytes are gone."""
    survivor_a = fchunk(b"DATA0", chunk_id=H(0))
    victim = fchunk(b"DATA1", chunk_id=H(1))
    survivor_b = fchunk(b"DATA2", chunk_id=H(2))
    repository = get_repository_from_fixture(repo_fixtures, request)
    build_one_pack(repository, [(H(0), survivor_a), (H(1), victim), (H(2), survivor_b)])
    with repository:
        # all three objects must start out in the same pack
        source_pack = repository.chunks[H(0)].pack_id
        assert repository.chunks[H(1)].pack_id == source_pack
        assert repository.chunks[H(2)].pack_id == source_pack

        result_pack = repository.compact_pack(source_pack, keep_ids={H(0), H(2)}, drop_ids={H(1)})

        # a real, different pack was produced
        assert result_pack is not None and result_pack != source_pack
        # kept objects are still readable
        assert pdchunk(repository.get(H(0))) == b"DATA0"
        assert pdchunk(repository.get(H(2))) == b"DATA2"
        # compact_pack removed the dropped object's index entry
        assert repository.get(H(1), raise_missing=False) is None
        # the old pack is gone and the new one holds only the kept objects' bytes
        size_by_name = {}
        for info in repository.store_list("packs"):
            size_by_name[info.name] = info.size
        assert bin_to_hex(source_pack) not in size_by_name
        assert size_by_name[bin_to_hex(result_pack)] == len(survivor_a) + len(survivor_b)


def test_compact_pack_drops_whole_pack(repo_fixtures, request):
    """Dropping every object removes the pack and clears its index entries."""
    first = fchunk(b"DATA0", chunk_id=H(0))
    second = fchunk(b"DATA1", chunk_id=H(1))
    repository = get_repository_from_fixture(repo_fixtures, request)
    build_one_pack(repository, [(H(0), first), (H(1), second)])
    with repository:
        doomed_pack = repository.chunks[H(0)].pack_id

        # nothing kept: compact_pack reports that there is no replacement pack
        outcome = repository.compact_pack(doomed_pack, keep_ids=set(), drop_ids={H(0), H(1)})
        assert outcome is None

        # neither object can be looked up any more
        assert repository.get(H(0), raise_missing=False) is None
        assert repository.get(H(1), raise_missing=False) is None
        # and the pack itself no longer exists in the store
        remaining_names = [info.name for info in repository.store_list("packs")]
        assert bin_to_hex(doomed_pack) not in remaining_names


def test_compact_pack_keep_all_is_noop(repo_fixtures, request):
    """Keeping every object reproduces the same pack: same sha256 name, old pack not deleted.

    The ids are passed out of order on purpose; the result must be the same because
    compact_pack sorts the objects by offset.
    """
    first = fchunk(b"DATA0", chunk_id=H(0))
    second = fchunk(b"DATA1", chunk_id=H(1))
    repository = get_repository_from_fixture(repo_fixtures, request)
    build_one_pack(repository, [(H(0), first), (H(1), second)])
    with repository:
        original_pack = repository.chunks[H(0)].pack_id

        everything = {H(1), H(0)}  # out of order
        result_pack = repository.compact_pack(original_pack, keep_ids=everything, drop_ids=set())

        # same content, hence same pack name
        assert result_pack == original_pack
        # both objects still read back
        assert pdchunk(repository.get(H(0))) == b"DATA0"
        assert pdchunk(repository.get(H(1))) == b"DATA1"
        # the pack was not deleted
        present_names = [info.name for info in repository.store_list("packs")]
        assert bin_to_hex(original_pack) in present_names


def test_list(repo_fixtures, request):
with get_repository_from_fixture(repo_fixtures, request) as repository:
for x in range(100):
Expand Down
Loading