Implement I/O optimized for streaming

This commit is contained in:
topjohnwu 2022-05-03 22:13:05 -07:00
parent b12ba81d30
commit 691af9e6d4
7 changed files with 400 additions and 219 deletions

View File

@ -23,8 +23,8 @@ import static com.topjohnwu.superuser.nio.FileSystemManager.MODE_WRITE_ONLY;
import android.util.Log;
import com.topjohnwu.superuser.Shell;
import com.topjohnwu.superuser.internal.Utils;
import com.topjohnwu.superuser.io.SuFile;
import com.topjohnwu.superuser.io.SuRandomAccessFile;
import com.topjohnwu.superuser.nio.ExtendedFile;
import com.topjohnwu.superuser.nio.FileSystemManager;
@ -41,7 +41,12 @@ import java.util.Random;
public class StressTest {
interface FileCallback {
void onFile(ExtendedFile file) throws Exception;
}
private static final String TEST_DIR= "/system/app";
private static final int BUFFER_SIZE = 512 * 1024;
private static final Random r = new Random();
private static final MessageDigest md;
@ -59,17 +64,17 @@ public class StressTest {
private static FileCallback callback;
private static Map<String, byte[]> hashes;
interface FileCallback {
void onFile(ExtendedFile file) throws Exception;
}
public static void perform(FileSystemManager fs) {
remoteFS = fs;
Shell.EXECUTOR.execute(() -> {
try {
collectHashes();
testShellIO();
testRemoteIO();
// Test I/O streams
testShellStream();
testRemoteStream();
// Test random I/O
testShellRandomIO();
testRemoteChannel();
} catch (Exception e){
Log.d(TAG, "", e);
} finally {
@ -91,38 +96,66 @@ public class StressTest {
// Collect checksums of all files in test dir and use it as a reference
// to verify the correctness of the several I/O implementations.
OutputStream out = new OutputStream() {
@Override
public void write(int b) {
md.update((byte) b);
}
@Override
public void write(byte[] b, int off, int len) {
md.update(b, off, len);
}
};
Map<String, byte[]> map = new HashMap<>();
byte[] buf = new byte[BUFFER_SIZE];
callback = file -> {
try (InputStream in = file.newInputStream()) {
Utils.pump(in, out);
for (;;) {
int read = in.read(buf);
if (read <= 0)
break;
md.update(buf, 0, read);
}
}
map.put(file.getPath(), md.digest());
};
traverse(root);
hashes = map;
}
private static void testShellStream() throws Exception {
SuFile root = new SuFile(TEST_DIR);
SuFile outFile = new SuFile("/dev/null");
testIOStream(root, outFile);
}
private static void testRemoteStream() throws Exception {
ExtendedFile root = remoteFS.getFile(TEST_DIR);
ExtendedFile outFile = remoteFS.getFile("/dev/null");
testIOStream(root, outFile);
}
private static void testIOStream(ExtendedFile root, ExtendedFile outFile) throws Exception {
OutputStream out = outFile.newOutputStream();
byte[] buf = new byte[BUFFER_SIZE];
callback = file -> {
Log.d(TAG, file.getClass().getSimpleName() + " stream: " + file.getPath());
try (InputStream in = file.newInputStream()) {
for (;;) {
int read = in.read(buf);
if (read <= 0)
break;
out.write(buf, 0, read);
md.update(buf, 0, read);
}
out.flush();
}
};
try {
traverse(root);
} finally {
out.close();
}
hashes = map;
}
private static void testShellIO() throws Exception {
private static void testShellRandomIO() throws Exception {
SuFile root = new SuFile(TEST_DIR);
OutputStream out = new SuFile("/dev/null").newOutputStream();
byte[] buf = new byte[64 * 1024];
byte[] buf = new byte[BUFFER_SIZE];
callback = file -> {
try (InputStream in = file.newInputStream()) {
Log.d(TAG, "SuRandomAccessFile: " + file.getPath());
try (SuRandomAccessFile in = SuRandomAccessFile.open(file, "r")) {
for (;;) {
// Randomize read/write length to test unaligned I/O
int len = r.nextInt(buf.length);
@ -142,13 +175,13 @@ public class StressTest {
}
}
private static void testRemoteIO() throws Exception {
private static void testRemoteChannel() throws Exception {
ExtendedFile root = remoteFS.getFile(TEST_DIR);
FileChannel out = remoteFS.openChannel("/dev/null", MODE_WRITE_ONLY);
ByteBuffer buf = ByteBuffer.allocateDirect(512 * 1024);
ByteBuffer buf = ByteBuffer.allocateDirect(BUFFER_SIZE);
callback = file -> {
Log.d(TAG, file.getPath());
Log.d(TAG, "RemoteFileChannel: " + file.getPath());
try (FileChannel src = remoteFS.openChannel(file, MODE_READ_ONLY)) {
for (;;) {
// Randomize read/write length

View File

@ -31,10 +31,12 @@ interface IFileSystemService {
// I/O APIs
oneway void register(IBinder client);
/* (err, int) */ ParcelValues open(String path, int mode, String fifo);
/* (err, int) */ ParcelValues openChannel(String path, int mode, String fifo);
/* (err) */ ParcelValues openReadStream(String path, String fifo);
/* (err) */ ParcelValues openWriteStream(String path, String fifo, boolean append);
oneway void close(int handle);
/* (err, int) */ ParcelValues pread(int handle, int len, long offset);
/* (err, int) */ ParcelValues pwrite(int handle, int len, long offset);
/* (err) */ ParcelValues pwrite(int handle, int len, long offset);
/* (err, long) */ ParcelValues lseek(int handle, long offset, int whence);
/* (err, long) */ ParcelValues size(int handle);
/* (err) */ ParcelValues ftruncate(int handle, long length);

View File

@ -0,0 +1,84 @@
/*
* Copyright 2022 John "topjohnwu" Wu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.topjohnwu.superuser.internal;
import android.os.Binder;
import android.util.SparseArray;
import androidx.annotation.NonNull;
import java.io.IOException;
class FileContainer {
private final static String ERROR_MSG = "Requested file was not opened!";
private int nextHandle = 0;
// pid -> handle -> holder
private final SparseArray<SparseArray<FileHolder>> files = new SparseArray<>();
@NonNull
synchronized FileHolder get(int handle) throws IOException {
int pid = Binder.getCallingPid();
SparseArray<FileHolder> pidFiles = files.get(pid);
if (pidFiles == null)
throw new IOException(ERROR_MSG);
FileHolder h = pidFiles.get(handle);
if (h == null)
throw new IOException(ERROR_MSG);
return h;
}
synchronized int put(FileHolder h) {
int pid = Binder.getCallingPid();
SparseArray<FileHolder> pidFiles = files.get(pid);
if (pidFiles == null) {
pidFiles = new SparseArray<>();
files.put(pid, pidFiles);
}
int handle = nextHandle++;
pidFiles.append(handle, h);
return handle;
}
synchronized void remove(int handle) {
int pid = Binder.getCallingPid();
SparseArray<FileHolder> pidFiles = files.get(pid);
if (pidFiles == null)
return;
FileHolder h = pidFiles.get(handle);
if (h == null)
return;
pidFiles.remove(handle);
synchronized (h) {
h.close();
}
}
synchronized void pidDied(int pid) {
SparseArray<FileHolder> pidFiles = files.get(pid);
if (pidFiles == null)
return;
files.remove(pid);
for (int i = 0; i < pidFiles.size(); ++i) {
FileHolder h = pidFiles.valueAt(i);
synchronized (h) {
h.close();
}
}
}
}

View File

@ -0,0 +1,164 @@
/*
* Copyright 2022 John "topjohnwu" Wu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.topjohnwu.superuser.internal;
import android.os.Build;
import android.system.ErrnoException;
import android.system.Int64Ref;
import android.system.Os;
import android.system.OsConstants;
import android.system.StructStat;
import android.util.MutableLong;
import java.io.Closeable;
import java.io.FileDescriptor;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
class FileHolder implements Closeable {
// All methods in this class has to be synchronized
// This is only for testing purpose
private static final boolean FORCE_NO_SPLICE = false;
FileDescriptor fd;
FileDescriptor read;
FileDescriptor write;
private ByteBuffer buf;
private StructStat st;
private ByteBuffer getBuf() {
if (buf == null)
buf = ByteBuffer.allocateDirect(FileSystemService.PIPE_CAPACITY);
buf.clear();
return buf;
}
private StructStat getStat() throws ErrnoException {
if (st == null)
st = Os.fstat(fd);
return st;
}
@Override
public void close() {
if (fd != null) {
try {
Os.close(fd);
} catch (ErrnoException ignored) {
}
fd = null;
}
if (read != null) {
try {
Os.close(read);
} catch (ErrnoException ignored) {
}
read = null;
}
if (write != null) {
try {
Os.close(write);
} catch (ErrnoException ignored) {
}
write = null;
}
}
void ensureOpen() throws ClosedChannelException {
if (fd == null)
throw new ClosedChannelException();
}
int pread(int len, long offset) throws ErrnoException, IOException {
if (fd == null || write == null)
throw new ClosedChannelException();
final long result;
if (!FORCE_NO_SPLICE && Build.VERSION.SDK_INT >= 28) {
Int64Ref inOff = offset < 0 ? null : new Int64Ref(offset);
result = FileUtils.splice(fd, inOff, write, null, len, 0);
} else {
StructStat st = getStat();
if (OsConstants.S_ISREG(st.st_mode) || OsConstants.S_ISBLK(st.st_mode)) {
// sendfile only supports reading from mmap-able files
MutableLong inOff = offset < 0 ? null : new MutableLong(offset);
result = FileUtils.sendfile(write, fd, inOff, len);
} else {
// Fallback to copy into internal buffer
ByteBuffer buf = getBuf();
buf.limit(Math.min(len, buf.capacity()));
if (offset < 0) {
Os.read(fd, buf);
} else {
Os.pread(fd, buf, offset);
}
buf.flip();
result = buf.remaining();
// Need to write all bytes
for (int sz = (int) result; sz > 0; ) {
sz -= Os.write(write, buf);
}
}
}
return (int) result;
}
int pwrite(int len, long offset, boolean exact) throws ErrnoException, IOException {
if (fd == null || read == null)
throw new ClosedChannelException();
if (!FORCE_NO_SPLICE && Build.VERSION.SDK_INT >= 28) {
Int64Ref outOff = offset < 0 ? null : new Int64Ref(offset);
if (exact) {
int sz = len;
while (sz > 0) {
sz -= FileUtils.splice(read, null, fd, outOff, sz, 0);
}
return len;
} else {
return (int) FileUtils.splice(read, null, fd, outOff, len, 0);
}
} else {
// Unfortunately, sendfile does not allow reading from pipes.
// Manually read into an internal buffer then write to output.
ByteBuffer buf = getBuf();
int sz = 0;
buf.limit(len);
if (exact) {
while (len > sz) {
sz += Os.read(read, buf);
}
} else {
sz = Os.read(read, buf);
}
len = sz;
buf.flip();
while (sz > 0) {
if (offset < 0) {
sz -= Os.write(fd, buf);
} else {
int w = Os.pwrite(fd, buf, offset);
sz -= w;
offset += w;
}
}
return len;
}
}
}

View File

@ -16,8 +16,11 @@
package com.topjohnwu.superuser.internal;
import static android.system.OsConstants.O_APPEND;
import static android.system.OsConstants.O_CREAT;
import static android.system.OsConstants.O_NONBLOCK;
import static android.system.OsConstants.O_RDONLY;
import static android.system.OsConstants.O_TRUNC;
import static android.system.OsConstants.O_WRONLY;
import static android.system.OsConstants.SEEK_CUR;
import static android.system.OsConstants.SEEK_END;
@ -25,33 +28,21 @@ import static android.system.OsConstants.SEEK_SET;
import android.annotation.SuppressLint;
import android.os.Binder;
import android.os.Build;
import android.os.IBinder;
import android.os.RemoteException;
import android.system.ErrnoException;
import android.system.Int64Ref;
import android.system.Os;
import android.system.OsConstants;
import android.system.StructStat;
import android.util.LruCache;
import android.util.MutableLong;
import android.util.SparseArray;
import androidx.annotation.NonNull;
import java.io.Closeable;
import java.io.File;
import java.io.FileDescriptor;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
class FileSystemService extends IFileSystemService.Stub {
// This is only for testing purpose
private static final boolean FORCE_NO_SPLICE = false;
private static final int PIPE_CAPACITY = 16 * 4096;
static final int PIPE_CAPACITY = 16 * 4096;
private final LruCache<String, File> mCache = new LruCache<String, File>(100) {
@Override
@ -220,110 +211,8 @@ class FileSystemService extends IFileSystemService.Stub {
// I/O APIs
static class FileHolder implements Closeable {
FileDescriptor fd;
FileDescriptor read;
FileDescriptor write;
private ByteBuffer buf;
private StructStat st;
@Override
public void close() {
if (fd != null) {
try { Os.close(fd); } catch (ErrnoException ignored) {}
fd = null;
}
if (read != null) {
try { Os.close(read); } catch (ErrnoException ignored) {}
read = null;
}
if (write != null) {
try { Os.close(write); } catch (ErrnoException ignored) {}
write = null;
}
}
void ensureOpen() throws ClosedChannelException {
if (fd == null || read == null || write == null)
throw new ClosedChannelException();
}
synchronized ByteBuffer getBuf() {
if (buf == null)
buf = ByteBuffer.allocateDirect(PIPE_CAPACITY);
buf.clear();
return buf;
}
synchronized StructStat getStat() throws ErrnoException {
if (st == null)
st = Os.fstat(fd);
return st;
}
}
static class FileContainer {
private final static String ERROR_MSG = "Requested file was not opened!";
private int nextHandle = 0;
// pid -> handle -> holder
private final SparseArray<SparseArray<FileHolder>> files = new SparseArray<>();
@NonNull
synchronized FileHolder get(int handle) throws IOException {
int pid = Binder.getCallingPid();
SparseArray<FileHolder> pidFiles = files.get(pid);
if (pidFiles == null)
throw new IOException(ERROR_MSG);
FileHolder h = pidFiles.get(handle);
if (h == null)
throw new IOException(ERROR_MSG);
return h;
}
synchronized int put(FileHolder h) {
int pid = Binder.getCallingPid();
SparseArray<FileHolder> pidFiles = files.get(pid);
if (pidFiles == null) {
pidFiles = new SparseArray<>();
files.put(pid, pidFiles);
}
int handle = nextHandle++;
pidFiles.append(handle, h);
return handle;
}
synchronized void remove(int handle) {
int pid = Binder.getCallingPid();
SparseArray<FileHolder> pidFiles = files.get(pid);
if (pidFiles == null)
return;
FileHolder h = pidFiles.get(handle);
if (h == null)
return;
pidFiles.remove(handle);
synchronized (h) {
h.close();
}
}
synchronized void pidDied(int pid) {
SparseArray<FileHolder> pidFiles = files.get(pid);
if (pidFiles == null)
return;
files.remove(pid);
for (int i = 0; i < pidFiles.size(); ++i) {
FileHolder h = pidFiles.valueAt(i);
synchronized (h) {
h.close();
}
}
}
}
private final FileContainer openFiles = new FileContainer();
private final ExecutorService ioPool = Executors.newCachedThreadPool();
@Override
public void register(IBinder client) {
@ -335,7 +224,7 @@ class FileSystemService extends IFileSystemService.Stub {
@SuppressWarnings("OctalInteger")
@Override
public ParcelValues open(String path, int mode, String fifo) {
public ParcelValues openChannel(String path, int mode, String fifo) {
ParcelValues values = new ParcelValues();
values.add(null);
FileHolder h = new FileHolder();
@ -351,6 +240,58 @@ class FileSystemService extends IFileSystemService.Stub {
return values;
}
@Override
public ParcelValues openReadStream(String path, String fifo) {
ParcelValues values = new ParcelValues();
values.add(null);
FileHolder h = new FileHolder();
try {
h.fd = Os.open(path, O_RDONLY, 0);
ioPool.execute(() -> {
synchronized (h) {
try {
h.write = Os.open(fifo, O_WRONLY, 0);
while (h.pread(PIPE_CAPACITY, -1) > 0);
} catch (ErrnoException | IOException ignored) {
} finally {
h.close();
}
}
});
} catch (ErrnoException e) {
values.set(0, e);
h.close();
}
return values;
}
@SuppressWarnings("OctalInteger")
@Override
public ParcelValues openWriteStream(String path, String fifo, boolean append) {
ParcelValues values = new ParcelValues();
values.add(null);
FileHolder h = new FileHolder();
try {
int mode = O_CREAT | O_WRONLY | (append ? O_APPEND : O_TRUNC);
h.fd = Os.open(path, mode, 0666);
ioPool.execute(() -> {
synchronized (h) {
try {
h.read = Os.open(fifo, O_RDONLY, 0);
while (h.pwrite(PIPE_CAPACITY, -1, false) > 0);
} catch (ErrnoException | IOException ignored) {
} finally {
h.close();
}
}
});
} catch (ErrnoException e) {
values.set(0, e);
h.close();
}
return values;
}
@Override
public void close(int handle) {
openFiles.remove(handle);
@ -362,37 +303,9 @@ class FileSystemService extends IFileSystemService.Stub {
values.add(null);
try {
final FileHolder h = openFiles.get(handle);
final long result;
synchronized (h) {
h.ensureOpen();
if (!FORCE_NO_SPLICE && Build.VERSION.SDK_INT >= 28) {
Int64Ref inOff = offset < 0 ? null : new Int64Ref(offset);
result = FileUtils.splice(h.fd, inOff, h.write, null, len, 0);
} else {
StructStat st = h.getStat();
if (OsConstants.S_ISREG(st.st_mode) || OsConstants.S_ISBLK(st.st_mode)) {
// sendfile only supports reading from mmap-able files
MutableLong inOff = offset < 0 ? null : new MutableLong(offset);
result = FileUtils.sendfile(h.write, h.fd, inOff, len);
} else {
// Fallback to copy into internal buffer
ByteBuffer buf = h.getBuf();
buf.limit(Math.min(len, buf.capacity()));
if (offset < 0) {
Os.read(h.fd, buf);
} else {
Os.pread(h.fd, buf, offset);
}
buf.flip();
result = buf.remaining();
// Need to write all bytes
for (int sz = (int) result; sz > 0;) {
sz -= Os.write(h.write, buf);
}
}
}
values.add(h.pread(len, offset));
}
values.add((int) result);
} catch (IOException | ErrnoException e) {
values.set(0, e);
}
@ -406,35 +319,7 @@ class FileSystemService extends IFileSystemService.Stub {
try {
final FileHolder h = openFiles.get(handle);
synchronized (h) {
h.ensureOpen();
if (!FORCE_NO_SPLICE && Build.VERSION.SDK_INT >= 28) {
Int64Ref outOff = offset < 0 ? null : new Int64Ref(offset);
int sz = len;
while (sz > 0) {
// Need to write exactly len bytes
sz -= FileUtils.splice(h.read, null, h.fd, outOff, sz, 0);
}
} else {
// Unfortunately, sendfile does not allow reading from pipes.
// Manually read into an internal buffer then write to output.
ByteBuffer buf = h.getBuf();
int sz = 0;
buf.limit(len);
// Need to read and write exactly len bytes
while (len > sz) {
sz += Os.read(h.read, buf);
}
buf.flip();
while (sz > 0) {
if (offset < 0) {
sz -= Os.write(h.fd, buf);
} else {
int w = Os.pwrite(h.fd, buf, offset);
sz -= w;
offset += w;
}
}
}
h.pwrite(len, offset, true);
}
} catch (IOException | ErrnoException e) {
values.set(0, e);

View File

@ -16,22 +16,18 @@
package com.topjohnwu.superuser.internal;
import static android.os.ParcelFileDescriptor.MODE_APPEND;
import static android.os.ParcelFileDescriptor.MODE_CREATE;
import static android.os.ParcelFileDescriptor.MODE_READ_ONLY;
import static android.os.ParcelFileDescriptor.MODE_TRUNCATE;
import static android.os.ParcelFileDescriptor.MODE_WRITE_ONLY;
import android.os.RemoteException;
import android.system.ErrnoException;
import android.system.OsConstants;
import androidx.annotation.NonNull;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;
class RemoteFile extends FileImpl<RemoteFile> {
@ -342,16 +338,33 @@ class RemoteFile extends FileImpl<RemoteFile> {
@Override
public InputStream newInputStream() throws IOException {
return Channels.newInputStream(new RemoteFileChannel(fs, this, MODE_READ_ONLY));
File fifo = null;
try {
fifo = FileUtils.createTempFIFO();
FileUtils.checkException(fs.openReadStream(getPath(), fifo.getPath()));
return new FileInputStream(fifo);
} catch (RemoteException | ErrnoException e) {
throw new IOException(e);
} finally {
// Once both sides opened the pipe, it can be unlinked
if (fifo != null)
fifo.delete();
}
}
@Override
public OutputStream newOutputStream(boolean append) throws IOException {
int mode = MODE_WRITE_ONLY | MODE_CREATE;
if (append)
mode |= MODE_APPEND;
else
mode |= MODE_TRUNCATE;
return Channels.newOutputStream(new RemoteFileChannel(fs, this, mode));
File fifo = null;
try {
fifo = FileUtils.createTempFIFO();
FileUtils.checkException(fs.openWriteStream(getPath(), fifo.getPath(), append));
return new FileOutputStream(fifo);
} catch (RemoteException | ErrnoException e) {
throw new IOException(e);
} finally {
// Once both sides opened the pipe, it can be unlinked
if (fifo != null)
fifo.delete();
}
}
}

View File

@ -65,7 +65,7 @@ class RemoteFileChannel extends FileChannel {
// Open the file on the remote process
int posixMode = FileUtils.modeToPosix(mode);
handle = FileUtils.tryAndGet(fs.open(file.getAbsolutePath(), posixMode, fifo.getPath()));
handle = FileUtils.tryAndGet(fs.openChannel(file.getAbsolutePath(), posixMode, fifo.getPath()));
// Since we do not have the machinery to interrupt native pthreads, we
// have to make sure none of our I/O can block in all operations.