From 87ad8faa22fa27a5785cc6f0e4b52e49e7019a9a Mon Sep 17 00:00:00 2001
From: Claus Stadler
Date: Sat, 25 Nov 2023 11:43:35 +0100
Subject: [PATCH] Fixed a long-standing issue where transactions would always
 reload the metadata file. Refactored the txn API and added a 'beforeUnlock'
 hook.

---
 .../io/slice/SliceWithPagesSyncToDisk.java    |  93 ++++--
 .../store/object/key/api/ObjectStore.java     |  19 +-
 .../object/key/api/ObjectStoreConnection.java |  14 +-
 .../object/key/impl/ObjectStoreImpl.java      |  42 +--
 .../aksw/commons/rx/cache/range/SyncPool.java |   2 +-
 .../java/org/aksw/commons/txn/api/TxnMgr.java |   6 +
 .../org/aksw/commons/txn/impl/FileSync.java   |  11 +-
 .../org/aksw/commons/txn/impl/TxnHandler.java | 311 +-----------------
 .../aksw/commons/txn/impl/TxnHandlerImpl.java |  44 +++
 .../org/aksw/commons/txn/impl/TxnUtils.java   | 290 ++++++++++++++++
 .../commons/util/concurrent/ScheduleOnce.java |  12 +-
 11 files changed, 467 insertions(+), 377 deletions(-)
 create mode 100644 aksw-commons-txn/src/main/java/org/aksw/commons/txn/impl/TxnHandlerImpl.java
 create mode 100644 aksw-commons-txn/src/main/java/org/aksw/commons/txn/impl/TxnUtils.java

diff --git a/aksw-commons-io-parent/aksw-commons-io-buffers/src/main/java/org/aksw/commons/io/slice/SliceWithPagesSyncToDisk.java b/aksw-commons-io-parent/aksw-commons-io-buffers/src/main/java/org/aksw/commons/io/slice/SliceWithPagesSyncToDisk.java
index bfe423cd..c81ef621 100644
--- a/aksw-commons-io-parent/aksw-commons-io-buffers/src/main/java/org/aksw/commons/io/slice/SliceWithPagesSyncToDisk.java
+++ b/aksw-commons-io-parent/aksw-commons-io-buffers/src/main/java/org/aksw/commons/io/slice/SliceWithPagesSyncToDisk.java
@@ -41,6 +41,7 @@
 import org.aksw.commons.txn.api.TxnApi;
 import org.aksw.commons.txn.impl.PathDiffState;
 import org.aksw.commons.txn.impl.PathState;
+import org.aksw.commons.txn.impl.TxnHandler;
 import org.aksw.commons.util.closeable.Disposable;
 import org.aksw.commons.util.concurrent.ScheduleOnce;
 import org.aksw.commons.util.lock.LockUtils;
@@ -58,11 +59,15 @@
 import com.google.common.collect.TreeRangeMap;
 import com.google.common.collect.TreeRangeSet;
 
-
-
-
+/**
+ * A file-based slice implementation.
+ * There is one metadata file and one 'content' file for each page.
+ * The metadata file holds the information about which ranges of data have been loaded in the content files.
+ * Whenever a set of pages is updated, the metadata needs to be updated as well.
+ *
+ * @param The collection type of what to store in the pages. Usually an array such as Binding[] or byte[].
+ */ // The outside only sees a buffer - but internally it has a structure that enables serializing the changed regions - public class SliceWithPagesSyncToDisk extends SliceBase implements SliceWithPages @@ -73,13 +78,14 @@ public class SliceWithPagesSyncToDisk // Storage layer for transactional saving of buffers protected ObjectStore objectStore; + // The folder in the object store where to write the pages and metadata protected org.aksw.commons.path.core.Path objectStoreBasePath; // Array abstraction; avoids having mainly used to abstract from byte[] and Object[] and consequently having to build // separate cache implementations // protected ArrayOps arrayOps; - + // Cache of loaded pages protected AsyncClaimingCache> pageCache; @@ -109,13 +115,15 @@ public class SliceWithPagesSyncToDisk protected SliceMetaDataWithPages liveMetaData; - - // Cached view of the most recent loaded ranges - protected RangeSet liveMetaDataLoadedRangesView = new RangeSetDelegate() { + /** + * Cached view of the most recent loaded ranges + * Changing the value of 'this.liveMetaData' also changes this view's delegate + */ + protected RangeSet liveMetaDataLoadedRangesView = new RangeSetDelegate<>() { + @Override public RangeSet getDelegate() { return liveMetaData.getLoadedRanges(); - }; - + } @Override public String toString() { return Objects.toString(getDelegate()); @@ -304,7 +312,9 @@ public RefFuture> getPageForPageId(long pageId) { InternalBufferView bufferView = (InternalBufferView)buf; bufferView.getBaseBuffer().reloadIfNeeded().whenComplete((b, t) -> { if (t != null) { - logger.error("Reloading buffer failed", t); + if (logger.isErrorEnabled()) { + logger.error("Reloading buffer failed", t); + } } }); return buf; @@ -317,49 +327,47 @@ public RangeSet getGaps(Range requestRange) { return liveMetaData.getGaps(requestRange); } - public boolean hasMetaDataChanged() { String resourceName = "metadata.ser"; PathDiffState recencyStatus = objectStore.fetchRecencyStatus(objectStoreBasePath.resolve(resourceName)); boolean result = !recencyStatus.equals(baseMetaDataStatus); - logger.debug("Metadata changed: " + result + "; status now: " + recencyStatus + " - before: " + baseMetaDataStatus); + if (logger.isDebugEnabled()) { + logger.debug("Metadata changed: " + result + "; status now: " + recencyStatus + " - before: " + baseMetaDataStatus); + } return result; } - /** Returns the metadata generation */ public int lockAndSyncMetaData(ObjectStoreConnection conn, int fallbackPageSize) { - - String resourceName = "metadata.ser"; ObjectResource res = conn.access(objectStoreBasePath.resolve(resourceName)); - - boolean hasMetaDataChanged = hasMetaDataChanged(); int result; - if (hasMetaDataChanged) { - logger.info("Metadata was externally modified on disk... attempting to acquire lock for reload..."); + if (logger.isInfoEnabled()) { + logger.info("Metadata was externally modified on disk... 
attempting to acquire lock for reload..."); + } // Re-check recency status now that the resource is locked // Objects.requireNonNull(newMetaData, "Deserialization of metadata yeld null"); - // System.out.println("Acquired readWrite lock at: " + StackTraceUtils.toString(Thread.currentThread().getStackTrace())); - - result = LockUtils.runWithLock(readWriteLock.writeLock(), () -> { PathDiffState status = res.fetchRecencyStatus(); - SliceMetaDataWithPages newMetaData = (SliceMetaDataWithPages)res.loadNewInstance(); - - logger.info("Lock for reload acquired"); + if (logger.isInfoEnabled()) { + logger.info("Lock for reload acquired"); + } if (newMetaData != null) { - logger.info("Loaded metadata: " + newMetaData); + if (logger.isInfoEnabled()) { + logger.info("Loaded metadata: " + newMetaData); + } baseMetaData = newMetaData; } else { - logger.info("Created fresh slice metadata"); + if (logger.isInfoEnabled()) { + logger.info("Created fresh slice metadata"); + } baseMetaData = new SliceMetaDataWithPagesImpl(fallbackPageSize); // baseMetaDataStatus = null; // TODO Use a non-null placeholder value } @@ -500,12 +508,13 @@ public synchronized void sync() throws IOException { }); // Incremented on detection of external changes - int nextGeneration = liveGeneration; + // int nextGeneration = liveGeneration; + String metadataFileName = "metadata.ser"; try (ObjectStoreConnection conn = objectStore.getConnection()) { conn.begin(true); - ObjectResource res = conn.access(objectStoreBasePath.resolve("metadata.ser")); + ObjectResource res = conn.access(objectStoreBasePath.resolve(metadataFileName)); int generationNow = lockAndSyncMetaData(conn, pageSize); @@ -543,9 +552,12 @@ public synchronized void sync() throws IOException { if (!newBaseMetadata.equals(baseMetaData)) { res.save(newBaseMetadata); + + // TODO On commit we need to update the metadata status } // If the metadata changed on disk we need to reload it and check which pages need to be reloaded as well + // FIXME Update idsOfDirtyPages with changes from modified metadata // Check whether the timestamp (generation) of the in memory copy of the meta data matches that on disk @@ -553,7 +565,9 @@ public synchronized void sync() throws IOException { idsOfDirtyPages = PageUtils.touchedPageIndices(syncChanges.getRanges().asRanges(), pageSize); - logger.info("Synchronizing " + idsOfDirtyPages.size() + " dirty pages"); + if (logger.isInfoEnabled()) { + logger.info("Synchronizing " + idsOfDirtyPages.size() + " dirty pages"); + } for (long pageId : idsOfDirtyPages) { String pageFileName = pageIdToFileName.apply(pageId); @@ -605,9 +619,16 @@ public synchronized void sync() throws IOException { } - - conn.commit(); - + // Commit the transaction - but before unlocking update the timestamp of the meta data file + conn.commit(new TxnHandler() { + @Override + public void beforeUnlock(org.aksw.commons.path.core.Path resKey, boolean isCommit) throws Exception { + String fn = resKey.getFileName().toString(); + if (metadataFileName.equals(fn)) { + baseMetaDataStatus = res.fetchRecencyStatus(); + } + } + }); // Update buffer and metadata to the materialized data LockUtils.runWithLock(readWriteLock.writeLock(), () -> { @@ -645,7 +666,9 @@ public synchronized void sync() throws IOException { throw new RuntimeException(e); } - logger.info("Synchronization of " + idsOfDirtyPages.size() + " dirty pages completed in " + stopwatch.elapsed(TimeUnit.MILLISECONDS) / 1000.0f + " seconds"); + if (logger.isInfoEnabled()) { + logger.info("Synchronization of " + 
idsOfDirtyPages.size() + " dirty pages completed in " + stopwatch.elapsed(TimeUnit.MILLISECONDS) / 1000.0f + " seconds");
+        }
     }
 
diff --git a/aksw-commons-objectstore/src/main/java/org/aksw/commons/store/object/key/api/ObjectStore.java b/aksw-commons-objectstore/src/main/java/org/aksw/commons/store/object/key/api/ObjectStore.java
index 3e136453..ea83f9d0 100644
--- a/aksw-commons-objectstore/src/main/java/org/aksw/commons/store/object/key/api/ObjectStore.java
+++ b/aksw-commons-objectstore/src/main/java/org/aksw/commons/store/object/key/api/ObjectStore.java
@@ -6,12 +6,17 @@
 import org.aksw.commons.util.ref.RefFuture;
 
 public interface ObjectStore
-    extends AutoCloseable
+    extends AutoCloseable
 {
-    RefFuture claim(Path key);
-
-    // Get the recency status of a resource outside of any transaction
-    PathDiffState fetchRecencyStatus(Path key);
-
-    ObjectStoreConnection getConnection();
+    /**
+     * Asynchronously attempt to claim the given resource.
+     * @param key the key of the resource to claim
+     * @return a reference to the claimed resource
+     */
+    RefFuture claim(Path key);
+
+    // Get the recency status of a resource outside of any transaction
+    PathDiffState fetchRecencyStatus(Path key);
+
+    ObjectStoreConnection getConnection();
 }
diff --git a/aksw-commons-objectstore/src/main/java/org/aksw/commons/store/object/key/api/ObjectStoreConnection.java b/aksw-commons-objectstore/src/main/java/org/aksw/commons/store/object/key/api/ObjectStoreConnection.java
index 61195816..505a699f 100644
--- a/aksw-commons-objectstore/src/main/java/org/aksw/commons/store/object/key/api/ObjectStoreConnection.java
+++ b/aksw-commons-objectstore/src/main/java/org/aksw/commons/store/object/key/api/ObjectStoreConnection.java
@@ -3,13 +3,19 @@
 import org.aksw.commons.path.core.Path;
 import org.aksw.commons.txn.api.TxnApi;
+import org.aksw.commons.txn.impl.TxnHandler;
 
 public interface ObjectStoreConnection
-    extends TxnApi, AutoCloseable
+    extends TxnApi, AutoCloseable
 {
-    ObjectResource access(Path keySegments);
+    /**
+     * Declare access to the requested resource and lock it with the
+     * ObjectStoreConnection's active transaction mode (read or write).
+ */ + ObjectResource access(Path keySegments); + void commit(TxnHandler handler); - // @Override - // void close(); + // @Override + // void close(); } diff --git a/aksw-commons-objectstore/src/main/java/org/aksw/commons/store/object/key/impl/ObjectStoreImpl.java b/aksw-commons-objectstore/src/main/java/org/aksw/commons/store/object/key/impl/ObjectStoreImpl.java index 2538e7b5..4ec30c69 100644 --- a/aksw-commons-objectstore/src/main/java/org/aksw/commons/store/object/key/impl/ObjectStoreImpl.java +++ b/aksw-commons-objectstore/src/main/java/org/aksw/commons/store/object/key/impl/ObjectStoreImpl.java @@ -19,9 +19,10 @@ import org.aksw.commons.txn.impl.FileSync; import org.aksw.commons.txn.impl.FileSyncImpl; import org.aksw.commons.txn.impl.PathDiffState; +import org.aksw.commons.txn.impl.TxnHandlerImpl; import org.aksw.commons.txn.impl.TxnHandler; import org.aksw.commons.txn.impl.TxnMgrImpl; -import org.aksw.commons.util.array.Array; +import org.aksw.commons.txn.impl.TxnUtils; import org.aksw.commons.util.ref.RefFuture; import org.checkerframework.checker.nullness.qual.Nullable; @@ -32,7 +33,7 @@ public class ObjectStoreImpl implements ObjectStore { protected TxnMgr txnMgr; - protected TxnHandler txnHandler; + protected TxnHandlerImpl txnHandler; protected ObjectSerializer objectSerializer; // The content cache can save object state to disk @@ -42,18 +43,13 @@ public class ObjectStoreImpl // Read transactions must not access entries which may be modified by a write transaction (MRSW locking) protected AsyncClaimingCache, ObjectInfo> contentCache; - // Accessors provide a thread-safe api to read metadata about a resource and load the content // Accessors should be generally cheap to create but they can hold certain state that should only exist once // to avoid redundant checks or inconsistent data (e.g. 
when content was loaded) so it makes sense to manage // them in a claiming cache protected AsyncClaimingCache, ObjectResource> accessorCache; // accessorCache - - - - - public ObjectStoreImpl(TxnMgr txnMgr, TxnHandler txnHandler, ObjectSerializer objectSerializer, + public ObjectStoreImpl(TxnMgr txnMgr, TxnHandlerImpl txnHandler, ObjectSerializer objectSerializer, AsyncClaimingCache, ObjectInfo> contentCache, AsyncClaimingCache, ObjectResource> accessorCache) { super(); @@ -64,7 +60,6 @@ public ObjectStoreImpl(TxnMgr txnMgr, TxnHandler txnHandler, ObjectSerializer ob this.accessorCache = accessorCache; } - @Override public PathDiffState fetchRecencyStatus(org.aksw.commons.path.core.Path key) { Path path = PathUtils.resolve(txnMgr.getRootPath().resolve(txnMgr.getResRepo().getRootPath()), key.getSegments()); @@ -76,7 +71,7 @@ public PathDiffState fetchRecencyStatus(org.aksw.commons.path.core.Path public static ObjectStore create(Path rootPath, ObjectSerializer objectSerializer) { TxnMgr txnMgr = TxnMgrImpl.createSimple(rootPath); - TxnHandler txnHandler = new TxnHandler(txnMgr); + TxnHandlerImpl txnHandler = new TxnHandlerImpl(txnMgr); try { txnHandler.cleanupStaleTxns(); @@ -168,7 +163,7 @@ public static ObjectStore create(Path rootPath, ObjectSerializer objectSerialize return new ObjectStoreImpl(txnMgr, txnHandler, objectSerializer, contentCache, null); } - protected static void save(TxnMgr txnMgr, ObjectSerializer objectSerializer, TxnHandler txnHandler, org.aksw.commons.path.core.Path key, ObjectInfo v) throws IOException { + protected static void save(TxnMgr txnMgr, ObjectSerializer objectSerializer, TxnHandlerImpl txnHandler, org.aksw.commons.path.core.Path key, ObjectInfo v) throws IOException { Txn txn; txn = txnMgr.newTxn(true, true); TxnResourceApi api = txn.getResourceApi(key); @@ -184,24 +179,20 @@ protected static void save(TxnMgr txnMgr, ObjectSerializer objectSerializer, Txn txnHandler.commit(txn); } - @Override public RefFuture claim(org.aksw.commons.path.core.Path key) { return contentCache.claim(key); } - @Override public ObjectStoreConnection getConnection() { return new ObjectStoreConnectionImpl(); } - @Override public void close() throws Exception { } - class ObjectStoreConnectionImpl implements ObjectStoreConnection { @@ -227,6 +218,14 @@ public void commit() { txn = null; } + @Override + public void commit(TxnHandler customHandler) { + Objects.requireNonNull(txn, "Cannot commit because there is no active transaction; Perhaps missing call to .begin()?"); + // txnHandler.commit(txn); + TxnUtils.commit(txn, customHandler); + txn = null; + } + @Override public void abort() { Objects.requireNonNull(txn, "Cannot abort because there is no active transaction; Perhaps missing call to .begin()?"); @@ -241,10 +240,9 @@ public ObjectResource access(org.aksw.commons.path.core.Path keySegments api.declareAccess(); api.lock(txn.isWrite()); - return new Kor(api); + return new ObjectResourceImpl(api); } - @Override public void close() throws Exception { if (txn != null) { @@ -253,21 +251,18 @@ public void close() throws Exception { } } - - class Kor + class ObjectResourceImpl implements ObjectResource { protected TxnResourceApi res; - public Kor(TxnResourceApi res) { + public ObjectResourceImpl(TxnResourceApi res) { super(); this.res = res; } @Override public void close() throws Exception { - // TODO Auto-generated method stub - } @Override @@ -318,9 +313,7 @@ public void save(Object obj) { throw new RuntimeException(e); } // TODO Auto-generated method stub - } - } // class KorOld @@ 
-512,7 +505,6 @@ public void save(Object obj) {
 //
 //        }
 //    }
-
 
 }
 }
diff --git a/aksw-commons-rx/src/test/java/org/aksw/commons/rx/cache/range/SyncPool.java b/aksw-commons-rx/src/test/java/org/aksw/commons/rx/cache/range/SyncPool.java
index 47eec983..a699d3e5 100644
--- a/aksw-commons-rx/src/test/java/org/aksw/commons/rx/cache/range/SyncPool.java
+++ b/aksw-commons-rx/src/test/java/org/aksw/commons/rx/cache/range/SyncPool.java
@@ -16,7 +16,7 @@
 import org.aksw.commons.rx.lookup.ListPaginatorFromList;
 import org.aksw.commons.txn.api.Txn;
 import org.aksw.commons.txn.api.TxnResourceApi;
-import org.aksw.commons.txn.impl.TxnHandler;
+import org.aksw.commons.txn.impl.TxnHandlerImpl;
 import org.aksw.commons.txn.impl.TxnMgrImpl;
 import org.aksw.commons.util.lock.LockUtils;
 import org.junit.Test;
diff --git a/aksw-commons-txn/src/main/java/org/aksw/commons/txn/api/TxnMgr.java b/aksw-commons-txn/src/main/java/org/aksw/commons/txn/api/TxnMgr.java
index abd035c3..fbfa243f 100644
--- a/aksw-commons-txn/src/main/java/org/aksw/commons/txn/api/TxnMgr.java
+++ b/aksw-commons-txn/src/main/java/org/aksw/commons/txn/api/TxnMgr.java
@@ -40,5 +40,11 @@ default Txn newTxn (boolean useJournal, boolean isWrite) throws IOException {
      */
     void deleteResources() throws IOException;
 
+    /** The time interval between two heartbeats, i.e. how often a transaction's
+     * metadata in the backend (e.g. a file in the filesystem) is updated to indicate that the
+     * transaction's process is still running.
+     * Conversely, exceeding the heartbeat duration (with a little margin) indicates that the process managing the transaction
+     * has stopped or was terminated, so the transaction can be rolled back.
+     */
     TemporalAmount getHeartbeatDuration();
 }
diff --git a/aksw-commons-txn/src/main/java/org/aksw/commons/txn/impl/FileSync.java b/aksw-commons-txn/src/main/java/org/aksw/commons/txn/impl/FileSync.java
index eb976e16..2c473238 100644
--- a/aksw-commons-txn/src/main/java/org/aksw/commons/txn/impl/FileSync.java
+++ b/aksw-commons-txn/src/main/java/org/aksw/commons/txn/impl/FileSync.java
@@ -2,6 +2,13 @@
 
 import java.nio.file.Path;
 
+/**
+ * Interface for tracking changes to a file.
+ * Any change results in a copy of the original file; in other words, changes are NOT tracked on the level of byte ranges.
+ *
+ * Upon syncing, the original (old) file is replaced with the new one. This operation should use an atomic move if supported.
+ * + */ public interface FileSync extends ContentSync { @@ -10,10 +17,10 @@ public interface FileSync /** Get the path to the file that contains the original content */ Path getOldContentPath(); - + /** Get the path to file with the current content - can be the original file or the temp file */ Path getCurrentPath(); - + /** Get the file that represents the new content */ Path getNewContentTmpFile(); } diff --git a/aksw-commons-txn/src/main/java/org/aksw/commons/txn/impl/TxnHandler.java b/aksw-commons-txn/src/main/java/org/aksw/commons/txn/impl/TxnHandler.java index d772f6cc..1c854c7e 100644 --- a/aksw-commons-txn/src/main/java/org/aksw/commons/txn/impl/TxnHandler.java +++ b/aksw-commons-txn/src/main/java/org/aksw/commons/txn/impl/TxnHandler.java @@ -1,307 +1,18 @@ package org.aksw.commons.txn.impl; -import java.io.IOException; -import java.nio.file.Path; -import java.util.Iterator; -import java.util.stream.Stream; - -import org.aksw.commons.io.util.FileUtils; -import org.aksw.commons.txn.api.Txn; -import org.aksw.commons.txn.api.TxnMgr; -import org.aksw.commons.txn.api.TxnResourceApi; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.aksw.commons.path.core.Path; /** - * Skeleton for implementing commit / rollback actions. Provides callbacks - * that allow for syncing in-memory copies. - * - * @author raven + * Callbacks for reacting to events during the life cycle management of transactions. * + * The most relevant callback is {@link #beforeUnlock(Path, boolean)}: At this point + * resources have been written and are in their final location but the lock is still held. + * This allows one to e.g. safely read the last modified timestamp of a just committed file + * before another transaction can modify it again. */ -public class TxnHandler { - - private static final Logger logger = LoggerFactory.getLogger(TxnHandler.class); - - protected TxnMgr txnMgr; - - - public TxnHandler(TxnMgr txnMgr) { - super(); - this.txnMgr = txnMgr; - } - - protected void beforePreCommit(org.aksw.commons.path.core.Path resKey) throws Exception { - - } - - protected void afterPreCommit(org.aksw.commons.path.core.Path resKey) throws Exception { - - } - - protected void beforeUnlock(org.aksw.commons.path.core.Path resKey, boolean isCommit) throws Exception { - - } - - protected void end() { - - } - - public void cleanupStaleTxns() throws IOException { - logger.info("Checking existing txns..."); - try (Stream stream = txnMgr.streamTxns()) { - stream.forEach(txn -> { - try { - // if (txn.isStale()) { - if (txn.claim()) { - rollbackOrEnd(txn); - } - } catch (Exception e) { - logger.warn("Failed to process txn", e); - } - }); - } - } - - - public void commit(Txn txn) { - try { - // TODO Non-write transactions can probably skip the sync block - or? 
- try (Stream> stream = txn.streamAccessedResourcePaths()) { - Iterator> it = stream.iterator(); - while (it.hasNext()) { - org.aksw.commons.path.core.Path relPath = it.next(); - // Path relPath = txnMgr.getResRepo().getRelPath(res); - - TxnResourceApi api = txn.getResourceApi(relPath); - if (api.getTxnResourceLock().ownsWriteLock()) { - logger.debug("Syncing: " + relPath); - // If we own a write lock and the state is dirty then sync - // If there are any in memory changes then write them out - - beforePreCommit(relPath); - // Precommit: Copy any new data files to their final location (but keep backups) - ContentSync fs = api.getFileSync(); - fs.preCommit(); - - afterPreCommit(relPath); - // Update the in memory cache - } - } - } - // Once all modified graphs are written out - // add the statement that the commit action can now be run - txn.addCommit(); - - applyJournal(txn); - } catch (Exception e) { - try { - if (txn.isCommit()) { - throw new RuntimeException("Failed to finalize commit after pre-commit", e); - } else { - txn.addRollback(); - } - } catch (Exception e2) { - e2.addSuppressed(e); - throw new RuntimeException(e2); - } - - try { - applyJournal(txn); - } catch (Exception e2) { - e2.addSuppressed(e); - throw new RuntimeException(e2); - } - - throw new RuntimeException(e); - } finally { - end(); - } - } - - - /** - * - * @param txn - * @param resourceAction (resourceKey, isCommit) An action to run on a resource after changes were rolled back or committed - - * but before the resource is unlocked. Typically used to synchronize an in-memory cache. - */ - @Deprecated // Old version does not separate finalization and unlocking - // The advantage of the old version is that resources are unlocked earlier which could - // reduce latency - public void applyJournalOld(Txn txn) { //LoadingCache, SyncedDataset> syncCache) { - TxnMgr txnMgr = txn.getTxnMgr(); - // ResourceRepository resRepo = txnMgr.getResRepo(); - Path resRepoRootPath = txnMgr.getRootPath(); - - boolean isCommit; - try { - isCommit = txn.isCommit() && !txn.isRollback(); - } catch (IOException e1) { - throw new RuntimeException(e1); - } - - try { - - // Run the finalization actions - // As these actions remove undo information - // there is no turning back anymore - if (isCommit) { - txn.addFinalize(); - } - - // TODO Stream the relPaths rather than the string resource names? - try (Stream> stream = txn.streamAccessedResourcePaths()) { - Iterator> it = stream.iterator(); - while (it.hasNext()) { - org.aksw.commons.path.core.Path res = it.next(); - logger.debug("Finalizing and unlocking: " + res); - TxnResourceApi api = txn.getResourceApi(res); - - org.aksw.commons.path.core.Path resourceKey = api.getResourceKey(); - - Path targetFile = api.getFileSync().getTargetFile(); - if (isCommit) { - api.finalizeCommit(); - } else { - api.rollback(); - } - - // Clean up empty paths - FileUtils.deleteEmptyFolders(targetFile.getParent(), resRepoRootPath, true); - - beforeUnlock(resourceKey, isCommit); -// SyncedDataset synced = syncCache.getIfPresent(Array.wrap(resourceKey)); -// if (synced != null) { -// if (synced.isDirty()) { -// if (isCommit) { -// synced.getDiff().materialize(); -// } else { -// synced.getDiff().clearChanges(); -// } -// synced.updateState(); -// } -// } - - api.unlock(); - api.undeclareAccess(); - } - } - - txn.cleanUpTxn(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public void applyJournal(Txn txn) { - sync(txn); - unlock(txn); - } - - - /** Finalize all pending changes. 
All files of the transaction will be in their final state. - */ - public void sync(Txn txn) { //LoadingCache, SyncedDataset> syncCache) { - TxnMgr txnMgr = txn.getTxnMgr(); - // ResourceRepository resRepo = txnMgr.getResRepo(); - Path resRepoRootPath = txnMgr.getRootPath(); - - boolean isCommit; - try { - isCommit = txn.isCommit() && !txn.isRollback(); - } catch (IOException e1) { - throw new RuntimeException(e1); - } - - try { - - // Run the finalization actions - // As these actions remove undo information - // there is no turning back anymore - if (isCommit) { - txn.addFinalize(); - } - - // TODO Stream the relPaths rather than the string resource names? - try (Stream> stream = txn.streamAccessedResourcePaths()) { - Iterator> it = stream.iterator(); - while (it.hasNext()) { - org.aksw.commons.path.core.Path res = it.next(); - logger.debug("Finalizing: " + res); - TxnResourceApi api = txn.getResourceApi(res); - - org.aksw.commons.path.core.Path resourceKey = api.getResourceKey(); - - Path targetFile = api.getFileSync().getTargetFile(); - if (isCommit) { - api.finalizeCommit(); - } else { - api.rollback(); - } - - // Clean up empty paths - FileUtils.deleteEmptyFolders(targetFile.getParent(), resRepoRootPath, true); - } - } - - txn.cleanUpTxn(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public void unlock(Txn txn) { //LoadingCache, SyncedDataset> syncCache) { - TxnMgr txnMgr = txn.getTxnMgr(); - - boolean isCommit; - try { - isCommit = txn.isCommit() && !txn.isRollback(); - } catch (IOException e1) { - throw new RuntimeException(e1); - } - - try { - // TODO Stream the relPaths rather than the string resource names? - try (Stream> stream = txn.streamAccessedResourcePaths()) { - Iterator> it = stream.iterator(); - while (it.hasNext()) { - org.aksw.commons.path.core.Path res = it.next(); - logger.debug("Unlocking: " + res); - TxnResourceApi api = txn.getResourceApi(res); - - org.aksw.commons.path.core.Path resourceKey = api.getResourceKey(); - - beforeUnlock(resourceKey, isCommit); - api.unlock(); - api.undeclareAccess(); - } - } - - txn.cleanUpTxn(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - - public void abort(Txn txn) { - try { - txn.addRollback(); - applyJournal(txn); - } catch (Exception e) { - throw new RuntimeException(e); - } finally { - end(); - } - } - - public void rollbackOrEnd(Txn txn) throws IOException { - logger.info("Detected stale txn; applying rollback: " + txn.getId()); - if (!txn.isCommit()) { - txn.addRollback(); - } - applyJournal(txn); - } - +public interface TxnHandler { + default void beforePreCommit(Path resKey) throws Exception { }; + default void afterPreCommit(Path resKey) throws Exception { }; + default void beforeUnlock(Path resKey, boolean isCommit) throws Exception { }; + default void end() { }; } diff --git a/aksw-commons-txn/src/main/java/org/aksw/commons/txn/impl/TxnHandlerImpl.java b/aksw-commons-txn/src/main/java/org/aksw/commons/txn/impl/TxnHandlerImpl.java new file mode 100644 index 00000000..d7a08eb3 --- /dev/null +++ b/aksw-commons-txn/src/main/java/org/aksw/commons/txn/impl/TxnHandlerImpl.java @@ -0,0 +1,44 @@ +package org.aksw.commons.txn.impl; + +import java.io.IOException; + +import org.aksw.commons.txn.api.Txn; +import org.aksw.commons.txn.api.TxnMgr; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Skeleton for implementing commit / rollback actions. Provides callbacks + * that allow for syncing in-memory copies. 
+ * + * @author raven + * + */ +public class TxnHandlerImpl + implements TxnHandler +{ + static final Logger logger = LoggerFactory.getLogger(TxnHandlerImpl.class); + + protected TxnMgr txnMgr; + + public TxnHandlerImpl(TxnMgr txnMgr) { + super(); + this.txnMgr = txnMgr; + } + + public void cleanupStaleTxns() throws IOException { + TxnUtils.cleanupStaleTxns(txnMgr, this); + } + + public void commit(Txn txn) { + TxnUtils.commit(txn, this); + } + + public void abort(Txn txn) { + TxnUtils.abort(txn, this); + } + + public void rollbackOrEnd(Txn txn) throws IOException { + TxnUtils.rollbackOrEnd(txn, this); + } +} diff --git a/aksw-commons-txn/src/main/java/org/aksw/commons/txn/impl/TxnUtils.java b/aksw-commons-txn/src/main/java/org/aksw/commons/txn/impl/TxnUtils.java new file mode 100644 index 00000000..60bf35c8 --- /dev/null +++ b/aksw-commons-txn/src/main/java/org/aksw/commons/txn/impl/TxnUtils.java @@ -0,0 +1,290 @@ +package org.aksw.commons.txn.impl; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Iterator; +import java.util.stream.Stream; + +import org.aksw.commons.io.util.FileUtils; +import org.aksw.commons.txn.api.Txn; +import org.aksw.commons.txn.api.TxnMgr; +import org.aksw.commons.txn.api.TxnResourceApi; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Static methods for running commits and rollbacks on the {@link Txn} API. + */ +public class TxnUtils { + private static final Logger logger = LoggerFactory.getLogger(TxnUtils.class); + + public static void cleanupStaleTxns(TxnMgr txnMgr, TxnHandler handler) throws IOException { + if (logger.isInfoEnabled()) { + logger.info("Checking existing txns..."); + } + try (Stream stream = txnMgr.streamTxns()) { + stream.forEach(txn -> { + try { + // if (txn.isStale()) { + if (txn.claim()) { + TxnUtils.rollbackOrEnd(txn, handler); + } + } catch (Exception e) { + if (logger.isWarnEnabled()) { + logger.warn("Failed to process txn", e); + } + } + }); + } + } + + public static void precommit(Txn txn, TxnHandler handler) throws Exception, IOException { + // TODO Non-write transactions can probably skip the sync block - or? 
+ try (Stream> stream = txn.streamAccessedResourcePaths()) { + Iterator> it = stream.iterator(); + while (it.hasNext()) { + org.aksw.commons.path.core.Path relPath = it.next(); + // Path relPath = txnMgr.getResRepo().getRelPath(res); + + TxnResourceApi api = txn.getResourceApi(relPath); + if (api.getTxnResourceLock().ownsWriteLock()) { + if (logger.isDebugEnabled()) { + logger.debug("Syncing: " + relPath); + } + // If we own a write lock and the state is dirty then sync + // If there are any in memory changes then write them out + + handler.beforePreCommit(relPath); + // Precommit: Copy any new data files to their final location (but keep backups) + ContentSync fs = api.getFileSync(); + fs.preCommit(); + + handler.afterPreCommit(relPath); + // Update the in memory cache + } + } + } + } + + public static void commit(Txn txn, TxnHandler handler) { + try { + precommit(txn, handler); + // Once all modified graphs are written out + // add the statement that the commit action can now be run + txn.addCommit(); + + TxnUtils.applyJournal(txn, handler); + } catch (Exception e) { + try { + if (txn.isCommit()) { + throw new RuntimeException("Failed to finalize commit after pre-commit", e); + } else { + txn.addRollback(); + } + } catch (Exception e2) { + e2.addSuppressed(e); + throw new RuntimeException(e2); + } + + try { + TxnUtils.applyJournal(txn, handler); + } catch (Exception e2) { + e2.addSuppressed(e); + throw new RuntimeException(e2); + } + + throw new RuntimeException(e); + } finally { + handler.end(); + } + } + + /** + * + * @param txn + * @param resourceAction (resourceKey, isCommit) An action to run on a resource after changes were rolled back or committed - + * but before the resource is unlocked. Typically used to synchronize an in-memory cache. + */ + @Deprecated // Old version does not separate finalization and unlocking + // The advantage of the old version is that resources are unlocked earlier which could + // reduce latency + public static void applyJournalOld(Txn txn, TxnHandler handler) { //LoadingCache, SyncedDataset> syncCache) { + TxnMgr txnMgr = txn.getTxnMgr(); + // ResourceRepository resRepo = txnMgr.getResRepo(); + Path resRepoRootPath = txnMgr.getRootPath(); + + boolean isCommit; + try { + isCommit = txn.isCommit() && !txn.isRollback(); + } catch (IOException e1) { + throw new RuntimeException(e1); + } + + try { + + // Run the finalization actions + // As these actions remove undo information + // there is no turning back anymore + if (isCommit) { + txn.addFinalize(); + } + + // TODO Stream the relPaths rather than the string resource names? 
+ try (Stream> stream = txn.streamAccessedResourcePaths()) { + Iterator> it = stream.iterator(); + while (it.hasNext()) { + org.aksw.commons.path.core.Path res = it.next(); + if (logger.isDebugEnabled()) { + logger.debug("Finalizing and unlocking: " + res); + } + TxnResourceApi api = txn.getResourceApi(res); + + org.aksw.commons.path.core.Path resourceKey = api.getResourceKey(); + + Path targetFile = api.getFileSync().getTargetFile(); + if (isCommit) { + api.finalizeCommit(); + } else { + api.rollback(); + } + + // Clean up empty paths + FileUtils.deleteEmptyFolders(targetFile.getParent(), resRepoRootPath, true); + + handler.beforeUnlock(resourceKey, isCommit); + // SyncedDataset synced = syncCache.getIfPresent(Array.wrap(resourceKey)); + // if (synced != null) { + // if (synced.isDirty()) { + // if (isCommit) { + // synced.getDiff().materialize(); + // } else { + // synced.getDiff().clearChanges(); + // } + // synced.updateState(); + // } + // } + + api.unlock(); + api.undeclareAccess(); + } + } + + txn.cleanUpTxn(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** */ + public static void applyJournal(Txn txn, TxnHandler handler) { + TxnUtils.finalizeCommit(txn); + TxnUtils.unlock(txn, handler); + } + + /** Finalize all pending changes. All files of the transaction will be in their final state. + */ + public static void finalizeCommit(Txn txn) { //LoadingCache, SyncedDataset> syncCache) { + TxnMgr txnMgr = txn.getTxnMgr(); + // ResourceRepository resRepo = txnMgr.getResRepo(); + Path resRepoRootPath = txnMgr.getRootPath(); + + boolean isCommit; + try { + isCommit = txn.isCommit() && !txn.isRollback(); + } catch (IOException e1) { + throw new RuntimeException(e1); + } + + try { + + // Run the finalization actions + // As these actions remove undo information + // there is no turning back anymore + if (isCommit) { + txn.addFinalize(); + } + + // TODO Stream the relPaths rather than the string resource names? + try (Stream> stream = txn.streamAccessedResourcePaths()) { + Iterator> it = stream.iterator(); + while (it.hasNext()) { + org.aksw.commons.path.core.Path res = it.next(); + if (logger.isDebugEnabled()) { + logger.debug("Finalizing: " + res); + } + TxnResourceApi api = txn.getResourceApi(res); + + org.aksw.commons.path.core.Path resourceKey = api.getResourceKey(); + + Path targetFile = api.getFileSync().getTargetFile(); + if (isCommit) { + api.finalizeCommit(); + } else { + api.rollback(); + } + + // Clean up empty paths + FileUtils.deleteEmptyFolders(targetFile.getParent(), resRepoRootPath, true); + } + } + + txn.cleanUpTxn(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void unlock(Txn txn, TxnHandler handler) { //LoadingCache, SyncedDataset> syncCache) { + boolean isCommit; + try { + isCommit = txn.isCommit() && !txn.isRollback(); + } catch (IOException e1) { + throw new RuntimeException(e1); + } + + try { + // TODO Stream the relPaths rather than the string resource names? 
+ try (Stream> stream = txn.streamAccessedResourcePaths()) { + Iterator> it = stream.iterator(); + while (it.hasNext()) { + org.aksw.commons.path.core.Path res = it.next(); + if (logger.isDebugEnabled()) { + logger.debug("Unlocking: " + res); + } + TxnResourceApi api = txn.getResourceApi(res); + + org.aksw.commons.path.core.Path resourceKey = api.getResourceKey(); + + handler.beforeUnlock(resourceKey, isCommit); + api.unlock(); + api.undeclareAccess(); + } + } + + txn.cleanUpTxn(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void abort(Txn txn, TxnHandler handler) { + try { + txn.addRollback(); + applyJournal(txn, handler); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + handler.end(); + } + } + + public static void rollbackOrEnd(Txn txn, TxnHandler handler) throws IOException { + if (logger.isInfoEnabled()) { + logger.info("Detected stale txn; applying rollback: " + txn.getId()); + } + if (!txn.isCommit()) { + txn.addRollback(); + } + applyJournal(txn, handler); + } +} diff --git a/aksw-commons-utils-parent/aksw-commons-utils/src/main/java/org/aksw/commons/util/concurrent/ScheduleOnce.java b/aksw-commons-utils-parent/aksw-commons-utils/src/main/java/org/aksw/commons/util/concurrent/ScheduleOnce.java index 40646399..013b7334 100644 --- a/aksw-commons-utils-parent/aksw-commons-utils/src/main/java/org/aksw/commons/util/concurrent/ScheduleOnce.java +++ b/aksw-commons-utils-parent/aksw-commons-utils/src/main/java/org/aksw/commons/util/concurrent/ScheduleOnce.java @@ -57,10 +57,14 @@ public void scheduleTask() { lastRequestTime = Instant.now(); if (lastExecTime != null && lastRequestTime.isAfter(lastExecTime)) { - logger.info("Scheduled task with a delay of " + execDelay); + if (logger.isInfoEnabled()) { + logger.info("Scheduled task with a delay of " + execDelay); + } lastExecTime = null; scheduledExecutorService.schedule(() -> { - logger.info("Running task " + task); + if (logger.isInfoEnabled()) { + logger.info("Running task " + task); + } synchronized (lock) { lastExecTime = Instant.now(); @@ -69,7 +73,9 @@ public void scheduleTask() { try { return task.call(); } catch (Exception e) { - logger.warn("Task execution failed", e); + if (logger.isWarnEnabled()) { + logger.warn("Task execution failed", e); + } throw new RuntimeException(e); } }, execDelay.toMillis(), TimeUnit.MILLISECONDS);
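
Usage sketch (not part of the patch itself): the central addition above is the commit(TxnHandler) overload on ObjectStoreConnection together with the beforeUnlock callback. SliceWithPagesSyncToDisk.sync() uses it to record the recency status of metadata.ser while the commit still holds the write lock, which is what lets hasMetaDataChanged() distinguish the slice's own writes from external modifications and thus avoid the needless metadata reloads mentioned in the commit subject. The minimal sketch below is modeled on that call site: the ObjectStore, ObjectStoreConnection, ObjectResource, TxnHandler and PathDiffState types and the begin/access/save/commit/fetchRecencyStatus calls are the ones introduced or used by this patch, while the example class, its fields (objectStore, basePath, lastKnownStatus), the method name and any import locations not visible in the diff headers are illustrative assumptions.

import org.aksw.commons.store.object.key.api.ObjectResource;
import org.aksw.commons.store.object.key.api.ObjectStore;
import org.aksw.commons.store.object.key.api.ObjectStoreConnection;
import org.aksw.commons.txn.impl.PathDiffState;
import org.aksw.commons.txn.impl.TxnHandler;

// Hedged sketch of the new commit(TxnHandler) API; 'objectStore' and 'basePath'
// are assumed to be supplied by the surrounding application.
class BeforeUnlockExample {
    protected ObjectStore objectStore;                   // assumed given
    protected org.aksw.commons.path.core.Path basePath;  // assumed given
    protected volatile PathDiffState lastKnownStatus;    // recency status captured at commit time

    public void saveAndRememberStatus(Object newState) throws Exception {
        String fileName = "metadata.ser";
        try (ObjectStoreConnection conn = objectStore.getConnection()) {
            conn.begin(true);                                              // write transaction
            ObjectResource res = conn.access(basePath.resolve(fileName));  // declare access and lock
            res.save(newState);                                            // staged until commit

            // Commit, and capture the file's recency status in beforeUnlock, i.e. after
            // the data has reached its final location but before the lock is released.
            // A subsequent hasMetaDataChanged()-style check will then not mistake this
            // commit for an external modification.
            conn.commit(new TxnHandler() {
                @Override
                public void beforeUnlock(org.aksw.commons.path.core.Path resKey, boolean isCommit) {
                    if (isCommit && fileName.equals(resKey.getFileName().toString())) {
                        lastKnownStatus = res.fetchRecencyStatus();
                    }
                }
            });
        }
    }
}

Running the hook before the unlock rather than after commit() returns is the point of the refactoring: once the lock is released, another transaction may immediately modify the file, so any status read afterwards could already be stale.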