diff --git a/CMakeLists.txt b/CMakeLists.txt index a6f327c..e5fd3bc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -67,7 +67,7 @@ target_include_directories(${DECODER_CLI_NAME} PRIVATE src) if (BALLISTIC_ENABLE_BUILD_TESTS) set(TESTS_NAME "ballistic_tests") enable_testing() - add_executable(${TESTS_NAME} tests/test_decoder.c) + add_executable(${TESTS_NAME} tests/test_decoder_fuzzer.c) target_link_libraries(${TESTS_NAME} PRIVATE ${PROJECT_NAME}) target_include_directories(${TESTS_NAME} PRIVATE src) add_test(NAME DecoderTest COMMAND ${TESTS_NAME}) diff --git a/tests/test_decoder.c b/tests/test_decoder.c deleted file mode 100644 index 26485a8..0000000 --- a/tests/test_decoder.c +++ /dev/null @@ -1,60 +0,0 @@ -#include "decoder.h" -#include -#include - -typedef struct -{ - uint32_t machine_code; - const char *expected_mnemonic; -} test_case_t; - -int -main (void) -{ - printf("Starting Decoder Tests...\n"); - test_case_t tests[] = { - { 0xD503201F, "NOP" }, - { 0x8B020020, "ADD" }, - { 0x00000000, "UDF" }, - { 0XD65F03C0, "RET" }, - { 0x17fffffF, "B"}, // Regular Branch - { 0xf9400108, "LDR"}, - { 0x54000302, "B"}, // Branch if Carry - }; - size_t num_tests = sizeof(tests) / sizeof(tests[0]); - int failed = 0; - - for (size_t i = 0; i < num_tests; ++i) - { - uint32_t code = tests[i].machine_code; - const char *mnemonic = tests[i].expected_mnemonic; - const bal_decoder_instruction_metadata_t *result - = bal_decoder_arm64_decode(code); - - if (NULL == result) - { - printf("[FAIL] %08x: Expected %s, got NULL\n", code, mnemonic); - ++failed; - continue; - } - - if (strcmp(result->name, mnemonic) != 0) - { - printf("[FAIL] %08X: Expected %s, got %s\n", code, mnemonic, - result->name); - ++failed; - } - else - { - printf("[PASS] %08X: %s\n", code, result->name); - } - } - - if (failed > 0) - { - printf("FAILED %d tests. \n", failed); - return 1; - } - printf("All tests passed.\n"); - return 0; -} diff --git a/tests/test_decoder_fuzzer.c b/tests/test_decoder_fuzzer.c new file mode 100644 index 0000000..51c3907 --- /dev/null +++ b/tests/test_decoder_fuzzer.c @@ -0,0 +1,72 @@ +#include "decoder.h" +#include +#include +#include +#include +#include +#include + +static uint32_t rng_state = 0x87654321; + +/* + * Seeding function to set the initial state. + */ +static void +fast_srand (uint32_t seed) +{ + if (seed == 0) + { + seed = 0x87654321; + } + rng_state = seed; +} +/* + * Xorshift32 Algorithm + */ +static inline uint32_t +fast_rand (void) +{ + uint32_t x = rng_state; + x ^= x << 13; + x ^= x >> 17; + x ^= x << 5; + rng_state = x; + return x; +} + +int +main (void) +{ + printf("Starting Decoder Fuzzer Test...\n"); + fast_srand((uint32_t)time(0)); + + int failed = 0; + for (size_t i = 0; i < 100000; ++i) + { + uint32_t random_instruction = fast_rand(); + const bal_decoder_instruction_metadata_t *meta + = bal_decoder_arm64_decode(random_instruction); + + if (NULL != meta) + { + if ((random_instruction & meta->mask) != meta->expected) + { + printf("[FAIL] %s, 0x%08x & 0x%08x != 0x%08x", + meta->name, + random_instruction, + meta->mask, + meta->expected); + ++failed; + continue; + } + } + } + + if (failed > 0) + { + printf("FAILED %d tests. \n", failed); + return 1; + } + printf("All tests passed.\n"); + return 0; +} diff --git a/tools/fuzz_decoder.py b/tools/fuzz_decoder.py deleted file mode 100644 index ab554aa..0000000 --- a/tools/fuzz_decoder.py +++ /dev/null @@ -1,137 +0,0 @@ -""" -If you built the entire project in a build/ folder at the root directory, -you can run this program like this: - -python3 fuzz_decoder.py ../build/decoder_cli -""" - -import os -import re -import subprocess -import argparse -import multiprocessing -from typing import List, Tuple - -MAX_ITER = 1_000 -BINARY_FILE = "fuzz_decoder.bin" - -# Global variable for the worker processes to access the binary path -DECODER_BIN_PATH = "" - - -def init_worker(binary_path): - """Initialize globals in worker processes.""" - global DECODER_BIN_PATH - DECODER_BIN_PATH = binary_path - - -def check_instruction(data: Tuple[int, str]) -> Tuple[bool, str]: - """ - Worker function to check a single instruction. - Input: (instruction_int_value, hex_string_for_cli) - Output: (success_bool, error_message_or_None) - """ - instr_val, hex_str = data - - proc = subprocess.run([DECODER_BIN_PATH, hex_str], capture_output=True) - - # Byte comparison faster than string. - output: bytes = proc.stdout - if output.strip() == b"UNDEFINED": - return True, None - - try: - out_str = output.decode("utf-8") - parts = out_str.split(" - ") - - mask = int(parts[1].split(": ")[1], 16) - expected = int(parts[2].split(": ")[1], 16) - - if (instr_val & mask) != expected: - mnemonic = parts[0].split(": ")[1].strip() - return False, ( - f"Integrity Violation For {mnemonic}: " - f"{hex_str} & {hex(mask)} != {hex(expected)}" - ) - - except (IndexError, ValueError) as e: - return False, f"Output Parsing Error for {hex_str}: {out_str} ({e})" - - return True, None - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Ballistic Decoder Fuzzer (Optimized)") - parser.add_argument("decoder_binary", help="Path to the decoder cli binary file") - parser.add_argument("--count", help="Instruction count", default=MAX_ITER, type=int) - args = parser.parse_args() - - print(f"Generating {args.count} instructions...") - numbers = [int.from_bytes(os.urandom(4), "big") for _ in range(args.count)] - - with open(BINARY_FILE, "wb") as f: - for number in numbers: - f.write(number.to_bytes(4, byteorder="little")) - - print(f"View with this command: objdump -D -b binary -m aarch64 {BINARY_FILE}") - print("Make sure binutils is installed on your system before running") - - print(f"Disassembling with objdump...") - result = subprocess.run( - [ - "objdump", - "-D", - "-b", - "binary", - "-m", - "aarch64", - "-M", - "no-aliases", - BINARY_FILE, - ], - capture_output=True, - text=True, - ) - - pattern = re.compile(r"^\s*[0-9a-fA-F]+:\s+([0-9a-fA-F]{8})\s+(\S+)") - work_items: List[Tuple[int, str]] = [] - - for line in result.stdout.splitlines(): - match = pattern.search(line) - if match: - raw_hex = match.group(1) - int_val = int(raw_hex, 16) - # Format as '0x...' string once here so we don't do it in the loop - hex_str = f"0x{raw_hex}" - work_items.append((int_val, hex_str)) - - cpu_count = os.cpu_count() or 4 - print(f"Fuzzing using {cpu_count} cores...") - - successful_count = 0 - failures = [] - - # Passing larger chunks to workers reduces IPC overhead - chunk_size = max(1, len(work_items) // (cpu_count * 4)) - - with multiprocessing.Pool( - processes=cpu_count, initializer=init_worker, initargs=(args.decoder_binary,) - ) as pool: - for success, error_msg in pool.imap_unordered( - check_instruction, work_items, chunksize=chunk_size - ): - if success: - successful_count += 1 - else: - failures.append(error_msg) - print(error_msg) # Print failures immediately - - # Progress report every 10% - if successful_count % (max(1, len(work_items) // 10)) == 0: - print(f"Progress: {successful_count}/{len(work_items)}") - - if not failures: - print(f"SUCCESS: Decoded all {successful_count} instructions successfully.") - else: - print(f"FAILURE: Found {len(failures)} violations.") - exit(1)