Introduce internal shell task scheduling

This commit is contained in:
topjohnwu 2024-06-11 18:36:27 -07:00
parent dc6d1ec3d7
commit e01d4cbb8d
9 changed files with 338 additions and 221 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2023 John "topjohnwu" Wu
* Copyright 2024 John "topjohnwu" Wu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -20,7 +20,6 @@ import static com.topjohnwu.superuser.Shell.EXECUTOR;
import static java.nio.charset.StandardCharsets.UTF_8;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import com.topjohnwu.superuser.Shell;
@ -36,56 +35,36 @@ import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
class JobImpl extends Shell.Job implements Shell.Task, Closeable {
abstract class JobTask extends Shell.Job implements Shell.Task, Closeable {
private static final List<String> UNSET_ERR = new ArrayList<>(0);
static final String END_UUID = UUID.randomUUID().toString();
static final byte[] END_CMD = String
static final int UUID_LEN = 36;
private static final byte[] END_CMD = String
.format("__RET=$?;echo %1$s;echo %1$s >&2;echo $__RET;unset __RET\n", END_UUID)
.getBytes(UTF_8);
static final int UUID_LEN = 36;
private static final List<String> UNSET_ERR = new ArrayList<>(0);
private final List<ShellInputSource> sources = new ArrayList<>();
private final ResultImpl result = new ResultImpl();
boolean redirect;
protected List<String> out;
protected List<String> err = UNSET_ERR;
protected ShellImpl shell;
JobImpl() {}
JobImpl(ShellImpl s) {
shell = s;
}
protected Executor callbackExecutor;
protected Shell.ResultCallback callback;
@Override
public void run(@NonNull OutputStream stdin,
@NonNull InputStream stdout,
@NonNull InputStream stderr) throws IOException {
Future<Integer> outGobbler = EXECUTOR.submit(new StreamGobbler.OUT(stdout, result.out));
Future<Void> errGobbler = EXECUTOR.submit(new StreamGobbler.ERR(stderr, result.err));
for (ShellInputSource src : sources)
src.serve(stdin);
stdin.write(END_CMD);
stdin.flush();
try {
result.code = outGobbler.get();
errGobbler.get();
} catch (ExecutionException | InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}
}
private ResultImpl exec0() {
@NonNull InputStream stderr) {
boolean noErr = err == UNSET_ERR;
ResultImpl result = new ResultImpl();
result.out = out;
result.err = noErr ? null : err;
if (noErr && shell.redirect)
if (noErr && redirect)
result.err = out;
if (result.out != null && result.out == result.err && !Utils.isSynchronized(result.out)) {
@ -97,39 +76,33 @@ class JobImpl extends Shell.Job implements Shell.Task, Closeable {
}
try {
shell.execTask(this);
Future<Integer> outGobbler = EXECUTOR.submit(new StreamGobbler.OUT(stdout, result.out));
Future<Void> errGobbler = EXECUTOR.submit(new StreamGobbler.ERR(stderr, result.err));
for (ShellInputSource src : sources)
src.serve(stdin);
stdin.write(END_CMD);
stdin.flush();
try {
result.code = outGobbler.get();
errGobbler.get();
result.out = out;
result.err = noErr ? null : err;
} catch (ExecutionException | InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}
} catch (IOException e) {
if (e instanceof ShellTerminatedException) {
return ResultImpl.SHELL_ERR;
result = ResultImpl.SHELL_ERR;
} else {
Utils.err(e);
return ResultImpl.INSTANCE;
result = ResultImpl.INSTANCE;
}
} finally {
close();
result.out = out;
result.err = noErr ? null : err;
result.callback(callbackExecutor, callback);
}
return result;
}
@NonNull
@Override
public Shell.Result exec() {
return exec0();
}
@NonNull
@Override
public Future<Shell.Result> enqueue() {
FutureTask<Shell.Result> future = new FutureTask<>(this::exec0);
shell.executor.execute(future);
return future;
}
@Override
public void submit(@Nullable Executor executor, @Nullable Shell.ResultCallback cb) {
shell.executor.execute(() -> exec0().callback(executor, cb));
}
@NonNull

View File

@ -1,5 +1,5 @@
/*
* Copyright 2023 John "topjohnwu" Wu
* Copyright 2024 John "topjohnwu" Wu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -22,23 +22,22 @@ import androidx.annotation.Nullable;
import com.topjohnwu.superuser.NoShellException;
import com.topjohnwu.superuser.Shell;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
class PendingJob extends JobImpl {
class PendingJob extends JobTask {
private final boolean isSU;
private boolean retry;
PendingJob(boolean su) {
isSU = su;
retry = true;
to(NOPList.getInstance());
}
@NonNull
@Override
public Shell.Result exec() {
private Shell.Result exec0(ResultHolder h) {
ShellImpl shell;
try {
shell = MainShell.get();
} catch (NoShellException e) {
@ -49,37 +48,80 @@ class PendingJob extends JobImpl {
close();
return ResultImpl.INSTANCE;
}
try {
shell.execTask(this);
} catch (IOException ignored) { /* JobTask does not throw */ }
return h.result;
}
@NonNull
@Override
public Shell.Result exec() {
ResultHolder h = new ResultHolder();
callback = h;
callbackExecutor = null;
if (out instanceof NOPList)
out = new ArrayList<>();
Shell.Result res = super.exec();
if (retry && res == ResultImpl.SHELL_ERR) {
Shell.Result r = exec0(h);
if (r == ResultImpl.SHELL_ERR) {
// The cached shell is terminated, try to re-run this task
retry = false;
return exec();
return exec0(h);
}
return res;
return r;
}
private class RetryCallback implements Shell.ResultCallback {
private final Shell.ResultCallback base;
private boolean retry = true;
RetryCallback(Shell.ResultCallback b) {
base = b;
}
@Override
public void onResult(@NonNull Shell.Result out) {
if (retry && out == ResultImpl.SHELL_ERR) {
// The cached shell is terminated, try to re-schedule this task
retry = false;
submit0();
} else if (base != null) {
base.onResult(out);
}
}
}
private void submit0() {
MainShell.get(null, s -> {
if (isSU && !s.isRoot()) {
close();
ResultImpl.INSTANCE.callback(callbackExecutor, callback);
return;
}
ShellImpl shell = (ShellImpl) s;
shell.submitTask(this);
});
}
@NonNull
@Override
public Future<Shell.Result> enqueue() {
ResultFuture f = new ResultFuture();
callback = new RetryCallback(f);
callbackExecutor = null;
if (out instanceof NOPList)
out = new ArrayList<>();
submit0();
return f;
}
@Override
public void submit(@Nullable Executor executor, @Nullable Shell.ResultCallback cb) {
MainShell.get(null, s -> {
if (isSU && !s.isRoot()) {
close();
ResultImpl.INSTANCE.callback(executor, cb);
return;
}
if (out instanceof NOPList)
out = (cb == null) ? null : new ArrayList<>();
shell = (ShellImpl) s;
super.submit(executor, res -> {
if (retry && res == ResultImpl.SHELL_ERR) {
// The cached shell is terminated, try to re-schedule this task
retry = false;
submit(executor, cb);
} else if (cb != null) {
cb.onResult(res);
}
});
});
callbackExecutor = executor;
callback = new RetryCallback(cb);
if (out instanceof NOPList)
out = (cb == null) ? null : new ArrayList<>();
submit0();
}
}

View File

@ -0,0 +1,68 @@
/*
* Copyright 2024 John "topjohnwu" Wu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.topjohnwu.superuser.internal;
import androidx.annotation.NonNull;
import com.topjohnwu.superuser.Shell;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
class ResultFuture implements Shell.ResultCallback, Future<Shell.Result> {
Shell.Result result;
private CountDownLatch latch = new CountDownLatch(1);
@Override
public void onResult(@NonNull Shell.Result out) {
result = out;
latch.countDown();
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return latch.getCount() != 0;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return latch.getCount() == 0;
}
@Override
public Shell.Result get() throws InterruptedException {
latch.await();
return result;
}
@Override
public Shell.Result get(long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException {
if (!latch.await(timeout, unit)) {
throw new TimeoutException();
}
return result;
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright 2024 John "topjohnwu" Wu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.topjohnwu.superuser.internal;
import androidx.annotation.NonNull;
import com.topjohnwu.superuser.Shell;
class ResultHolder implements Shell.ResultCallback {
Shell.Result result;
@Override
public void onResult(@NonNull Shell.Result out) {
result = out;
}
}

View File

@ -1,108 +0,0 @@
/*
* Copyright 2023 John "topjohnwu" Wu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.topjohnwu.superuser.internal;
import androidx.annotation.RestrictTo;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static com.topjohnwu.superuser.Shell.EXECUTOR;
@RestrictTo(RestrictTo.Scope.LIBRARY)
public class SerialExecutorService extends AbstractExecutorService implements Callable<Void> {
private boolean isShutdown = false;
private ArrayDeque<Runnable> mTasks = new ArrayDeque<>();
private FutureTask<Void> scheduleTask = null;
@Override
public Void call() {
for (;;) {
Runnable task;
synchronized (this) {
if ((task = mTasks.poll()) == null) {
scheduleTask = null;
return null;
}
}
task.run();
}
}
@Override
public synchronized void execute(Runnable r) {
if (isShutdown) {
throw new RejectedExecutionException(
"Task " + r.toString() + " rejected from " + toString());
}
mTasks.offer(r);
if (scheduleTask == null) {
scheduleTask = new FutureTask<>(this);
EXECUTOR.execute(scheduleTask);
}
}
@Override
public synchronized void shutdown() {
isShutdown = true;
mTasks.clear();
}
@Override
public synchronized List<Runnable> shutdownNow() {
isShutdown = true;
if (scheduleTask != null)
scheduleTask.cancel(true);
try {
return new ArrayList<>(mTasks);
} finally {
mTasks.clear();
}
}
@Override
public synchronized boolean isShutdown() {
return isShutdown;
}
@Override
public synchronized boolean isTerminated() {
return isShutdown && scheduleTask == null;
}
@Override
public synchronized boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
if (scheduleTask == null)
return true;
try {
scheduleTask.get(timeout, unit);
} catch (TimeoutException e) {
return false;
} catch (ExecutionException ignored) {}
return true;
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2023 John "topjohnwu" Wu
* Copyright 2024 John "topjohnwu" Wu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -33,9 +33,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.ArrayDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -47,14 +47,15 @@ class ShellTerminatedException extends IOException {
}
class ShellImpl extends Shell {
private int status;
private volatile int status;
final ExecutorService executor;
final boolean redirect;
private final boolean redirect;
private final Process proc;
private final NoCloseOutputStream STDIN;
private final NoCloseInputStream STDOUT;
private final NoCloseInputStream STDERR;
private final ArrayDeque<Task> tasks = new ArrayDeque<>();
private boolean runningTasks = false;
private static class NoCloseInputStream extends FilterInputStream {
@ -98,10 +99,10 @@ class ShellImpl extends Shell {
STDIN = new NoCloseOutputStream(process.getOutputStream());
STDOUT = new NoCloseInputStream(process.getInputStream());
STDERR = new NoCloseInputStream(process.getErrorStream());
executor = new SerialExecutorService();
// Shell checks might get stuck indefinitely
Future<Integer> check = executor.submit(this::shellCheck);
FutureTask<Integer> check = new FutureTask<>(this::shellCheck);
EXECUTOR.execute(check);
try {
try {
status = check.get(builder.timeout, TimeUnit.SECONDS);
@ -118,7 +119,6 @@ class ShellImpl extends Shell {
throw new IOException("Shell check interrupted", e);
}
} catch (IOException e) {
executor.shutdownNow();
release();
throw e;
}
@ -172,21 +172,26 @@ class ShellImpl extends Shell {
public boolean waitAndClose(long timeout, @NonNull TimeUnit unit) throws InterruptedException {
if (status < 0)
return true;
executor.shutdown();
if (executor.awaitTermination(timeout, unit)) {
release();
return true;
} else {
status = UNKNOWN;
return false;
synchronized (tasks) {
if (runningTasks) {
tasks.clear();
tasks.wait(unit.toMillis(timeout));
}
if (!runningTasks) {
release();
return true;
}
}
status = UNKNOWN;
return false;
}
@Override
public void close() {
if (status < 0)
return;
executor.shutdownNow();
release();
}
@ -211,8 +216,7 @@ class ShellImpl extends Shell {
}
}
@Override
public synchronized void execTask(@NonNull Task task) throws IOException {
private synchronized void exec0(Task task) throws IOException {
if (status < 0)
throw new ShellTerminatedException();
@ -227,13 +231,56 @@ class ShellImpl extends Shell {
throw new ShellTerminatedException();
}
if (task instanceof JobTask) {
((JobTask) task).redirect = redirect;
}
task.run(STDIN, STDOUT, STDERR);
}
private void processTasks() {
Task task;
for (;;) {
synchronized (tasks) {
if ((task = tasks.poll()) == null) {
runningTasks = false;
tasks.notifyAll();
return;
}
}
try {
exec0(task);
} catch (IOException ignored) {}
}
}
void submitTask(Task task) {
synchronized (tasks) {
tasks.offer(task);
if (!runningTasks) {
runningTasks = true;
EXECUTOR.execute(this::processTasks);
}
}
}
@Override
public void execTask(@NonNull Task task) throws IOException {
synchronized (tasks) {
while (runningTasks) {
// Wait until all existing tasks are done
try {
tasks.wait();
} catch (InterruptedException ignored) {}
}
}
exec0(task);
}
@NonNull
@Override
public Job newJob() {
return new JobImpl(this);
return new ShellJob(this);
}
}

View File

@ -0,0 +1,64 @@
/*
* Copyright 2024 John "topjohnwu" Wu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.topjohnwu.superuser.internal;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import com.topjohnwu.superuser.Shell;
import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
class ShellJob extends JobTask {
private final ShellImpl shell;
ShellJob(ShellImpl s) {
shell = s;
}
@NonNull
@Override
public Shell.Result exec() {
ResultHolder h = new ResultHolder();
callback = h;
callbackExecutor = null;
try {
shell.execTask(this);
} catch (IOException ignored) { /* JobTask does not throw */ }
return h.result;
}
@Override
public void submit(@Nullable Executor executor, @Nullable Shell.ResultCallback cb) {
callbackExecutor = executor;
callback = cb;
shell.submitTask(this);
}
@NonNull
@Override
public Future<Shell.Result> enqueue() {
ResultFuture f = new ResultFuture();
callback = f;
callbackExecutor = null;
shell.submitTask(this);
return f;
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2023 John "topjohnwu" Wu
* Copyright 2024 John "topjohnwu" Wu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,8 +16,8 @@
package com.topjohnwu.superuser.internal;
import static com.topjohnwu.superuser.internal.JobImpl.END_UUID;
import static com.topjohnwu.superuser.internal.JobImpl.UUID_LEN;
import static com.topjohnwu.superuser.internal.JobTask.END_UUID;
import static com.topjohnwu.superuser.internal.JobTask.UUID_LEN;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.BufferedReader;

Binary file not shown.