
Commit

Fixed a long-standing issue where a transaction would always reload the metadata file. Refactored the txn API and added a 'beforeUnlock' hook.
Aklakan committed Nov 25, 2023
1 parent f62134a commit 87ad8fa
Showing 11 changed files with 467 additions and 377 deletions.
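
The core of the change is that conn.commit(...) now accepts a TxnHandler whose beforeUnlock callback runs after the transaction's changes are persisted but before its resource locks are released; SliceWithPagesSyncToDisk.sync() uses it to refresh its cached recency snapshot of metadata.ser so that its own write is not later mistaken for an external modification (the cause of the needless reloads). Below is a minimal sketch of the pattern built only from calls visible in this diff; the surrounding class, method and field names are illustrative, the save(...) signature is assumed, and imports for ObjectStoreConnection and ObjectResource are omitted because their package does not appear in this commit.

import org.aksw.commons.path.core.Path;
import org.aksw.commons.txn.impl.PathDiffState;
import org.aksw.commons.txn.impl.TxnHandler;

class MetadataWriteSketch {
    // Snapshot of metadata.ser's on-disk state taken after the last successful sync.
    private PathDiffState baseMetaDataStatus;

    void writeMetadata(ObjectStoreConnection conn, Path<String> metadataPath, Object newMetaData) throws Exception {
        conn.begin(true);                                  // write transaction, as in sync()
        ObjectResource res = conn.access(metadataPath);    // declare and lock the resource
        res.save(newMetaData);                             // save(...) signature assumed to take the object to persist

        // Commit, but refresh the recency snapshot while the lock is still held,
        // so a later hasMetaDataChanged()-style comparison does not flag our own write.
        conn.commit(new TxnHandler() {
            @Override
            public void beforeUnlock(Path<String> resKey, boolean isCommit) throws Exception {
                if (isCommit && resKey.getFileName().toString().equals(metadataPath.getFileName().toString())) {
                    baseMetaDataStatus = res.fetchRecencyStatus();
                }
            }
        });
    }
}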
@@ -41,6 +41,7 @@
import org.aksw.commons.txn.api.TxnApi;
import org.aksw.commons.txn.impl.PathDiffState;
import org.aksw.commons.txn.impl.PathState;
import org.aksw.commons.txn.impl.TxnHandler;
import org.aksw.commons.util.closeable.Disposable;
import org.aksw.commons.util.concurrent.ScheduleOnce;
import org.aksw.commons.util.lock.LockUtils;
@@ -58,11 +59,15 @@
import com.google.common.collect.TreeRangeMap;
import com.google.common.collect.TreeRangeSet;




/**
* A file-based slice implementation.
* There is one metadata file and one 'content' file for each page.
* The metadata file holds the information about which ranges of data have been loaded in the content files.
* Whenever a set of pages is updated, the metadata needs to be updated as well.
*
* @param <A> The collection type of what to store in the pages. Usually an array such as Binding[] or byte[].
*/
// The outside only sees a buffer - but internally it has a structure that enables serializing the changed regions

public class SliceWithPagesSyncToDisk<A>
extends SliceBase<A>
implements SliceWithPages<A>
@@ -73,13 +78,14 @@ public class SliceWithPagesSyncToDisk<A>
// Storage layer for transactional saving of buffers
protected ObjectStore objectStore;

// The folder in the object store where to write the pages and metadata
protected org.aksw.commons.path.core.Path<String> objectStoreBasePath;

// Array abstraction; mainly used to abstract from byte[] and Object[] and consequently avoid having to build
// separate cache implementations
// protected ArrayOps<A> arrayOps;


// Cache of loaded pages
protected AsyncClaimingCache<Long, BufferView<A>> pageCache;


@@ -109,13 +115,15 @@ public class SliceWithPagesSyncToDisk<A>

protected SliceMetaDataWithPages liveMetaData;


// Cached view of the most recent loaded ranges
protected RangeSet<Long> liveMetaDataLoadedRangesView = new RangeSetDelegate<Long>() {
/**
* Cached view of the most recent loaded ranges
* Changing the value of 'this.liveMetaData' also changes this view's delegate
*/
protected RangeSet<Long> liveMetaDataLoadedRangesView = new RangeSetDelegate<>() {
@Override
public RangeSet<Long> getDelegate() {
return liveMetaData.getLoadedRanges();
};

}
@Override
public String toString() {
return Objects.toString(getDelegate());
@@ -304,7 +312,9 @@ public RefFuture<BufferView<A>> getPageForPageId(long pageId) {
InternalBufferView bufferView = (InternalBufferView)buf;
bufferView.getBaseBuffer().reloadIfNeeded().whenComplete((b, t) -> {
if (t != null) {
logger.error("Reloading buffer failed", t);
if (logger.isErrorEnabled()) {
logger.error("Reloading buffer failed", t);
}
}
});
return buf;
@@ -317,49 +327,47 @@ public RangeSet<Long> getGaps(Range<Long> requestRange) {
return liveMetaData.getGaps(requestRange);
}


public boolean hasMetaDataChanged() {
String resourceName = "metadata.ser";
PathDiffState recencyStatus = objectStore.fetchRecencyStatus(objectStoreBasePath.resolve(resourceName));

boolean result = !recencyStatus.equals(baseMetaDataStatus);
logger.debug("Metadata changed: " + result + "; status now: " + recencyStatus + " - before: " + baseMetaDataStatus);
if (logger.isDebugEnabled()) {
logger.debug("Metadata changed: " + result + "; status now: " + recencyStatus + " - before: " + baseMetaDataStatus);
}
return result;
}


/** Returns the metadata generation */
public int lockAndSyncMetaData(ObjectStoreConnection conn, int fallbackPageSize) {


String resourceName = "metadata.ser";
ObjectResource res = conn.access(objectStoreBasePath.resolve(resourceName));


boolean hasMetaDataChanged = hasMetaDataChanged();

int result;

if (hasMetaDataChanged) {
logger.info("Metadata was externally modified on disk... attempting to acquire lock for reload...");
if (logger.isInfoEnabled()) {
logger.info("Metadata was externally modified on disk... attempting to acquire lock for reload...");
}

// Re-check recency status now that the resource is locked
// Objects.requireNonNull(newMetaData, "Deserialization of metadata yielded null");

// System.out.println("Acquired readWrite lock at: " + StackTraceUtils.toString(Thread.currentThread().getStackTrace()));


result = LockUtils.runWithLock(readWriteLock.writeLock(), () -> {
PathDiffState status = res.fetchRecencyStatus();

SliceMetaDataWithPages newMetaData = (SliceMetaDataWithPages)res.loadNewInstance();

logger.info("Lock for reload acquired");
if (logger.isInfoEnabled()) {
logger.info("Lock for reload acquired");
}
if (newMetaData != null) {
logger.info("Loaded metadata: " + newMetaData);
if (logger.isInfoEnabled()) {
logger.info("Loaded metadata: " + newMetaData);
}
baseMetaData = newMetaData;
} else {
logger.info("Created fresh slice metadata");
if (logger.isInfoEnabled()) {
logger.info("Created fresh slice metadata");
}
baseMetaData = new SliceMetaDataWithPagesImpl(fallbackPageSize);
// baseMetaDataStatus = null; // TODO Use a non-null placeholder value
}
@@ -500,12 +508,13 @@ public synchronized void sync() throws IOException {
});

// Incremented on detection of external changes
int nextGeneration = liveGeneration;
// int nextGeneration = liveGeneration;

String metadataFileName = "metadata.ser";
try (ObjectStoreConnection conn = objectStore.getConnection()) {
conn.begin(true);

ObjectResource res = conn.access(objectStoreBasePath.resolve("metadata.ser"));
ObjectResource res = conn.access(objectStoreBasePath.resolve(metadataFileName));

int generationNow = lockAndSyncMetaData(conn, pageSize);

Expand Down Expand Up @@ -543,17 +552,22 @@ public synchronized void sync() throws IOException {

if (!newBaseMetadata.equals(baseMetaData)) {
res.save(newBaseMetadata);

// TODO On commit we need to update the metadata status
}

// If the metadata changed on disk we need to reload it and check which pages need to be reloaded as well

// FIXME Update idsOfDirtyPages with changes from modified metadata

// Check whether the timestamp (generation) of the in memory copy of the meta data matches that on disk


idsOfDirtyPages = PageUtils.touchedPageIndices(syncChanges.getRanges().asRanges(), pageSize);

logger.info("Synchronizing " + idsOfDirtyPages.size() + " dirty pages");
if (logger.isInfoEnabled()) {
logger.info("Synchronizing " + idsOfDirtyPages.size() + " dirty pages");
}

for (long pageId : idsOfDirtyPages) {
String pageFileName = pageIdToFileName.apply(pageId);
@@ -605,9 +619,16 @@ public synchronized void sync() throws IOException {

}


conn.commit();

// Commit the transaction - but before unlocking update the timestamp of the meta data file
conn.commit(new TxnHandler() {
@Override
public void beforeUnlock(org.aksw.commons.path.core.Path<String> resKey, boolean isCommit) throws Exception {
String fn = resKey.getFileName().toString();
if (metadataFileName.equals(fn)) {
baseMetaDataStatus = res.fetchRecencyStatus();
}
}
});

// Update buffer and metadata to the materialized data
LockUtils.runWithLock(readWriteLock.writeLock(), () -> {
Expand Down Expand Up @@ -645,7 +666,9 @@ public synchronized void sync() throws IOException {
throw new RuntimeException(e);
}

logger.info("Synchronization of " + idsOfDirtyPages.size() + " dirty pages completed in " + stopwatch.elapsed(TimeUnit.MILLISECONDS) / 1000.0f + " seconds");
if (logger.isInfoEnabled()) {
logger.info("Synchronization of " + idsOfDirtyPages.size() + " dirty pages completed in " + stopwatch.elapsed(TimeUnit.MILLISECONDS) / 1000.0f + " seconds");
}
}


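As background to the class comment and the idsOfDirtyPages handling above: a page id is simply the absolute offset divided by the page size, so a written range dirties every page index it overlaps, and each of those content files plus the shared metadata file must be synchronized together. sync() delegates this to PageUtils.touchedPageIndices over a set of ranges; the standalone sketch below only illustrates the arithmetic and is not the library's implementation.

import java.util.ArrayList;
import java.util.List;

public class PageMathSketch {
    /** Page that contains the given absolute offset. */
    static long pageIdOf(long offset, long pageSize) {
        return offset / pageSize;
    }

    /** All page ids touched by the non-empty, half-open range [start, end). */
    static List<Long> touchedPages(long start, long end, long pageSize) {
        List<Long> ids = new ArrayList<>();
        for (long id = pageIdOf(start, pageSize); id <= pageIdOf(end - 1, pageSize); id++) {
            ids.add(id);
        }
        return ids;
    }

    public static void main(String[] args) {
        // With a page size of 1000, writing items 2500..4199 touches pages 2, 3 and 4.
        System.out.println(touchedPages(2500, 4200, 1000)); // prints [2, 3, 4]
    }
}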
@@ -6,12 +6,17 @@
import org.aksw.commons.util.ref.RefFuture;

public interface ObjectStore
extends AutoCloseable
extends AutoCloseable
{
RefFuture<ObjectInfo> claim(Path<String> key);

// Get the recency status of a resource outside of any transaction
PathDiffState fetchRecencyStatus(Path<String> key);

ObjectStoreConnection getConnection();
/**
* Asynchronously attempt to claim the given resource.
* @param key
* @return
*/
RefFuture<ObjectInfo> claim(Path<String> key);

// Get the recency status of a resource outside of any transaction
PathDiffState fetchRecencyStatus(Path<String> key);

ObjectStoreConnection getConnection();
}
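
The PathDiffState returned by fetchRecencyStatus is what the change detection in hasMetaDataChanged() above builds on: the slice keeps the status observed after its last sync and treats any difference as an external modification. A hedged sketch of that check, reusing only calls shown in this commit; the helper class and its fields are made up, and the snapshot is assumed to be refreshed after a reload or, as of this commit, in the beforeUnlock hook.

import org.aksw.commons.path.core.Path;
import org.aksw.commons.txn.impl.PathDiffState;

class MetadataChangeDetector {
    private final ObjectStore objectStore;
    private final Path<String> metadataPath;
    // Status observed after the last reload or commit; refreshed elsewhere.
    private PathDiffState lastSeenStatus;

    MetadataChangeDetector(ObjectStore objectStore, Path<String> metadataPath) {
        this.objectStore = objectStore;
        this.metadataPath = metadataPath;
    }

    /** True if the file's on-disk state differs from the last snapshot we took. */
    boolean hasChangedExternally() {
        PathDiffState now = objectStore.fetchRecencyStatus(metadataPath);
        return !now.equals(lastSeenStatus);
    }
}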
@@ -3,13 +3,19 @@

import org.aksw.commons.path.core.Path;
import org.aksw.commons.txn.api.TxnApi;
import org.aksw.commons.txn.impl.TxnHandler;

public interface ObjectStoreConnection
extends TxnApi, AutoCloseable
extends TxnApi, AutoCloseable
{
ObjectResource access(Path<String> keySegments);
/**
* Declare access to the requested resource and lock it with the
* ObjectStoreConnection's active transaction mode (read or write).
*/
ObjectResource access(Path<String> keySegments);

void commit(TxnHandler handler);

// @Override
// void close();
// @Override
// void close();
}
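
For completeness, a read-side counterpart to the write path sketched earlier: access() locks the resource in the transaction's mode, and the plain TxnApi commit() (the call used before this commit added the TxnHandler overload) suffices when no beforeUnlock work is needed. This is a sketch under stated assumptions: that begin(false) starts a read transaction (only begin(true) appears in this diff) and that loadNewInstance() returns null for a missing file, as the null check in lockAndSyncMetaData suggests; imports for ObjectStore, ObjectStoreConnection and ObjectResource are again omitted.

import org.aksw.commons.path.core.Path;

class MetadataReadSketch {
    Object readMetadata(ObjectStore objectStore, Path<String> basePath) throws Exception {
        try (ObjectStoreConnection conn = objectStore.getConnection()) {
            conn.begin(false);  // assumption: 'false' selects a read transaction
            ObjectResource res = conn.access(basePath.resolve("metadata.ser")); // locked in read mode
            Object metadata = res.loadNewInstance(); // may be null if the file does not exist yet
            conn.commit();      // no beforeUnlock work needed on the read path
            return metadata;
        }
    }
}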
