Skip to content

Commit

Permalink
Update ShellImpl task scheduling
Browse files Browse the repository at this point in the history
- Prevent execTask starvation: tasks scheduled through execTask are
  now queued along with submitted tasks, executing in order of
  submission
- waitAndClose now properly waits for all tasks to complete, including
  both synchronous and asynchronous tasks
  • Loading branch information
topjohnwu committed Jun 27, 2024
1 parent 990a603 commit 9d245f0
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 41 deletions.
137 changes: 98 additions & 39 deletions core/src/main/java/com/topjohnwu/superuser/internal/ShellImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import android.text.TextUtils;

import androidx.annotation.NonNull;
import androidx.annotation.Nullable;

import com.topjohnwu.superuser.Shell;
import com.topjohnwu.superuser.ShellUtils;
Expand All @@ -38,16 +39,48 @@
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class ShellImpl extends Shell {
private volatile int status;

private final Process proc;
private final Process process;
private final NoCloseOutputStream STDIN;
private final NoCloseInputStream STDOUT;
private final NoCloseInputStream STDERR;

// Guarded by scheduleLock
private final ReentrantLock scheduleLock = new ReentrantLock();
private final Condition idle = scheduleLock.newCondition();
private final ArrayDeque<Task> tasks = new ArrayDeque<>();
private boolean runningTasks = false;
private boolean isRunningTask = false;

private static final class SyncTask implements Task {

private final Condition condition;
private boolean set = false;

SyncTask(Condition c) {
condition = c;
}

void signal() {
set = true;
condition.signal();
}

void await() {
while (!set) {
try {
condition.await();
} catch (InterruptedException ignored) {}
}
}

@Override
public void run(OutputStream stdin, InputStream stdout, InputStream stderr) {}
}

private static class NoCloseInputStream extends FilterInputStream {

Expand Down Expand Up @@ -84,12 +117,12 @@ void close0() throws IOException {
}
}

ShellImpl(BuilderImpl builder, Process process) throws IOException {
ShellImpl(BuilderImpl builder, Process proc) throws IOException {
status = UNKNOWN;
proc = process;
STDIN = new NoCloseOutputStream(process.getOutputStream());
STDOUT = new NoCloseInputStream(process.getInputStream());
STDERR = new NoCloseInputStream(process.getErrorStream());
process = proc;
STDIN = new NoCloseOutputStream(proc.getOutputStream());
STDOUT = new NoCloseInputStream(proc.getInputStream());
STDERR = new NoCloseInputStream(proc.getErrorStream());

// Shell checks might get stuck indefinitely
FutureTask<Integer> check = new FutureTask<>(this::shellCheck);
Expand Down Expand Up @@ -117,7 +150,7 @@ void close0() throws IOException {

private Integer shellCheck() throws IOException {
try {
proc.exitValue();
process.exitValue();
throw new IOException("Created process has terminated");
} catch (IllegalThreadStateException ignored) {
// Process is alive
Expand Down Expand Up @@ -156,27 +189,24 @@ private void release() {
try { STDIN.close0(); } catch (IOException ignored) {}
try { STDERR.close0(); } catch (IOException ignored) {}
try { STDOUT.close0(); } catch (IOException ignored) {}
proc.destroy();
process.destroy();
}

@Override
public boolean waitAndClose(long timeout, @NonNull TimeUnit unit) throws InterruptedException {
if (status < 0)
return true;

synchronized (tasks) {
if (runningTasks) {
tasks.clear();
tasks.wait(unit.toMillis(timeout));
}
if (!runningTasks) {
release();
return true;
}
scheduleLock.lock();
try {
if (isRunningTask && !idle.await(timeout, unit))
return false;
close();
} finally {
scheduleLock.unlock();
}

status = UNKNOWN;
return false;
return true;
}

@Override
Expand All @@ -198,8 +228,9 @@ public boolean isAlive() {
return false;

try {
proc.exitValue();
process.exitValue();
// Process is dead, shell is not alive
release();
return false;
} catch (IllegalThreadStateException e) {
// Process is still running
Expand Down Expand Up @@ -228,43 +259,71 @@ private synchronized void exec0(@NonNull Task task) throws IOException {
}

private void processTasks() {
for (;;) {
Task task;
synchronized (tasks) {
if ((task = tasks.poll()) == null) {
runningTasks = false;
tasks.notifyAll();
return;
}
}
Task task;
while ((task = processNextTask(false)) != null) {
try {
exec0(task);
} catch (IOException ignored) {}
}
}

@Nullable
private Task processNextTask(boolean fromExec) {
scheduleLock.lock();
try {
final Task task = tasks.poll();
if (task == null) {
isRunningTask = false;
idle.signalAll();
return null;
}
if (task instanceof SyncTask) {
((SyncTask) task).signal();
return null;
}
if (fromExec) {
// Put the task back in front of the queue
tasks.offerFirst(task);
} else {
return task;
}
} finally {
scheduleLock.unlock();
}
EXECUTOR.execute(this::processTasks);
return null;
}

@Override
public void submitTask(@NonNull Task task) {
synchronized (tasks) {
scheduleLock.lock();
try {
tasks.offer(task);
if (!runningTasks) {
runningTasks = true;
if (!isRunningTask) {
isRunningTask = true;
EXECUTOR.execute(this::processTasks);
}
} finally {
scheduleLock.unlock();
}
}

@Override
public void execTask(@NonNull Task task) throws IOException {
synchronized (tasks) {
while (runningTasks) {
// Wait until all existing tasks are done
try {
tasks.wait();
} catch (InterruptedException ignored) {}
scheduleLock.lock();
try {
if (isRunningTask) {
SyncTask sync = new SyncTask(scheduleLock.newCondition());
tasks.offer(sync);
// Wait until it's our turn
sync.await();
}
isRunningTask = true;
} finally {
scheduleLock.unlock();
}
exec0(task);
processNextTask(true);
}

@NonNull
Expand Down
14 changes: 14 additions & 0 deletions example/src/main/java/com/topjohnwu/libsuexample/MainActivity.java
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,20 @@ protected void onCreate(Bundle savedInstanceState) {
binding.testAsync.setOnClickListener(v ->
Shell.cmd("test_async").to(consoleList).submit());

binding.testQueue.setOnClickListener(v -> {
Shell.getShell(Shell.EXECUTOR, s -> {
Log.i(TAG, "Queue: 1");
s.newJob().to(consoleList).add("sleep 1", "echo 1").submit();
Log.i(TAG, "Queue: 2");
s.newJob().to(consoleList).add("echo 2").exec();
Log.i(TAG, "Queue: 3");
s.newJob().to(consoleList).add("sleep 1", "echo 3").submit();
Log.i(TAG, "Queue: 4");
s.newJob().to(consoleList).add("echo 4").submit();
Log.i(TAG, "Queue: done");
});
});

binding.clear.setOnClickListener(v -> binding.console.setText(""));

binding.stressTest.setOnClickListener(v -> StressTest.perform(remoteFS));
Expand Down
12 changes: 10 additions & 2 deletions example/src/main/res/layout/activity_main.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,23 @@
android:layout_width="0dp"
android:layout_height="wrap_content"
android:layout_weight="1"
android:text="Shell Sync" />
android:text="Sync CMD" />

<Button
android:id="@+id/test_async"
style="?android:borderlessButtonStyle"
android:layout_width="0dp"
android:layout_height="wrap_content"
android:layout_weight="1"
android:text="Shell Async" />
android:text="Async CMD" />

<Button
android:id="@+id/test_queue"
style="?android:borderlessButtonStyle"
android:layout_width="0dp"
android:layout_height="wrap_content"
android:layout_weight="1"
android:text="Test Queue" />

<Button
android:id="@+id/close_shell"
Expand Down

0 comments on commit 9d245f0

Please sign in to comment.