Skip to content

Commit

Permalink
Try to be compatible with the plugin server AGAIN.
Browse files Browse the repository at this point in the history
  • Loading branch information
KasumiNova committed Jan 26, 2023
1 parent a3b5491 commit 4af4776
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 194 deletions.
81 changes: 15 additions & 66 deletions src/main/java/github/kasuminova/mmce/concurrent/TaskExecutor.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package github.kasuminova.mmce.concurrent;

import hellfirepvp.modularmachinery.ModularMachinery;
import net.minecraftforge.fml.common.eventhandler.SubscribeEvent;
import net.minecraftforge.fml.common.gameevent.TickEvent;

import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.LinkedList;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicLong;

/**
* 一个简单的单 Tick 并发执行器
Expand All @@ -16,37 +14,17 @@
public class TaskExecutor {
public static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors() / 2;
public static final int THREAD_COUNT = Math.max(AVAILABLE_PROCESSORS, 4);
public static final ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool(AVAILABLE_PROCESSORS);
public static long totalSubmitted = 0;
public static long totalExecuted = 0;
public static long taskUsedTime = 0;
public static AtomicLong taskUsedTime = new AtomicLong(0);
public static long totalUsedTime = 0;
public static long tickExisted = 0;
public final AtomicInteger completedThreadCount = new AtomicInteger(0);
private final ArrayList<TaskExecutorThread> executors = new ArrayList<>((int) (THREAD_COUNT * 1.5));
private final ConcurrentLinkedQueue<Action> preActions = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<Action> postActions = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<Action> mainThreadActions = new ConcurrentLinkedQueue<>();
public Thread serverThread = null;

public TaskExecutor() {
for (int i = 0; i < THREAD_COUNT; i++) {
TaskExecutorThread taskExecutorThread = new TaskExecutorThread(this, completedThreadCount);
taskExecutorThread.start();
executors.add(taskExecutorThread);
}
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
for (TaskExecutorThread executor : executors) {
executor.actions.set(null);
LockSupport.unpark(executor.executorThread);
executor.executorThread.interrupt();
}
}));
}
private final LinkedList<Action> preActions = new LinkedList<>();
private final LinkedList<Action> postActions = new LinkedList<>();
private final LinkedList<Action> mainThreadActions = new LinkedList<>();

@SubscribeEvent
public void onTick(final TickEvent.ServerTickEvent event) {
serverThread = Thread.currentThread();

switch (event.phase) {
case START: {
executeActions(preActions);
Expand All @@ -60,24 +38,17 @@ public void onTick(final TickEvent.ServerTickEvent event) {
}
}

public void executeActions(final ConcurrentLinkedQueue<Action> actions) {
public void executeActions(final LinkedList<Action> actions) {
if (actions.isEmpty()) return;

long time = System.nanoTime() / 1000;

for (TaskExecutorThread executor : executors) {
executor.actions.set(actions);
LockSupport.unpark(executor.executorThread);
}

await(actions);

for (TaskExecutorThread executor : executors) {
taskUsedTime += executor.usedTime;
totalExecuted += executor.executed;
}

completedThreadCount.set(0);
actions.stream().parallel().forEach(action -> {
long start = System.nanoTime();
action.doAction();
taskUsedTime.getAndAdd((System.nanoTime() - start) / 1000);
});
actions.clear();

Action action;
while ((action = mainThreadActions.poll()) != null) {
Expand Down Expand Up @@ -110,26 +81,4 @@ public void addPostTickTask(final Action action) {
public void addMainThreadTask(final Action action) {
mainThreadActions.add(action);
}

public void await(final ConcurrentLinkedQueue<Action> actions) {
long time = System.currentTimeMillis();
//Timeout 1000ms
LockSupport.parkNanos(1000 * 1000 * 250);
if (System.currentTimeMillis() - time > 250) {
ModularMachinery.log.warn(
String.format(
"[Modular Machinery] Parallel task execute timeout for 250ms (%s Threads Completed / %s Thread Total, %s Tasks Left).",
completedThreadCount.get(),
THREAD_COUNT,
actions.size())
);
}
}

public void onThreadFinished() {
int completed = completedThreadCount.incrementAndGet();
if (completed >= THREAD_COUNT) {
LockSupport.unpark(serverThread);
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,14 @@ public void execute(MinecraftServer server, ICommandSender sender, String[] args
if (args.length > 0 && args[0].equals("reset")) {
TaskExecutor.tickExisted = 0;
TaskExecutor.totalSubmitted = 0;
TaskExecutor.totalExecuted = 0;
TaskExecutor.totalUsedTime = 0;
TaskExecutor.taskUsedTime = 0;
TaskExecutor.taskUsedTime.set(0);
sender.sendMessage(new TextComponentTranslation("command.modularmachinery.performance_report.reset"));
return;
}

long totalExecuted = TaskExecutor.totalExecuted;
long taskUsedTime = TaskExecutor.taskUsedTime;
long totalSubmitted = TaskExecutor.totalSubmitted;
long taskUsedTime = TaskExecutor.taskUsedTime.get();
long totalUsedTime = TaskExecutor.totalUsedTime;
long tickExisted = TaskExecutor.tickExisted;

Expand All @@ -53,17 +52,12 @@ public void execute(MinecraftServer server, ICommandSender sender, String[] args
MiscUtils.formatDecimal(TaskExecutor.totalSubmitted))
);

sender.sendMessage(
new TextComponentTranslation("command.modularmachinery.performance_report.total_executed",
MiscUtils.formatDecimal(totalExecuted))
);

sender.sendMessage(
new TextComponentTranslation("command.modularmachinery.performance_report.total_used_time",
totalUsedTime / 1000)
);

long usedTimeAvg = totalExecuted == 0 ? 0 : taskUsedTime / totalExecuted;
long usedTimeAvg = totalSubmitted == 0 ? 0 : taskUsedTime / totalSubmitted;
sender.sendMessage(
new TextComponentTranslation("command.modularmachinery.performance_report.used_time_avg",
usedTimeAvg)
Expand All @@ -75,7 +69,7 @@ public void execute(MinecraftServer server, ICommandSender sender, String[] args
usedTimeAvgPerTick)
);

long executedAvgPerTick = totalExecuted / tickExisted;
long executedAvgPerTick = totalSubmitted / tickExisted;
sender.sendMessage(
new TextComponentTranslation("command.modularmachinery.performance_report.executed_avg_per_tick",
executedAvgPerTick)
Expand Down
Loading

0 comments on commit 4af4776

Please sign in to comment.