Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add LogRecord table update callbacks #2159

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class ColloboqueServer(
)
}
}
refidToBaseTxnId[projectRefid] = "abacaba" // TODO: get from the database
refidToBaseTxnId[projectRefid] = EMPTY_LOG_BASE_TXN_ID // TODO: get from the database
}
} catch (e: Exception) {
throw ColloboqueServerException("Failed to init project $projectRefid", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ class ColloboqueWebSocketServer(port: Int, private val colloboqueServer: Collobo
}

override fun onMessage(message: WebSocketFrame) {
LOG.debug("Message received\n {}", message.textPayload)
val inputXlog = parseInputXlog(message.textPayload) ?: return
LOG.debug("Xlog received\n {}", inputXlog)
if (inputXlog.transactions.size != 1) {
// TODO: add multiple transactions support.
LOG.error("Only single transaction commit supported")
Expand All @@ -138,7 +138,7 @@ class ColloboqueWebSocketServer(port: Int, private val colloboqueServer: Collobo
override fun onPong(pong: WebSocketFrame?) {}

override fun onException(exception: IOException) {
LOG.error("WebSocket exception", exception)
LOG.error("WebSocket exception", exception = exception)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@
import net.sourceforge.ganttproject.storage.InputXlog;
import net.sourceforge.ganttproject.storage.ProjectDatabaseException;
import net.sourceforge.ganttproject.storage.ServerCommitResponse;
import net.sourceforge.ganttproject.storage.XlogRecord;
import net.sourceforge.ganttproject.task.CustomColumnsStorage;
import net.sourceforge.ganttproject.task.Task;
import net.sourceforge.ganttproject.task.event.TaskListenerAdapter;
import net.sourceforge.ganttproject.undo.GPUndoListener;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.jetbrains.annotations.NotNull;
Expand All @@ -82,15 +82,14 @@
import java.awt.*;
import java.awt.event.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.*;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static biz.ganttproject.storage.cloud.GPCloudHttpImplKt.getWebSocket;
import static biz.ganttproject.storage.cloud.GPCloudHttpImplKt.isColloboqueLocalTest;
import static net.sourceforge.ganttproject.storage.XlogKt.EMPTY_LOG_BASE_TXN_ID;

/**
* Main frame of the project
Expand Down Expand Up @@ -167,7 +166,7 @@ private static class TxnCommitInfo {
/** If `oldTxnId` is currently being hold, sets the txn ID to `newTxnId` and moves the local ID ahead by `committedNum`. */
void update(String oldTxnId, String newTxnId, int committedNum) {
myTxnId.updateAndGet(oldValue -> {
if (oldValue.left.equals(oldTxnId)) {
if (Objects.equals(oldValue.left, oldTxnId)) {
return new ImmutablePair<>(newTxnId, oldValue.right + committedNum);
} else {
return oldValue;
Expand All @@ -178,9 +177,13 @@ void update(String oldTxnId, String newTxnId, int committedNum) {
ImmutablePair<String, Integer> get() {
return myTxnId.get();
}

void reset() {
myTxnId.set(new ImmutablePair<>(null, 0));
}
}

private final TxnCommitInfo myBaseTxnCommitInfo = new TxnCommitInfo("", 0);
private final TxnCommitInfo myBaseTxnCommitInfo = new TxnCommitInfo(null, 0);


public GanttProject(boolean isOnlyViewer) {
Expand Down Expand Up @@ -208,10 +211,6 @@ public GanttProject(boolean isOnlyViewer) {
getWebSocket().register(null);
getWebSocket().onCommitResponseReceived(this::fireXlogReceived);
getWebSocket().onBaseTxnIdReceived(this::onBaseTxnIdReceived);
var taskListenerAdapter = new TaskListenerAdapter();
// TODO: add listeners sensibly.
taskListenerAdapter.setTaskAddedHandler(event -> this.sendProjectStateLogs());
getTaskManager().addTaskListener(taskListenerAdapter);
}

area = new GanttGraphicArea(this, getTaskManager(), getZoomManager(), getUndoManager(),
Expand Down Expand Up @@ -956,33 +955,73 @@ public void refresh() {
super.repaint();
}

// TODO: Accumulate changes instead of sending it every time.
private interface TxnSendListener {
void onSendCompleted();
}

private final AtomicReference<TxnSendListener> txnSendingListener = new AtomicReference<>();

private Unit sendProjectStateLogs() {
gpLogger.debug("Sending project state logs");
if (txnSendingListener.get() != null) return Unit.INSTANCE;
var baseTxnCommitInfo = myBaseTxnCommitInfo.get();
if (baseTxnCommitInfo.left == null) {
// Connection with the server was not established.
return Unit.INSTANCE;
}
List<XlogRecord> transactions;
try {
var baseTxnCommitInfo = myBaseTxnCommitInfo.get();
var txns = myProjectDatabase.fetchTransactions(baseTxnCommitInfo.right + 1, 1);
if (!txns.isEmpty()) {
transactions = myProjectDatabase.fetchTransactions(baseTxnCommitInfo.right + 1, 1);
} catch (ProjectDatabaseException e) {
gpLogger.error("Failed to send logs", new Object[]{}, ImmutableMap.of(), e);
dbarashev marked this conversation as resolved.
Show resolved Hide resolved
return Unit.INSTANCE;
}
if (!transactions.isEmpty()) {
var listener = new TxnSendListener() {
@Override
public void onSendCompleted() {
txnSendingListener.compareAndSet(this, null);
}
};
if (txnSendingListener.compareAndSet(null, listener)) {
getWebSocket().sendLogs(new InputXlog(
baseTxnCommitInfo.left,
"userId",
"refid",
txns
transactions
));
} else {
// Logs were sent by another thread, no action required.
}
} catch (ProjectDatabaseException e) {
gpLogger.error("Failed to send logs", new Object[]{}, ImmutableMap.of(), e);
}
return Unit.INSTANCE;
}

@Override
protected Unit onProjectLogUpdate() {
if (isColloboqueLocalTest()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like that we have more and more these checks. I thought that we would have just a few differences between the "local test" and "prod" modes (url, authentication), but now it seems that we will just not do anything in prod.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this check? And maybe the super call too?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm
We perform websocket handshake (with base txn id & project refid exchange) only under this check.
If we remove the check, we'll log an error every time a new log record is added for an offline document, won't we?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, in this case we shall check "if we are working with an online document, in any sense", not "is it a local online document setup".

We probably need to modify the local dev server so that it could respond to project read and write operations, just like a regular GP Cloud server. In this case the check may look like if (project.document is OnlineDocument)

super.onProjectLogUpdate();
sendProjectStateLogs();
}
return Unit.INSTANCE;
}

private Unit fireXlogReceived(ServerCommitResponse response) {
myBaseTxnCommitInfo.update(response.getBaseTxnId(), response.getNewBaseTxnId(), 1);
txnSendingListener.get().onSendCompleted();
sendProjectStateLogs();
return Unit.INSTANCE;
}

// TODO: sync logs with the server.
private Unit onBaseTxnIdReceived(String baseTxnId) {
myBaseTxnCommitInfo.update("", baseTxnId, 0);
// Websocket is [re-]started. Previous messages are discarded.
txnSendingListener.set(null);
if (EMPTY_LOG_BASE_TXN_ID.equals(baseTxnId)) {
myBaseTxnCommitInfo.reset();
myBaseTxnCommitInfo.update(null, baseTxnId, 0);
sendProjectStateLogs();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove it? Sending logs doesn't make too much sense if we just have reset the state.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In such a case, we'll send all the logs from the beginning, one by one
Where should we start sending logs instead? I thought that it's the right place

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Send the logs when an undoable edit happens. But I don't think that a user has any chance to make an undoable edit immediately at the moment when we reset the log.

}
return Unit.INSTANCE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ of the License, or (at your option) any later version.
import javafx.beans.property.SimpleIntegerProperty;
import javafx.beans.property.SimpleObjectProperty;
import javafx.collections.FXCollections;
import kotlin.Unit;
import kotlin.jvm.functions.Function0;
import net.sourceforge.ganttproject.chart.Chart;
import net.sourceforge.ganttproject.chart.ChartModelBase;
Expand Down Expand Up @@ -79,6 +80,7 @@ of the License, or (at your option) any later version.
import java.util.Map;
import java.util.function.Supplier;


/**
* This class is designed to be a GanttProject-after-refactorings. I am going to
* refactor GanttProject in order to make true view communicating with other
Expand Down Expand Up @@ -192,7 +194,10 @@ GPOptionGroup getTaskOptions() {

protected GanttProjectBase() {
super("GanttProject");
var databaseProxy = new LazyProjectDatabaseProxy(SqlProjectDatabaseImpl.Factory::createInMemoryDatabase, this::getTaskManager);
var databaseProxy = new LazyProjectDatabaseProxy(
() -> SqlProjectDatabaseImpl.Factory.createInMemoryDatabase(this::onProjectLogUpdate),
this::getTaskManager
);

myProjectDatabase = databaseProxy;
myTaskManagerConfig = new TaskManagerConfigImpl();
Expand Down Expand Up @@ -258,6 +263,9 @@ protected ParserFactory getParserFactory() {
protected GanttProjectImpl getProjectImpl() {
return myProjectImpl;
}

protected Unit onProjectLogUpdate() { return Unit.INSTANCE; }

@Override
public void restore(@NotNull Document fromDocument) throws Document.DocumentException, IOException {
GanttProjectImplKt.restoreProject(this, fromDocument, myProjectImpl.getListeners());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,33 @@ class SqlProjectDatabaseImpl(private val dataSource: DataSource) : ProjectDataba
dataSource.setURL(H2_IN_MEMORY_URL)
return SqlProjectDatabaseImpl(dataSource)
}

fun createInMemoryDatabase(logUpdateCallback: () -> Unit): ProjectDatabase {
val dataSource = JdbcDataSource()
dataSource.setURL(H2_IN_MEMORY_URL)
val database = SqlProjectDatabaseImpl(dataSource)
database.addLogUpdateCallback(logUpdateCallback)
return database
}
}

/** Queries which belong to the current transaction. Null if each statement should be committed separately. */
private var currentTxn: TransactionImpl? = null
private var localTxnId: Int = 1

private val logUpdateCallbacks: MutableList<() -> Unit> = mutableListOf()

/** Log update callbacks are invoked when a new log record is added. */
fun addLogUpdateCallback(listener: () -> Unit) = logUpdateCallbacks.add(listener)

private fun onLogUpdate() = logUpdateCallbacks.forEach {
try {
it.invoke()
} catch (e: Exception) {
LOG.error("Failed to execute update callback", e)
}
}

private fun <T> withDSL(
errorMessage: () -> String = { "Failed to execute query" },
body: (dsl: DSLContext) -> T
Expand Down Expand Up @@ -90,6 +111,7 @@ class SqlProjectDatabaseImpl(private val dataSource: DataSource) : ProjectDataba
}
}
}
onLogUpdate()
}

/** Add a query to the current txn. Executes immediately if no transaction started. */
Expand Down Expand Up @@ -155,6 +177,7 @@ class SqlProjectDatabaseImpl(private val dataSource: DataSource) : ProjectDataba
@Throws(ProjectDatabaseException::class)
internal fun commitTransaction(txn: TransactionImpl) {
try {
if (txn.statements.isEmpty()) return
executeAndLog(txn.statements, localTxnId)
localTxnId++ // Increment only on success.
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,7 @@ data class ServerCommitError(
)

const val SERVER_COMMIT_RESPONSE_TYPE = "ServerCommitResponse"
const val SERVER_COMMIT_ERROR_TYPE = "ServerCommitError"
const val SERVER_COMMIT_ERROR_TYPE = "ServerCommitError"

/** Base txn ID for the empty log state. */
const val EMPTY_LOG_BASE_TXN_ID = "abacaba"