Merge pull request #2259 from cryptomator/fix/2258-catching-executors

Added catching executors
This commit is contained in:
JaniruTEC 2022-05-27 18:24:55 +02:00 committed by GitHub
commit e6ff7a6d88
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 94 additions and 7 deletions

View File

@ -0,0 +1,92 @@
package org.cryptomator.common;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javafx.application.Platform;
import javafx.concurrent.Task;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
//Inspired by: https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/ThreadPoolExecutor.html#afterExecute(java.lang.Runnable,java.lang.Throwable)
public final class CatchingExecutors {
private static final Logger LOG = LoggerFactory.getLogger(CatchingExecutors.class);
private CatchingExecutors() { /* NO-OP */ }
public static class CatchingScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
public CatchingScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, threadFactory);
}
@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {
super.afterExecute(runnable, throwable);
afterExecuteInternal(runnable, throwable);
}
}
public static class CatchingThreadPoolExecutor extends ThreadPoolExecutor {
public CatchingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {
super.afterExecute(runnable, throwable);
afterExecuteInternal(runnable, throwable);
}
}
private static void afterExecuteInternal(Runnable runnable, Throwable throwable) {
if (throwable != null) {
callHandler(Thread.currentThread(), throwable);
} else if (runnable instanceof Task<?> t) {
afterExecuteTask(t);
} else if (runnable instanceof Future<?> f) {
afterExecuteFuture(f);
}
//Errors in this method are delegated to the UncaughtExceptionHandler of the current thread
}
private static void callHandler(Thread thread, Throwable throwable) {
Objects.requireNonNullElseGet(thread.getUncaughtExceptionHandler(), CatchingExecutors::fallbackHandler).uncaughtException(thread, throwable);
}
private static Thread.UncaughtExceptionHandler fallbackHandler() {
return (thread, throwable) -> LOG.error("FALLBACK: Uncaught exception in " + thread.getName(), throwable);
}
private static void afterExecuteTask(Task<?> task) {
var caller = Thread.currentThread();
Platform.runLater(() -> {
if (task.getOnFailed() == null) {
callHandler(caller, task.getException());
}
});
}
private static void afterExecuteFuture(Future<?> future) {
assert future.isDone();
try {
future.get();
} catch (CancellationException ce) {
callHandler(Thread.currentThread(), ce);
} catch (ExecutionException ee) {
callHandler(Thread.currentThread(), ee.getCause());
} catch (InterruptedException ie) {
//Ignore/Reset
Thread.currentThread().interrupt();
}
}
}

View File

@ -12,9 +12,7 @@ import org.apache.commons.lang3.SystemUtils;
import org.cryptomator.common.keychain.KeychainModule;
import org.cryptomator.common.settings.Settings;
import org.cryptomator.common.settings.SettingsProvider;
import org.cryptomator.common.vaults.Vault;
import org.cryptomator.common.vaults.VaultComponent;
import org.cryptomator.common.vaults.VaultListManager;
import org.cryptomator.common.vaults.VaultListModule;
import org.cryptomator.cryptolib.common.MasterkeyFileAccess;
import org.cryptomator.frontend.webdav.WebDavServer;
@ -25,16 +23,13 @@ import javax.inject.Named;
import javax.inject.Singleton;
import javafx.beans.binding.Binding;
import javafx.beans.binding.Bindings;
import javafx.collections.ObservableList;
import java.net.InetSocketAddress;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.Comparator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -93,7 +88,7 @@ public abstract class CommonsModule {
@Singleton
static ScheduledExecutorService provideScheduledExecutorService(ShutdownHook shutdownHook) {
final AtomicInteger threadNumber = new AtomicInteger(1);
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(NUM_SCHEDULER_THREADS, r -> {
ScheduledExecutorService executorService = new CatchingExecutors.CatchingScheduledThreadPoolExecutor(NUM_SCHEDULER_THREADS, r -> {
String name = String.format("App Scheduled Executor %02d", threadNumber.getAndIncrement());
Thread t = new Thread(r);
t.setName(name);
@ -110,7 +105,7 @@ public abstract class CommonsModule {
@Singleton
static ExecutorService provideExecutorService(ShutdownHook shutdownHook) {
final AtomicInteger threadNumber = new AtomicInteger(1);
ExecutorService executorService = new ThreadPoolExecutor(NUM_CORE_BG_THREADS, Integer.MAX_VALUE, BG_THREAD_KEEPALIVE_SECONDS, TimeUnit.SECONDS, new SynchronousQueue<>(), r -> {
ExecutorService executorService = new CatchingExecutors.CatchingThreadPoolExecutor(NUM_CORE_BG_THREADS, Integer.MAX_VALUE, BG_THREAD_KEEPALIVE_SECONDS, TimeUnit.SECONDS, new SynchronousQueue<>(), r -> {
String name = String.format("App Background Thread %03d", threadNumber.getAndIncrement());
Thread t = new Thread(r);
t.setName(name);