Update progress.py (#88)

* Speed up DLL function analysis by ~30%

* Fix Dockerfile when building with existing user

* Update progress.py to count DLL functions and emit data for progress site

* Add shield in README for progress

* Fix progress.py executable bit

* Add new shields to README
Author: Ethan Lafrenais, 2022-05-10 21:25:12 -04:00 (committed by GitHub)
parent c3c9dceaf3
commit 2388f299bb
5 changed files with 343 additions and 144 deletions

Dockerfile

@@ -16,10 +16,10 @@ RUN pip3 install -r requirements.txt
# Symlink dino.py
RUN ln -s /dino/dino.py /usr/local/bin/dino
# Set up user
# Set up user (if they don't exist)
ARG login=sabre
ARG uid=1000
RUN adduser --system --uid $uid --group $login
RUN id -u $uid &>/dev/null || adduser --system --uid $uid --group $login
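# (`id -u $uid` exits non-zero when no user has that UID, so the `|| adduser`
# fallback creates the user only when it doesn't already exist.)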
# Set entrypoint
RUN echo "#!/bin/bash\nexec \"\$@\"" > /entrypoint.sh && chmod +x /entrypoint.sh

README.md

@@ -1,6 +1,9 @@
![Dinosaur Planet Decompilation](docs/banner.png)
[![](https://img.shields.io/badge/Discord-Dinosaur%20Planet%20Community-5865F2?logo=discord)](https://discord.gg/H6WGkznZBc)
![](https://img.shields.io/endpoint?url=https%3A%2F%2Fraw.githubusercontent.com%2FFrancessco121%2Fdino-status%2Fgh-pages%2Ftotal.shield.json)
![](https://img.shields.io/endpoint?url=https%3A%2F%2Fraw.githubusercontent.com%2FFrancessco121%2Fdino-status%2Fgh-pages%2Fcore.shield.json)
![](https://img.shields.io/endpoint?url=https%3A%2F%2Fraw.githubusercontent.com%2FFrancessco121%2Fdino-status%2Fgh-pages%2Fdlls.shield.json)
A WIP decompilation of Dinosaur Planet for the Nintendo 64, as released by Forest of Illusion on Feb. 20, 2021.
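
The three new badges use shields.io's endpoint feature: each URL points at a small JSON document (total.shield.json, core.shield.json, dlls.shield.json) that the separate dino-status site regenerates from this repository's progress data. Below is a minimal sketch of producing one such endpoint document; the schema fields follow https://shields.io/endpoint and the old --shield-json output removed from progress.py in this commit, while the label text and color threshold are illustrative.

import json

# Build a shields.io endpoint document like total.shield.json.
# (Sketch only; the real files are produced by the dino-status pipeline.)
def make_shield_json(label: str, matching_ratio: float) -> str:
    return json.dumps({
        "schemaVersion": 1,
        "label": label,
        "message": f"{matching_ratio * 100:.2f}%",
        "color": "brightgreen" if matching_ratio > 0.5 else "orange",
    })

print(make_shield_json("progress (total)", 0.3713))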

requirements.txt

@@ -1,7 +1,6 @@
ansiwrap
capstone
colorama>=0.4.4,<0.5
colour
cxxfilt
GitPython
ninja

tools/dino/dll.py

@@ -185,6 +185,8 @@ class DLLRelocationTable:
return DLLRelocationTable(True, global_offset_table, gp_relocations, data_relocations)
class DLLInst:
label: "str | None" = None
def __init__(self,
original: CsInsn,
address: int,
@@ -192,7 +194,6 @@ class DLLInst:
op_str: str,
is_branch_delay_slot: bool,
has_relocation: bool,
label: "str | None",
ref: "str | None") -> None:
self.original = original
self.address = address
@@ -200,7 +201,6 @@
self.op_str = op_str
self.is_branch_delay_slot = is_branch_delay_slot
self.has_relocation = has_relocation
self.label = label
self.ref = ref
def is_op_modified(self):
@@ -232,12 +232,6 @@ class DLLFunction:
self.relocations = relocations
"""All instruction relocations in the function, sorted by their position in the original DLL's GOT."""
def __mnemonic_has_delay_slot(mnemonic: str) -> bool:
return (mnemonic.startswith("b") or mnemonic.startswith("j")) and mnemonic != "break"
def __mnemonic_is_branch(mnemonic: str) -> bool:
return (mnemonic.startswith("b") or mnemonic == "j") and mnemonic != "break"
def parse_functions(data: bytearray,
dll: DLL,
reloc_table: DLLRelocationTable,
@@ -256,15 +250,6 @@ def parse_functions(data: bytearray,
md = Cs(CS_ARCH_MIPS, CS_MODE_MIPS64 + CS_MODE_BIG_ENDIAN)
insts = [i for i in md.disasm(data[header.size:text_end], 0x0)]
# Extract all branches
branches: "list[tuple[int, int]]" = []
branch_dests: "set[int]" = set()
for i in insts:
if __mnemonic_is_branch(i.mnemonic):
branch_target = int(i.op_str.split(" ")[-1], 0)
branches.append((i.address, branch_target))
branch_dests.add(branch_target)
# Extract functions
funcs: "list[DLLFunction]" = []
cur_func_insts: "list[DLLInst]" = []
@@ -275,22 +260,54 @@
cur_func_auto_syms: "OrderedDict[str, int]" = OrderedDict()
cur_func_relocs: "list[DLLRelocation]" = []
cur_func_inst_index = 0
cur_func_branch_dests: "list[int]" = []
cur_func_forward_branches: "set[int]" = set()
def add_function():
if cur_func_name == "":
return
# Discard trailing nops
for idx in range(len(cur_func_insts) - 1, 0, -1):
i = cur_func_insts[idx]
if i.mnemonic == "nop" and not i.is_branch_delay_slot:
cur_func_insts.pop(idx)
else:
break
# Ensure function ends with jr $ra
# Otherwise, it's not a function
if len(cur_func_insts) >= 2:
jr = cur_func_insts[-2] # -2 to account for the delay slot after jr
if jr.mnemonic != "jr" or jr.op_str != "$ra":
return
# Sort relocations by GOT index
cur_func_relocs.sort(key=lambda r: r.got_index)
# Add branch labels
for addr in cur_func_branch_dests:
idx = (addr - cur_func_addr) // 4
if idx >= 0 and idx < len(cur_func_insts):
cur_func_insts[idx].label = ".L{:X}".format(addr)
# Add function
funcs.append(DLLFunction(
insts=cur_func_insts,
address=cur_func_addr,
symbol=cur_func_name,
is_static=cur_func_is_static,
auto_symbols=cur_func_auto_syms,
relocations=cur_func_relocs
))
for idx, i in enumerate(insts):
# Check if this instruction is a branch delay slot of the previous instruction
is_delay_slot = last_mnemonic is not None and __mnemonic_has_delay_slot(last_mnemonic)
if new_func and i.mnemonic != "nop" and not is_delay_slot:
# Add previous function
if cur_func_name != "":
cur_func_relocs.sort(key=lambda r: r.got_index)
funcs.append(DLLFunction(
insts=cur_func_insts,
address=cur_func_addr,
symbol=cur_func_name,
is_static=cur_func_is_static,
auto_symbols=cur_func_auto_syms,
relocations=cur_func_relocs
))
add_function()
# New function, determine name and type
if i.address == header.ctor_offset:
@@ -306,8 +323,10 @@ def parse_functions(data: bytearray,
cur_func_has_gp_init = False
cur_func_auto_syms = OrderedDict()
cur_func_relocs = []
new_func = False
cur_func_branch_dests = []
cur_func_forward_branches = set()
cur_func_inst_index = 0
new_func = False
# Pre-process instruction
mnemonic = i.mnemonic
@@ -322,6 +341,12 @@ def parse_functions(data: bytearray,
branch_target = int(operands[-1], 0)
op_label = ".L{:X}".format(branch_target)
op_str = ", ".join(operands[:-1] + [op_label])
# Save target
cur_func_branch_dests.append(branch_target)
# If the branch target is ahead of this instruction, save it to assist in
# detecting the function end
if branch_target > i.address:
cur_func_forward_branches.add(branch_target)
elif cur_func_inst_index < 2 and num_operands > 0 and operands[0] == "$gp":
# Add _gp_disp to $gp initializer stub
# Note: The $gp initializer stub gets modified when compiled,
@@ -349,7 +374,7 @@ def parse_functions(data: bytearray,
got_index = offset // 4
symbol_addr = reloc_table.global_offset_table[got_index]
# Determine if this is a CALL16 or GOT16 relocation
is_call16 = is_reloc_call16(idx, insts)
is_call16 = __is_reloc_call16(idx, insts)
# Make symbol
if got_index == 0:
symbol = ".text"
@@ -397,15 +422,6 @@ def parse_functions(data: bytearray,
mnemonic = "addu"
else:
raise NotImplementedError(f"INVALID INSTRUCTION {i} {opcode}")
elif mnemonic in ["mtc0", "mfc0", "mtc2", "mfc2"]:
# TODO: what is this doing?
rd = (i.bytes[2] & 0xF8) >> 3
op_str = op_str.split(" ")[0] + " $" + str(rd)
# Determine whether this instruction address is branched to
label: "str | None" = None
if i.address in branch_dests:
label = ".L{:X}".format(i.address)
# Add instruction
cur_func_insts.append(DLLInst(
@@ -414,40 +430,35 @@ def parse_functions(data: bytearray,
mnemonic=mnemonic,
op_str=op_str,
is_branch_delay_slot=is_delay_slot,
label=label,
ref=ref,
has_relocation=has_relocation
))
# If we reached a branch target, pop it
if i.address in cur_func_forward_branches:
cur_func_forward_branches.remove(i.address)
# Check for function end
# TODO: this is very slow for large functions
if mnemonic == "jr" and i.op_str == "$ra":
if mnemonic == "jr" and i.op_str == "$ra" and len(cur_func_forward_branches) == 0:
# Reached a jr $ra and we're not inside of a branch, must be the function end
new_func = True
for branch in branches:
if (branch[0] > i.address and branch[1] <= i.address) or (branch[0] <= i.address and branch[1] > i.address):
# jr falls within a known branch, so there's more to this function
new_func = False
break
# Track last instruction
last_mnemonic = mnemonic
cur_func_inst_index += 1
# Add final function
if cur_func_name != "":
cur_func_relocs.sort(key=lambda r: r.got_index)
funcs.append(DLLFunction(
insts=cur_func_insts,
address=cur_func_addr,
symbol=cur_func_name,
is_static=cur_func_is_static,
auto_symbols=cur_func_auto_syms,
relocations=cur_func_relocs
))
add_function()
return funcs
def is_reloc_call16(idx: int, insts: "list[CsInsn]") -> bool:
def __mnemonic_has_delay_slot(mnemonic: str) -> bool:
return (mnemonic.startswith("b") or mnemonic.startswith("j")) and mnemonic != "break"
def __mnemonic_is_branch(mnemonic: str) -> bool:
return (mnemonic.startswith("b") or mnemonic == "j") and mnemonic != "break"
def __is_reloc_call16(idx: int, insts: "list[CsInsn]") -> bool:
# GOT value must be stored in $t9
if not insts[idx].op_str.startswith("$t9"):
return False
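
This rework is the source of the ~30% speedup in the commit message. Previously the parser collected every branch in the .text section up front and, at each jr $ra, rescanned the whole branch list to decide whether the jr fell inside a branch. Now it only tracks the not-yet-reached forward branch targets of the current function, so a jr $ra ends the function exactly when that set is empty. A minimal standalone sketch of the idea, using hypothetical (address, mnemonic, branch_target) tuples instead of the project's Capstone instructions and omitting delay-slot handling:

def split_functions(insts):
    # insts: iterable of (addr, mnemonic, target); target is None for non-branches,
    # and "jr $ra" is treated as a single mnemonic for simplicity.
    funcs, cur, pending = [], [], set()
    for addr, mnemonic, target in insts:
        cur.append((addr, mnemonic, target))
        pending.discard(addr)              # reached a pending forward branch target
        if target is not None and target > addr:
            pending.add(target)            # the function can't end before this address
        if mnemonic == "jr $ra" and not pending:
            funcs.append(cur)              # no branch spans this jr: function ends here
            cur = []
    if cur:
        funcs.append(cur)
    return funcs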

tools/progress.py Normal file → Executable file

@@ -1,119 +1,305 @@
#!/usr/bin/env python3
# Computes and displays progress for the decompilation project
import argparse
import git
from git.repo import Repo
from glob import glob
from io import TextIOWrapper
import json
import os
from pathlib import Path
import re
import subprocess
import sys
from colour import Color
def set_version(version):
global script_dir, root_dir, asm_dir, build_dir, elf_path
script_dir = os.path.dirname(os.path.realpath(__file__))
root_dir = os.path.join(script_dir, "..")
asm_dir = os.path.join(root_dir, "asm", "nonmatchings")
build_dir = os.path.join(root_dir, "build")
elf_path = os.path.join(build_dir, "dino.elf")
from dino.dll import DLL
def get_func_sizes():
SCRIPT_DIR = Path(os.path.dirname(os.path.realpath(__file__)))
ROOT_DIR = Path(os.path.abspath(os.path.join(SCRIPT_DIR, "..")))
ASM_PATH = ROOT_DIR.joinpath("asm")
BIN_PATH = ROOT_DIR.joinpath("bin")
BUILD_PATH = ROOT_DIR.joinpath("build")
SRC_PATH = ROOT_DIR.joinpath("src")
symbol_pattern = re.compile(r"(\S+)\s*=\s*(\S+);")
class DLLProgress:
def __init__(self,
number: str,
total_bytes: int,
total_funcs: int,
matching_bytes: int,
matching_funcs: int) -> None:
self.number = number
self.total_bytes = total_bytes
self.total_funcs = total_funcs
self.matching_bytes = matching_bytes
self.matching_funcs = matching_funcs
class CoreProgress:
def __init__(self,
total_bytes: int,
total_funcs: int,
matching_bytes: int,
matching_funcs: int) -> None:
self.total_bytes = total_bytes
self.total_funcs = total_funcs
self.matching_bytes = matching_bytes
self.matching_funcs = matching_funcs
class OverallProgress:
def __init__(self,
core: CoreProgress,
dlls: "list[DLLProgress]") -> None:
self.core = core
self.dlls = dlls
# Compute total DLL progress
self.dll_total_bytes = 0
self.dll_total_funcs = 0
self.dll_matching_bytes = 0
self.dll_matching_funcs = 0
for progress in dlls:
self.dll_total_bytes += progress.total_bytes
self.dll_total_funcs += progress.total_funcs
self.dll_matching_bytes += progress.matching_bytes
self.dll_matching_funcs += progress.matching_funcs
# Compute overall progress
self.total_bytes = core.total_bytes + self.dll_total_bytes
self.total_funcs = core.total_funcs + self.dll_total_funcs
self.matching_bytes = core.matching_bytes + self.dll_matching_bytes
self.matching_funcs = core.matching_funcs + self.dll_matching_funcs
# Compute ratios
self.core_matching_funcs_ratio = core.matching_funcs / core.total_funcs
self.core_matching_bytes_ratio = core.matching_bytes / core.total_bytes
self.dll_matching_funcs_ratio = self.dll_matching_funcs / self.dll_total_funcs
self.dll_matching_bytes_ratio = self.dll_matching_bytes / self.dll_total_bytes
self.matching_funcs_ratio = self.matching_funcs / self.total_funcs
self.matching_bytes_ratio = self.matching_bytes / self.total_bytes
def get_core_func_sizes(elf_path: Path) -> "tuple[dict[str, int], int]":
# Get functions and their sizes from the given .elf
try:
result = subprocess.run(['objdump', '-x', elf_path], stdout=subprocess.PIPE)
nm_lines = result.stdout.decode().split("\n")
result = subprocess.run(['mips-linux-gnu-readelf', '--symbols', elf_path], stdout=subprocess.PIPE)
lines = result.stdout.decode().split("\n")
except:
print(f"Error: Could not run objdump on {elf_path} - make sure that the project is built")
print(f"Error: Could not run mips-linux-gnu-readelf on {elf_path} - make sure that the project is built")
sys.exit(1)
sizes = {}
total = 0
for line in nm_lines:
if " F " in line:
components = line.split()
size = int(components[4], 16)
name = components[5]
for line in [l for l in lines if "FUNC" in l]:
components = line.split()
size = int(components[2])
name = components[7]
# Include asm functions (which have a size of 0),
# but exclude branch labels (which also count as funcs and have a size of 0)
if size > 0 or not name.startswith("L8"):
total += size
sizes[name] = size
return sizes, total
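# Illustrative only: a typical `mips-linux-gnu-readelf --symbols` row looks like
#     210: 80001234   128 FUNC    GLOBAL DEFAULT    1 some_func
# After line.split(), components[2] is the decimal size ("128") and components[7]
# is the symbol name ("some_func", a made-up example), which is exactly what the
# loop above indexes.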
def get_nonmatching_funcs():
def get_core_nonmatching_funcs() -> "set[str]":
nonmatching_path = ASM_PATH.joinpath("nonmatchings")
funcs = set()
for root, dirs, files in os.walk(asm_dir):
for f in files:
if f.endswith(".s"):
funcs.add(f[:-2])
for asm_path in nonmatching_path.rglob("*.s"):
# Skip DLL nonmatchings
if asm_path.relative_to(nonmatching_path).parts[0] == "dlls":
continue
# Add
funcs.add(asm_path.stem)
return funcs
def get_funcs_sizes(sizes, matchings, nonmatchings):
msize = 0
nmsize = 0
for func in matchings:
msize += sizes[func]
for func in nonmatchings:
if func not in sizes:
pass
# print(func)
else:
nmsize += sizes[func]
return msize, nmsize
def lerp(a, b, alpha):
return a + (b - a) * alpha
def main(args):
set_version(args.version)
func_sizes, total_size = get_func_sizes()
def get_core_progress() -> CoreProgress:
# Get all core functions and their sizes from the final .elf
dino_elf_path = BUILD_PATH.joinpath("dino.elf")
func_sizes, total_bytes = get_core_func_sizes(dino_elf_path)
all_funcs = set(func_sizes.keys())
nonmatching_funcs = get_nonmatching_funcs()
# Get nonmatching functions
nonmatching_funcs = get_core_nonmatching_funcs()
# Compute matching amount
matching_funcs = all_funcs - nonmatching_funcs
matching_bytes = 0
for func in matching_funcs:
matching_bytes += func_sizes[func]
matching_size, nonmatching_size = get_funcs_sizes(func_sizes, matching_funcs, nonmatching_funcs)
# Done
return CoreProgress(
total_bytes=total_bytes,
total_funcs=len(all_funcs),
matching_bytes=matching_bytes,
matching_funcs=len(matching_funcs)
)
if len(all_funcs) == 0:
funcs_matching_ratio = 0.0
matching_ratio = 0.0
def read_dll_symbols_txt(path: Path) -> "dict[int, str]":
symbols: "dict[int, str]" = {}
with open(path, "r", encoding="utf-8") as syms_file:
for line in syms_file.readlines():
pairs = symbol_pattern.findall(line.strip())
for pair in pairs:
addr_str: str = pair[1]
if addr_str.lower().startswith("0x"):
addr = int(addr_str, base=16)
else:
addr = int(addr_str)
symbols[addr] = pair[0]
return symbols
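# Illustrative only: given an assumed syms.txt line such as
#     get_state = 0x81000C40;
# symbol_pattern captures the pair ("get_state", "0x81000C40"), and the map
# gains symbols[0x81000C40] = "get_state". (The symbol name is made up.)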
def get_dll_progress(dll_path: Path, number: str) -> DLLProgress:
known_symbols: "dict[int, str]" = {}
nonmatching_funcs: "set[str]" = set()
has_src = False
# To determine progress we need to check if the DLL has a src directory
# If it does, we need its syms.txt and we need to check the respective asm/nonmatchings directory
syms_path = SRC_PATH.joinpath(f"dlls/{number}/syms.txt")
if syms_path.exists():
has_src = True
# Get a list of known symbols for the DLL (we need the function symbols)
known_symbols = read_dll_symbols_txt(syms_path)
# Get list of functions that aren't matching
nonmatchings_dir = ASM_PATH.joinpath(f"nonmatchings/dlls/{number}")
if nonmatchings_dir.exists():
for asm_file in nonmatchings_dir.iterdir():
if asm_file.name.endswith(".s"):
nonmatching_funcs.add(asm_file.name[:-2])
# Get all DLL functions and their sizes
with open(dll_path, "rb") as dll_file:
dll = DLL.parse(bytearray(dll_file.read()), number, include_funcs=True, known_symbols=known_symbols)
assert dll.functions is not None
func_sizes: "dict[str, int]" = {}
total_bytes = 0
for func in dll.functions:
size = len(func.insts) * 4
func_sizes[func.symbol] = size
total_bytes += size
# Compute matching amounts
if has_src:
matching_funcs = set(func_sizes.keys()) - nonmatching_funcs
matching_bytes = 0
for func in matching_funcs:
matching_bytes += func_sizes[func]
else:
funcs_matching_ratio = (len(matching_funcs) / len(all_funcs)) * 100
matching_ratio = (matching_size / total_size) * 100
matching_funcs = []
matching_bytes = 0
if args.csv:
version = 1
git_object = git.Repo().head.object
timestamp = str(git_object.committed_date)
git_hash = git_object.hexsha
csv_list = [str(version), timestamp, git_hash, str(len(all_funcs)), str(len(nonmatching_funcs)),
str(len(matching_funcs)), str(total_size), str(nonmatching_size), str(matching_size)]
print(",".join(csv_list))
elif args.shield_json:
import json
# Done
return DLLProgress(
number,
total_bytes=total_bytes,
total_funcs=len(func_sizes),
matching_bytes=matching_bytes,
matching_funcs=len(matching_funcs)
)
# https://shields.io/endpoint
color = Color("#50ca22", hue=lerp(0, 105/255, matching_ratio / 100))
print(json.dumps({
"schemaVersion": 1,
"label": f"progress ({args.version})",
"message": f"{matching_ratio:.2f}%",
"color": color.hex,
}))
else:
if matching_size + nonmatching_size != total_size:
print("Warning: category/total size mismatch!\n")
print(f"{len(matching_funcs)} matched functions / {len(all_funcs)} total ({funcs_matching_ratio:.2f}%)")
print(f"{matching_size} matching bytes / {total_size} total ({matching_ratio:.2f}%)")
def get_all_dll_progress() -> "list[DLLProgress]":
dlls_dir = BIN_PATH.joinpath("assets/dlls")
progress: "list[DLLProgress]" = []
# Get progress of each .dll asset
for dll_path in [Path(p) for p in glob(f"{dlls_dir}/*.dll")]:
number = dll_path.name.split(".")[0]
progress.append(get_dll_progress(dll_path, number))
return progress
def get_overall_progress() -> OverallProgress:
# Get core progress
core = get_core_progress()
# Get DLL progress
dlls = get_all_dll_progress()
# Return overall
return OverallProgress(core, dlls)
def output_json(p: OverallProgress, file: TextIOWrapper):
# Get current commit info
repo = Repo()
git_head_obj = repo.head.object
git_commit_hash = git_head_obj.hexsha
git_commit_hash_short = repo.git.rev_parse(git_commit_hash, short=7)
git_commit_timestamp = git_head_obj.committed_date
# Build JSON data
data = {
"total": {
"matching_ratio": p.matching_bytes_ratio,
"matching_funcs": p.matching_funcs,
"matching_bytes": p.matching_bytes,
"total_funcs": p.total_funcs,
"total_bytes": p.total_bytes,
},
"core": {
"matching_ratio": p.core_matching_bytes_ratio,
"matching_funcs": p.core.matching_funcs,
"matching_bytes": p.core.matching_bytes,
"total_funcs": p.core.total_funcs,
"total_bytes": p.core.total_bytes,
},
"dll": {
"matching_ratio": p.dll_matching_bytes_ratio,
"matching_funcs": p.dll_matching_funcs,
"matching_bytes": p.dll_matching_bytes,
"total_funcs": p.dll_total_funcs,
"total_bytes": p.dll_total_bytes,
},
"git": {
"commit_hash": git_commit_hash,
"commit_hash_short": git_commit_hash_short,
"commit_timestamp": git_commit_timestamp
},
}
# Output
json.dump(data, file, indent=2)
def print_progress(p: OverallProgress):
print(f"{p.core.matching_funcs} matched core functions / {p.core.total_funcs} total ({p.core_matching_funcs_ratio * 100:.2f}%)")
print(f"{p.core.matching_bytes} matching core bytes / {p.core.total_bytes} total ({p.core_matching_bytes_ratio * 100:.2f}%)")
print()
print(f"{p.dll_matching_funcs} matched DLL functions / {p.dll_total_funcs} total ({p.dll_matching_funcs_ratio * 100:.2f}%)")
print(f"{p.dll_matching_bytes} matching DLL bytes / {p.dll_total_bytes} total ({p.dll_matching_bytes_ratio * 100:.2f}%)")
print()
print(f"{p.matching_funcs} matched overall functions / {p.total_funcs} total ({p.matching_funcs_ratio * 100:.2f}%)")
print(f"{p.matching_bytes} matching overall bytes / {p.total_bytes} total ({p.matching_bytes_ratio * 100:.2f}%)")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Reports progress for the project")
parser.add_argument("version", default="current", nargs="?")
parser.add_argument("--csv", action="store_true")
parser.add_argument("--shield-json", action="store_true")
args = parser.parse_args()
parser = argparse.ArgumentParser(description="Computes and reports progress for the project.")
parser.add_argument("-q", "--quiet", action="store_true", help="Don't print messages to stdout.", default=False)
parser.add_argument("--json", type=argparse.FileType("w", encoding="utf-8"), help="File to write the current progress to as JSON.")
main(args)
args = parser.parse_args()
# Compute progress
if not args.quiet:
print("Calculating progress...")
progress = get_overall_progress()
# Emit JSON
if args.json:
with args.json as json_file:
output_json(progress, json_file)
# Print progress
if not args.quiet:
print()
print_progress(progress)
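
With the executable bit added in this commit, the script can be invoked directly, e.g. ./tools/progress.py --json progress.json --quiet (the progress.json file name is the caller's choice). A small sketch of a downstream consumer, using only the field names emitted by output_json() above:

import json

# Read the progress JSON written by `./tools/progress.py --json progress.json`.
# ("progress.json" is an assumed name; the keys mirror output_json() above.)
with open("progress.json", "r", encoding="utf-8") as f:
    p = json.load(f)

for section in ("total", "core", "dll"):
    s = p[section]
    print(f"{section}: {s['matching_funcs']}/{s['total_funcs']} funcs, "
          f"{s['matching_ratio']:.2%} of bytes matched")
print(f"as of commit {p['git']['commit_hash_short']}")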