[WIP] Convert Bash scripts to Python #147 (#338)

* Convert most Bash scripts to Python

* Port cmake/install-share to python

* Some cleanup

* Replaced install-share in cmake file with fully working python version

* Attempt to fix compile error

* Fix compile for older python version

* Try calling python 3 instead

* Make install-share.py executable
Compilable decompiler version

* Decompiler script now runs successfully until unpacking

* Running until calling bin2llvmir now

* [skip ci] Integrate @silverbacknet changes + some fixes

* [skip ci] Use CmdRunner.run_cmd everywhere in retdec_decompiler
Small fixes and cleanup
Early out if an error occurs

* [skip ci] Latest fixes
add retdec_tests_runner.py

* [skip ci] Check that options are correct + cleanup and fixes

* [skip ci] Fixed various errors

* Try to fix running install-share script

* Should now work on every OS

* Fix compile error

* Convert compile-yara to python

* [skip ci] Make test runner more portable

* [skip ci] Use correct code style

* [skip ci] Decompiler script now runs successfully

* Now generates the same output as the bash script

* Try fixing Travis on macOS

* Upgrade python instead

* Test scripts in travis

* Fix build

* Fix path

* Update build
Small cleanup

* Fix error in decompiler script

* Try to debug failure reason
Fix test runner
Use Python 3.5 on ubuntu

* Use newer Python version and fix some errors

* [skip ci] Little cleanup to make the code more clear
Don't parse_args twice

* [skip ci] First version of reimplementing logging

* [skip ci] Some fixes and cleanup

* [skip ci] Print memory usage, print output from unpacker, match code convention and some other fixes

* [skip ci] Fix crash when using cleanup option; fix crash when using color ida

* [skip ci] Fix --backend-aggressive-opts argument

* [skip ci] Fix error when file arch is followed by a comment

* [skip ci] Match Bash script more closely

* [skip ci] Fix a few comments

* [skip ci] Add some comments

* [skip ci] Add early type_extractor/gen_cstdlib_and_linux_jsons.py and type_extractor/gen_windows_and_windrivers_jsons.py version

* Try Unit tests

* Try to fix test

* Use absolute path instead

* [skip ci] Add check for python scripts

* scripts/retdec_decompiler.py: use output if specified via -o option
Michael Kösel 2018-07-12 16:36:16 +02:00 committed by Peter Matula
parent 386abd59b2
commit 4de4f4984d
15 changed files with 3225 additions and 12 deletions

@@ -11,6 +11,7 @@ matrix:
addons:
apt:
sources:
- deadsnakes
- ubuntu-toolchain-r-test
packages:
- build-essential
@@ -19,7 +20,7 @@ matrix:
- g++-4.9
- cmake
- perl
- python3
- python3.5
- flex
- bison
- autoconf
@@ -55,7 +56,9 @@ install:
- if [ "$TRAVIS_OS_NAME" = "osx" ]; then brew install gnu-getopt; fi
# bash 4
- if [ "$TRAVIS_OS_NAME" = "osx" ]; then brew install bash; fi
# python 3
- if [ "$TRAVIS_OS_NAME" = "osx" ]; then brew upgrade python; fi
before_script:
- eval "${MATRIX_EVAL}"
# We need to use newer versions of Flex and Bison on MacOS X (the ones from Homebrew).
@@ -79,18 +82,15 @@ script:
- cd ..
- rm -rf build
# Run unit tests.
- if [ "$TRAVIS_OS_NAME" = "osx" ]; then /usr/local/bin/bash retdec-install/bin/retdec-tests-runner.sh; fi
- if [ "$TRAVIS_OS_NAME" = "linux" ]; then ./retdec-install/bin/retdec-tests-runner.sh; fi
- python3 retdec-install/bin/retdec_tests_runner.py
# Run the decompilation script.
- if [ "$TRAVIS_OS_NAME" = "osx" ]; then /usr/local/bin/bash retdec-install/bin/retdec-decompiler.sh --help; fi
- if [ "$TRAVIS_OS_NAME" = "linux" ]; then ./retdec-install/bin/retdec-decompiler.sh --help; fi
- python3 retdec-install/bin/retdec_decompiler.py --help
# Run a simple decompilation.
- echo -e '#include <stdio.h>\n#include <stdlib.h>\nint main()\n{\n printf("hello world\\n");\n return 0;\n}\n' > hello-orig.c
- cat hello-orig.c
- gcc -m32 -o hello hello-orig.c
- ./hello
- if [ "$TRAVIS_OS_NAME" = "osx" ]; then /usr/local/bin/bash retdec-install/bin/retdec-decompiler.sh hello; fi
- if [ "$TRAVIS_OS_NAME" = "linux" ]; then ./retdec-install/bin/retdec-decompiler.sh hello; fi
- python3 retdec-install/bin/retdec_decompiler.py hello
- cat hello.c
- grep "int main(int argc, char \*\* argv)" hello.c

@@ -1,8 +1,16 @@
install(CODE "
execute_process(
COMMAND sh \"${CMAKE_SOURCE_DIR}/cmake/install-share.sh\" \"${CMAKE_INSTALL_PREFIX}\"
RESULT_VARIABLE INSTALL_SHARE_RES
)
if (WIN32)
execute_process(
COMMAND py -3 \"${CMAKE_SOURCE_DIR}/cmake/install-share.py\" \"${CMAKE_INSTALL_PREFIX}\"
RESULT_VARIABLE INSTALL_SHARE_RES
)
else()
execute_process(
COMMAND python3 \"${CMAKE_SOURCE_DIR}/cmake/install-share.py\" \"${CMAKE_INSTALL_PREFIX}\"
RESULT_VARIABLE INSTALL_SHARE_RES
)
endif()
if(INSTALL_SHARE_RES)
message(FATAL_ERROR \"RetDec share directory installation FAILED\")
endif()

cmake/install-share.py (new executable file, 115 lines)

@@ -0,0 +1,115 @@
#!/usr/bin/env python3
"""
Get RetDec share directory.
"""
import sys
import hashlib
import os
import shutil
import tarfile
import urllib.error
import urllib.request
# Check arguments.
if len(sys.argv) != 2:
print('ERROR: Unexpected number of arguments.')
sys.exit(1)
###############################################################################
version_filename = 'version.txt'
arch_suffix = 'tar.xz'
sha256hash_ref = 'b54ba07e2f28143c9afe34a9d5b4114fb61f3c1175b9807caced471fec82001e'
version = '2018-02-08'
###############################################################################
arch_name = 'retdec-support' + '_' + version + '.' + arch_suffix
# Get install path from script options.
install_path = sys.argv[1]
share_dir = os.path.join(install_path, 'share')
share_retdec_dir = os.path.join(share_dir, 'retdec')
support_dir = os.path.join(share_retdec_dir, 'support')
arch_path = os.path.join(support_dir, arch_name)
###############################################################################
def cleanup():
if os.path.exists(support_dir):
for n in os.listdir(support_dir):
p = os.path.join(support_dir, n)
if os.path.isdir(p):
shutil.rmtree(p)
else:
os.unlink(p)
# Share directory exists.
if os.path.exists(support_dir):
# Version file exists.
if os.path.isfile(os.path.join(support_dir, version_filename)):
with open(os.path.join(support_dir, version_filename)) as version_file:
version_from_file = version_file.read().split('\n')[0]
if version == version_from_file:
print('%s already exists, version is ok' % support_dir)
sys.exit(0)
else:
print('version is not as expected -> replacing it with the expected version')
cleanup()
# Make sure destination directory exists.
os.makedirs(support_dir, exist_ok=True)
# Download archive
arch_url = 'https://github.com/avast-tl/retdec-support/releases/download/%s/%s' % (version, arch_name)
print('Downloading archive from %s ...' % arch_url)
try:
urllib.request.urlretrieve(arch_url, arch_path)
except (urllib.error.HTTPError, urllib.error.URLError):
print('ERROR: download failed')
cleanup()
sys.exit(1)
# Compute hash of the downloaded archive.
print('Verifying archive\'s checksum ...')
sha256 = hashlib.sha256()
with open(arch_path, 'rb') as f:
try:
sha256.update(f.read())
except IOError:
print('ERROR: failed to compute the SHA-256 hash of the archive')
cleanup()
sys.exit(1)
sha256hash = sha256.hexdigest()
# Check that hash is ok.
if sha256hash != sha256hash_ref:
print('ERROR: downloaded archive is invalid (SHA-256 hash check failed)')
cleanup()
sys.exit(1)
# Unpack archive.
print('Unpacking archive ...')
with tarfile.open(arch_path) as tar:
try:
tar.extractall(support_dir)
except tarfile.ExtractError:
print('ERROR: failed to unpack the archive')
cleanup()
sys.exit(1)
# Remove archive.
os.remove(arch_path)
print('RetDec support directory downloaded OK')
sys.exit(0)
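The checksum above is computed by reading the whole archive into memory at once.
For large support packages a chunked read keeps memory usage flat; a minimal
sketch (hypothetical helper, not part of the script):

import hashlib

def sha256_of_file(path, chunk_size=1 << 20):
    # Feed the hash 1 MiB at a time instead of loading the whole file.
    sha256 = hashlib.sha256()
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b''):
            sha256.update(chunk)
    return sha256.hexdigest()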

@@ -9,3 +9,15 @@ endif()
install(PROGRAMS "retdec-signature-from-library-creator.sh" DESTINATION bin)
install(PROGRAMS "retdec-unpacker.sh" DESTINATION bin)
install(PROGRAMS "retdec-utils.sh" DESTINATION bin)
# Copy Python scripts.
install(PROGRAMS "retdec_config.py" DESTINATION bin)
install(PROGRAMS "retdec_archive_decompiler.py" DESTINATION bin)
install(PROGRAMS "retdec_decompiler.py" DESTINATION bin)
install(PROGRAMS "retdec_fileinfo.py" DESTINATION bin)
if(RETDEC_TESTS)
install(PROGRAMS "retdec_tests_runner.py" DESTINATION bin)
endif()
install(PROGRAMS "retdec_signature_from_library_creator.py" DESTINATION bin)
install(PROGRAMS "retdec_unpacker.py" DESTINATION bin)
install(PROGRAMS "retdec_utils.py" DESTINATION bin)

@@ -0,0 +1,197 @@
#!/usr/bin/env python3
import argparse
import json
import os
import subprocess
import sys
import retdec_config as config
from retdec_utils import Utils
from retdec_utils import CmdRunner
def parse_args(args):
parser = argparse.ArgumentParser(description='Runs the decompilation script with the given optional arguments over'
' all files in the given static library or prints list of files in'
' plain text with --plain argument or in JSON format with'
' --json argument. You can pass arguments for decompilation after'
' double-dash -- argument.',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("file",
metavar='FILE',
help='File to analyze.')
parser.add_argument("--plain",
dest="plain_format",
help="print list of files in plain text")
parser.add_argument("--json",
dest="json_format",
help="print list of files in json format")
parser.add_argument("--list",
dest="list_mode",
help="list")
parser.add_argument("--",
nargs='+',
dest="arg_list",
help="args passed to the decompiler")
return parser.parse_args(args)
class ArchiveDecompiler:
def __init__(self, _args):
self.args = parse_args(_args)
self.decompiler_sh_args = ''
self.timeout = 300
self.tmp_archive = ''
self.use_json_format = False
self.use_plain_format = False
self.enable_list_mode = False
self.library_path = ''
self.file_count = 0
def _print_error_plain_or_json(self, error):
"""Prints error in either plain text or JSON format.
One argument required: error message.
"""
if self.use_json_format:
# Emit valid JSON (double quotes, proper escaping).
print(json.dumps({'error': error}, indent=4))
else:
# Otherwise print in plain text.
Utils.print_error(error)
def _cleanup(self):
"""Cleans up all temporary files.
No arguments accepted.
"""
Utils.remove_dir_forced(self.tmp_archive)
def _check_arguments(self):
if self.args.list_mode:
self.enable_list_mode = True
if self.args.plain_format:
if self.args.json_format:
Utils.print_error('Arguments --plain and --json are mutually exclusive.')
return False
else:
self.enable_list_mode = True
self.use_plain_format = True
if self.args.json_format:
if self.args.plain_format:
Utils.print_error('Arguments --plain and --json are mutually exclusive.')
return False
else:
self.enable_list_mode = True
self.use_json_format = True
if self.args.arg_list:
# Drop the leading '--' separator if argparse kept it in the remainder.
self.decompiler_sh_args = [a for a in self.args.arg_list if a != '--']
if self.args.file:
if not os.path.isfile(self.args.file):
Utils.print_error('Input %s is not a valid file.' % self.args.file)
return False
self.library_path = self.args.file
if self.library_path == '':
self._print_error_plain_or_json('No input file.')
return False
return True
def decompile_archive(self):
# Check arguments
if not self._check_arguments():
return 1
# Check for archives packed in Mach-O Universal Binaries.
if Utils.is_macho_archive(self.library_path):
if self.enable_list_mode:
if self.use_json_format:
# Passing an argument list together with shell=True would drop the
# arguments on POSIX, so run the tools directly.
subprocess.call([config.EXTRACT, '--objects', '--json', self.library_path])
else:
subprocess.call([config.EXTRACT, '--objects', self.library_path])
return 1
self.tmp_archive = self.library_path + '.a'
subprocess.call([config.EXTRACT, '--best', '--out', self.tmp_archive, self.library_path])
self.library_path = self.tmp_archive
# Check for thin archives.
if Utils.has_thin_archive_signature(self.library_path):
self._print_error_plain_or_json('File is a thin archive and cannot be decompiled.')
return 1
# Check if file is archive
if not Utils.is_valid_archive(self.library_path):
self._print_error_plain_or_json('File is not supported archive or is not readable.')
return 1
# Check number of files.
self.file_count = Utils.archive_object_count(self.library_path)
if self.file_count <= 0:
self._print_error_plain_or_json('No files found in archive.')
return 1
# List only mode.
if self.enable_list_mode:
if self.use_json_format:
Utils.archive_list_numbered_content_json(self.library_path)
else:
Utils.archive_list_numbered_content(self.library_path)
self._cleanup()
return 0
# Run the decompilation script over all the found files.
print('Running `%s' % config.DECOMPILER, end='', file=sys.stderr)
if self.decompiler_sh_args:
print(' ' + ' '.join(self.decompiler_sh_args), end='', file=sys.stderr)
print('` over %d files with timeout %d s. (run `kill %d` to terminate this script)...' % (
self.file_count, self.timeout, os.getpid()), file=sys.stderr)
cmd = CmdRunner()
for i in range(self.file_count):
file_index = (i + 1)
print('%d/%d\t\t' % (file_index, self.file_count), end='')
# We have to use indexes instead of names because archives can contain multiple files with same name.
log_file = self.library_path + '.file_' + str(file_index) + '.log.verbose'
# Do not escape!
output, _, timeouted = cmd.run_cmd([sys.executable, config.DECOMPILER, '--ar-index=' + str(i), '-o',
self.library_path + '.file_' + str(file_index) + '.c',
self.library_path, *self.decompiler_sh_args], timeout=self.timeout)
# run_cmd returns decoded text, so write the log as text too.
with open(log_file, 'w') as f:
f.write(output)
if timeouted:
print('[TIMEOUT]')
else:
print('[OK]')
self._cleanup()
return 0
if __name__ == '__main__':
archive_decompiler = ArchiveDecompiler(sys.argv[1:])
sys.exit(archive_decompiler.decompile_archive())
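For reference, the class can also be driven programmatically; a hedged usage
sketch (the archive name is hypothetical):

from retdec_archive_decompiler import ArchiveDecompiler

# List the archive contents in plain text, then decompile every object,
# forwarding one option to the decompiler after '--'.
ArchiveDecompiler(['libfoo.a', '--plain']).decompile_archive()
ArchiveDecompiler(['libfoo.a', '--', '--backend-aggressive-opts']).decompile_archive()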

scripts/retdec_config.py (new file, 86 lines)

@@ -0,0 +1,86 @@
#!/usr/bin/env python3
import os
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
"""Paths (everything has to be without the ending slash '/').
Paths are relative to the script directory.
"""
INSTALL_BIN_DIR = SCRIPT_DIR
UNIT_TESTS_DIR = INSTALL_BIN_DIR
INSTALL_SHARE_DIR = os.path.join(INSTALL_BIN_DIR, '..', 'share', 'retdec')
INSTALL_SUPPORT_DIR = os.path.join(INSTALL_SHARE_DIR, 'support')
INSTALL_SHARE_YARA_DIR = os.path.join(INSTALL_SUPPORT_DIR, 'generic', 'yara_patterns')
# generic configuration
GENERIC_TYPES_DIR = os.path.join(INSTALL_SUPPORT_DIR, 'generic', 'types')
GENERIC_SIGNATURES_DIR = os.path.join(INSTALL_SHARE_YARA_DIR, 'static-code')
# ARM-specific configuration
ARM_ORDS_DIR = os.path.join(INSTALL_SUPPORT_DIR, 'arm', 'ords')
# X86-specific configuration
X86_ORDS_DIR = os.path.join(INSTALL_SUPPORT_DIR, 'x86', 'ords')
"""BIN2LLVMIR parameters
The following list of passes is -O3
* with -disable-inlining -disable-simplify-libcalls -constprop -die -dce -ipconstprop -instnamer
* without -internalize -inline -inline-cost -notti -deadargelim -argpromotion -simplify-libcalls -loop-unroll
-loop-unswitch -sroa -tailcallelim -functionattrs -memcpyopt -prune-eh
The following options are useful during debugging of bin2llvmir optimizations:
* -print-after-all -debug-only=idioms -print-before=idioms -print-after=idioms
Parameters beginning with -disable-* may be included only once, which is why
they are kept separately in BIN2LLVMIR_PARAMS_DISABLES and appended at the end.
-unreachable-funcs is automatically removed in the decompilation script when the
-k/--keep-unreachable-funcs parameter is used.
- We need to run -instcombine after -dead-global-assign to eliminate dead
instructions after this optimization.
- Optimization -phi2seq is needed to be run at the end and not to run two
times. This is the reason why it is placed at the very end.
"""
BIN2LLVMIR_PARAMS_DISABLES = ['-disable-inlining', '-disable-simplify-libcalls']
BIN2LLVMIR_LLVM_PASSES_ONLY = ['-instcombine', '-tbaa', '-targetlibinfo', '-basicaa', '-domtree', '-simplifycfg',
'-domtree', '-early-cse', '-lower-expect', '-targetlibinfo', '-tbaa', '-basicaa',
'-globalopt', '-mem2reg', '-instcombine', '-simplifycfg', '-basiccg', '-domtree',
'-early-cse', '-lazy-value-info', '-jump-threading', '-correlated-propagation',
'-simplifycfg', '-instcombine', '-simplifycfg', '-reassociate', '-domtree', '-loops',
'-loop-simplify', '-lcssa', '-loop-rotate', '-licm', '-lcssa', '-instcombine',
'-scalar-evolution', '-loop-simplifycfg', '-loop-simplify', '-aa', '-loop-accesses',
'-loop-load-elim', '-lcssa', '-indvars', '-loop-idiom', '-loop-deletion', '-memdep',
'-gvn', '-memdep', '-sccp', '-instcombine', '-lazy-value-info', '-jump-threading',
'-correlated-propagation', '-domtree', '-memdep', '-dse', '-dce', '-bdce', '-adce',
'-die', '-simplifycfg', '-instcombine', '-strip-dead-prototypes', '-globaldce',
'-constmerge', '-constprop', '-instnamer', '-domtree', '-instcombine']
BIN2LLVMIR_PARAMS = ['-provider-init', '-decoder', '-verify', '-main-detection', '-idioms-libgcc', '-inst-opt',
'-register', '-cond-branch-opt', '-syscalls', '-stack', '-constants', '-param-return',
'-local-vars', '-inst-opt', '-simple-types', '-generate-dsm', '-remove-asm-instrs',
'-class-hierarchy', '-select-fncs', '-unreachable-funcs', '-inst-opt', '-value-protect',
*BIN2LLVMIR_LLVM_PASSES_ONLY, *BIN2LLVMIR_LLVM_PASSES_ONLY, '-simple-types',
'-stack-ptr-op-remove', '-inst-opt', '-idioms', '-global-to-local', '-dead-global-assign',
'-instcombine', '-phi2seq', '-value-protect', *BIN2LLVMIR_PARAMS_DISABLES]
# Paths to tools.
FILEINFO = os.path.join(INSTALL_BIN_DIR, 'retdec-fileinfo')
FILEINFO_EXTERNAL_YARA_PRIMARY_CRYPTO_DATABASES = [os.path.join(INSTALL_SHARE_YARA_DIR, 'signsrch', 'signsrch.yara')]
FILEINFO_EXTERNAL_YARA_EXTRA_CRYPTO_DATABASES = [
os.path.join(INSTALL_SHARE_YARA_DIR, 'signsrch', 'signsrch_regex.yara')]
AR = os.path.join(INSTALL_BIN_DIR, 'retdec-ar-extractor')
BIN2PAT = os.path.join(INSTALL_BIN_DIR, 'retdec-bin2pat')
PAT2YARA = os.path.join(INSTALL_BIN_DIR, 'retdec-pat2yara')
CONFIGTOOL = os.path.join(INSTALL_BIN_DIR, 'retdec-config')
EXTRACT = os.path.join(INSTALL_BIN_DIR, 'retdec-macho-extractor')
DECOMPILER = os.path.join(INSTALL_BIN_DIR, 'retdec_decompiler.py')
ARCHIVE_DECOMPILER = os.path.join(INSTALL_BIN_DIR, 'retdec_archive_decompiler.py')
SIG_FROM_LIB = os.path.join(INSTALL_BIN_DIR, 'retdec_signature_from_library_creator.py')
UNPACK = os.path.join(INSTALL_BIN_DIR, 'retdec_unpacker.py')
LLVMIR2HLL = os.path.join(INSTALL_BIN_DIR, 'retdec-llvmir2hll')
BIN2LLVMIR = os.path.join(INSTALL_BIN_DIR, 'retdec-bin2llvmir')
IDA_COLORIZER = os.path.join(INSTALL_BIN_DIR, 'retdec-color-c.py')
UNPACKER = os.path.join(INSTALL_BIN_DIR, 'retdec-unpacker')
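The pass list above encodes the ordering constraints from the comment (the LLVM
-O3 passes run twice, -phi2seq exactly once at the very end); a small hedged
sanity check one could run against this module:

import retdec_config as config

# -phi2seq and the -disable-* switches may appear only once.
assert config.BIN2LLVMIR_PARAMS.count('-phi2seq') == 1
assert config.BIN2LLVMIR_PARAMS.count('-disable-inlining') == 1
# -phi2seq has to come after -dead-global-assign (see the comment above).
assert config.BIN2LLVMIR_PARAMS.index('-phi2seq') > config.BIN2LLVMIR_PARAMS.index('-dead-global-assign')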

scripts/retdec_decompiler.py (new executable file, 1315 lines)

(File diff suppressed because it is too large.)

@@ -0,0 +1,70 @@
#!/usr/bin/env python3
"""When analyzing an archive, use the archive decompilation script `--list` instead of
`fileinfo` because fileinfo is currently unable to analyze archives.
First, we have to find path to the input file. We take the first parameter
that does not start with a dash. This is a simplification and may not work in
all cases. A proper solution would need to parse fileinfo parameters, which
would be complex.
"""
import argparse
import subprocess
import sys
import retdec_config as config
from retdec_utils import Utils
from retdec_archive_decompiler import ArchiveDecompiler
def parse_args():
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument('file',
metavar='FILE',
help='File to analyze.')
parser.add_argument('-j', '--json',
dest='json',
action='store_true',
help='Set to forward --json to the archive decompilation script.')
parser.add_argument('--use-external-patterns',
dest='external_patterns',
action='store_true',
help='Should use external patterns')
return parser.parse_args()
if __name__ == '__main__':
args = parse_args()
if Utils.has_archive_signature(args.file):
# The input file is an archive, so use the archive decompilation script
# instead of fileinfo.
archive_decompiler_args = [args.file, '--list']
if args.json:
archive_decompiler_args.append('--json')
decompiler = ArchiveDecompiler(archive_decompiler_args)
sys.exit(decompiler.decompile_archive())
# We are not analyzing an archive, so proceed to fileinfo.
fileinfo_params = [args.file]
for par in config.FILEINFO_EXTERNAL_YARA_PRIMARY_CRYPTO_DATABASES:
fileinfo_params.extend(['--crypto', par])
if args.external_patterns:
for par in config.FILEINFO_EXTERNAL_YARA_EXTRA_CRYPTO_DATABASES:
fileinfo_params.extend(['--crypto', par])
# Propagate fileinfo's return code (a list with shell=True would drop the args).
sys.exit(subprocess.call([config.FILEINFO, *fileinfo_params]))

@@ -0,0 +1,176 @@
#! /usr/bin/env python3
"""Create Yara rules file from static libraries."""
import argparse
import shutil
import sys
import os
import subprocess
import tempfile
from pathlib import Path
import retdec_config as config
from retdec_utils import Utils
def parse_args(args):
parser = argparse.ArgumentParser(description=__doc__,
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('input',
nargs='+',
metavar='FILE',
help='Input file(s)')
parser.add_argument('-n', '--no-cleanup',
dest='no_cleanup',
action='store_true',
help='Temporary .pat files will be kept.')
parser.add_argument('-o', '--output',
dest='output',
metavar='FILE',
required=True,
help='Where result(s) will be stored.')
parser.add_argument('-m', '--min-pure',
dest='min_pure',
default=16,
help='Minimum pure information needed for patterns.')
parser.add_argument('-i', '--ignore-nops',
dest='ignore_nops',
help='Ignore trailing NOPs when computing (pure) size.')
parser.add_argument('-l', '--logfile',
dest='logfile',
action='store_true',
help='Add log-file with \'.log\' suffix from pat2yara.')
parser.add_argument('-b', '--bin2pat-only',
dest='bin_to_pat_only',
action='store_true',
help='Stop after bin2pat.')
return parser.parse_args(args)
class SigFromLib:
def __init__(self, _args):
self.args = parse_args(_args)
self.ignore_nop = ''
self.file_path = ''
self.tmp_dir_path = ''
def print_error_and_cleanup(self, message):
"""Print error message and clean up temporary files.
"""
# Cleanup.
if not self.args.no_cleanup:
Utils.remove_dir_forced(self.tmp_dir_path)
Utils.print_error(message + '.')
def _check_arguments(self):
for f in self.args.input:
if not os.path.isfile(f):
self.print_error_and_cleanup('input %s is not a valid file' % f)
return False
self.file_path = self.args.output
dir_name = os.path.dirname(os.path.abspath(self.file_path))
# Create a unique temporary directory (the Bash script used mktemp).
self.tmp_dir_path = tempfile.mkdtemp(dir=dir_name)
if self.args.ignore_nops:
self.ignore_nop = '--ignore-nops'
return True
def run(self):
if not self._check_arguments():
return 1
pattern_files = []
object_dirs = []
# Create .pat files for every library.
for lib_path in self.args.input:
# Check for invalid archives.
if not Utils.is_valid_archive(lib_path):
print('ignoring file %s - not valid archive' % lib_path)
continue
# Get library name for .pat file.
lib_name = os.path.splitext(os.path.basename(lib_path))[0]
# Create sub-directory for object files.
object_dir = os.path.join(self.tmp_dir_path, lib_name) + '-objects'
object_dirs.append(object_dir)
os.makedirs(object_dir, exist_ok=True)
# Extract all files to temporary folder.
subprocess.call([config.AR, lib_path, '--extract', '--output', object_dir])
# List all extracted objects.
objects = []
for root, dirs, files in os.walk(object_dir):
for f in files:
fname = os.path.join(root, f)
if os.path.isfile(fname):
objects.append(fname)
# Extract patterns from library.
pattern_file = os.path.join(self.tmp_dir_path, lib_name) + '.pat'
pattern_files.append(pattern_file)
result = subprocess.call([config.BIN2PAT, '-o', pattern_file, *objects])
if result != 0:
self.print_error_and_cleanup('utility bin2pat failed when processing %s' % lib_path)
return 1
# Remove extracted objects continuously.
if not self.args.no_cleanup:
if os.path.exists(object_dir):
shutil.rmtree(object_dir)
# Skip second step - only .pat files will be created.
if self.args.bin_to_pat_only:
if not self.args.no_cleanup:
for d in object_dirs:
if os.path.exists(d):
shutil.rmtree(d)
return 0
# Create final .yara file from .pat files. Build the argument list
# conditionally so that no empty strings are passed to pat2yara.
pat2yara_args = [config.PAT2YARA, *pattern_files, '--min-pure', str(self.args.min_pure), '-o', self.file_path]
if self.args.logfile:
pat2yara_args.extend(['-l', self.file_path + '.log'])
if self.ignore_nop:
pat2yara_args.extend([self.ignore_nop, str(self.args.ignore_nops)])
result = subprocess.call(pat2yara_args)
if result != 0:
self.print_error_and_cleanup('utility pat2yara failed')
return 1
# Do cleanup.
if not self.args.no_cleanup:
Utils.remove_dir_forced(self.tmp_dir_path)
return result
if __name__ == '__main__':
sig = SigFromLib(sys.argv[1:])
sys.exit(sig.run())
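A hedged usage sketch (library and output names hypothetical, assuming the
module is importable from the install directory):

import sys
from retdec_signature_from_library_creator import SigFromLib

# Build yara rules from two static libraries, keeping the intermediate
# .pat files for inspection.
sig = SigFromLib(['libc.a', 'libm.a', '-o', 'libc.yara', '--no-cleanup'])
sys.exit(sig.run())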

@@ -0,0 +1,104 @@
#!/usr/bin/env python3
"""Runs all the installed unit tests."""
import sys
import os
import subprocess
import retdec_config as config
from retdec_utils import CmdRunner
"""First argument can be verbose."""
if len(sys.argv) > 1:
if sys.argv[1] in ['-v', '--verbose']:
verbose = True
else:
verbose = False
def print_colored(message, color):
"""Emits a colored version of the given message to the standard output (without
a new line).
2 string argument are needed:
$1 message to be colored
$2 color (red, green, yellow)
If the color is unknown, it emits just $1.
"""
if color == 'red':
print('\033[22;31m' + message + '\033[0m')
elif color == 'green':
print('\033[22;32m' + message + '\033[0m')
elif color == 'yellow':
print('\033[01;33m' + message + '\033[0m')
else:
print(message + '\n')
def unit_tests_in_dir(path):
"""Prints paths to all unit tests in the given directory.
1 string argument is needed:
path-path to the directory with unit tests
"""
tests = []
for file in os.listdir(path):
file_name = os.path.basename(file)
if file_name.startswith('retdec-tests-') and not file.endswith('.sh') and not file.endswith('.py'):
tests.append(os.path.abspath(os.path.join(path, file)))
tests.sort()
return tests
def run_unit_tests_in_dir(path):
"""Runs all unit tests in the given directory.
1 string argument is needed:
path - path to the directory with unit tests
Returns 0 if all tests passed, 1 otherwise.
"""
tests_failed = False
tests_run = False
for unit_test in unit_tests_in_dir(path):
print()
unit_test_name = os.path.basename(unit_test)
print_colored(unit_test_name, 'yellow')
print()
# TODO verbose support
cmd = CmdRunner()
output, return_code, _ = cmd.run_cmd([unit_test, '--gtest_color=yes'])
print(output)
if return_code != 0:
tests_failed = True
if return_code >= 127:
# Segfault, floating-point exception, etc.
print_colored('FAILED (return code %d)\n' % return_code, 'red')
tests_run = True
if tests_failed or not tests_run:
return 1
else:
return 0
# Run all the test binaries in the unit tests directory.
if not os.path.isdir(config.UNIT_TESTS_DIR):
sys.stderr.write('error: no unit tests found in %s\n' % config.UNIT_TESTS_DIR)
sys.exit(1)
print('Running all unit tests in %s...' % config.UNIT_TESTS_DIR)
sys.exit(run_unit_tests_in_dir(config.UNIT_TESTS_DIR))

scripts/retdec_unpacker.py (new file, 215 lines)

@@ -0,0 +1,215 @@
#!/usr/bin/env python3
"""
The script tries to unpack the given executable file by using any
of the supported unpackers, which are at present:
* generic unpacker
* upx
Required argument:
* (packed) binary file
Optional arguments:
* desired name of unpacked file
* use extended exit codes
Returns:
0 successfully unpacked (see the RET_* constants below for the other codes)
"""
import argparse
import os
import shutil
import sys
import retdec_config as config
from retdec_utils import CmdRunner
from retdec_utils import Utils
def parse_args(_args):
parser = argparse.ArgumentParser(description=__doc__,
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('file',
metavar='FILE',
help='The input file.')
parser.add_argument('-e', '--extended-exit-codes',
dest='extended_exit_codes',
action='store_true',
help='Use more granular exit codes than just 0/1.')
parser.add_argument('-o', '--output',
dest='output',
metavar='FILE',
help='Output file (default: file-unpacked).')
parser.add_argument('--max-memory',
dest='max_memory',
help='Limit the maximal memory of retdec-unpacker to N bytes.')
parser.add_argument('--max-memory-half-ram',
dest='max_memory_half_ram',
action='store_true',
help='Limit the maximal memory of retdec-unpacker to half of system RAM.')
return parser.parse_args(_args)
class Unpacker:
RET_UNPACK_OK = 0
# 1 generic unpacker - nothing to do; upx succeeded (--extended-exit-codes only)
RET_UNPACKER_NOTHING_TO_DO_OTHERS_OK = 1
# 2 not packed or unknown packer
RET_NOTHING_TO_DO = 2
# 3 generic unpacker failed; upx succeeded (--extended-exit-codes only)
RET_UNPACKER_FAILED_OTHERS_OK = 3
# 4 generic unpacker failed; upx not succeeded
RET_UNPACKER_FAILED = 4
UNPACKER_EXIT_CODE_OK = 0
# 1 no matching unpacker plugin was found
UNPACKER_EXIT_CODE_NOTHING_TO_DO = 1
# 2 At least one plugin failed at the unpacking of the file.
UNPACKER_EXIT_CODE_UNPACKING_FAILED = 2
# 3 Error with preprocessing of input file before unpacking.
UNPACKER_EXIT_CODE_PREPROCESSING_ERROR = 3
def __init__(self, _args):
self.args = parse_args(_args)
self.input = ''
self.output = ''
def _check_arguments(self):
"""Check proper combination of input arguments.
"""
# Check whether the input file was specified.
if self.args.file is None:
Utils.print_error('No input file was specified')
return False
if not os.access(self.args.file, os.R_OK):
Utils.print_error('The input file %s does not exist or is not readable' % self.args.file)
return False
# Conditional initialization.
if not self.args.output:
self.output = self.args.file + '-unpacked'
else:
self.output = self.args.output
if self.args.max_memory is not None:
try:
max_memory = int(self.args.max_memory)
if max_memory <= 0:
raise ValueError
except ValueError:
Utils.print_error('Invalid value for --max-memory: %s (expected a positive integer)'
% self.args.max_memory)
return False
# Convert to absolute paths.
self.input = os.path.abspath(self.args.file)
self.output = os.path.abspath(self.output)
return True
def _unpack(self, output):
"""Try to unpack the given file.
"""
unpacker_params = [self.input, '-o', output]
if self.args.max_memory:
unpacker_params.extend(['--max-memory', self.args.max_memory])
elif self.args.max_memory_half_ram:
unpacker_params.append('--max-memory-half-ram')
print()
print('##### Trying to unpack ' + self.input + ' into ' + output + ' by using generic unpacker...')
print('RUN: ' + config.UNPACKER + ' ' + ' '.join(unpacker_params))
cmd = CmdRunner()
unpacker_output, unpacker_rc, _ = cmd.run_cmd([config.UNPACKER, *unpacker_params])
print(unpacker_output)
if unpacker_rc == self.UNPACKER_EXIT_CODE_OK:
print('##### Unpacking by using generic unpacker: successfully unpacked')
return unpacker_output, self.RET_UNPACK_OK
elif unpacker_rc == self.UNPACKER_EXIT_CODE_NOTHING_TO_DO:
print('##### Unpacking by using generic unpacker: nothing to do')
else:
# Do not return -> try the next unpacker
# UNPACKER_EXIT_CODE_UNPACKING_FAILED
# UNPACKER_EXIT_CODE_PREPROCESSING_ERROR
print('##### Unpacking by using generic unpacker: failed')
if Utils.tool_exists('upx'):
# Do not return -> try the next unpacker
# Try to unpack via UPX
print()
print('##### Trying to unpack ' + self.input + ' into ' + output + ' by using UPX...')
print('RUN: upx -d ' + self.input + ' -o ' + output)
unpacker_output, upx_rc, _ = cmd.run_cmd(['upx', '-d', self.input, '-o', output])
print(unpacker_output)
if upx_rc == 0:
print('##### Unpacking by using UPX: successfully unpacked')
if self.args.extended_exit_codes:
if unpacker_rc == self.UNPACKER_EXIT_CODE_NOTHING_TO_DO:
return unpacker_output, self.RET_UNPACKER_NOTHING_TO_DO_OTHERS_OK
elif unpacker_rc >= self.UNPACKER_EXIT_CODE_UNPACKING_FAILED:
return unpacker_output, self.RET_UNPACKER_FAILED_OTHERS_OK
else:
return unpacker_output, self.RET_UNPACK_OK
else:
# We cannot distinguish whether upx failed or the input file was
# not upx-packed
print('##### Unpacking by using UPX: nothing to do')
else:
print('##### \'upx\' not available: nothing to do')
# Do not return -> try the next unpacker
# Return.
if unpacker_rc >= self.UNPACKER_EXIT_CODE_UNPACKING_FAILED:
return unpacker_output, self.RET_UNPACKER_FAILED
else:
return unpacker_output, self.RET_NOTHING_TO_DO
def unpack_all(self):
# Check arguments and set default values for unset options.
if not self._check_arguments():
return '', -1
res_rc = -1
res_out = ''
tmp_output = self.output + '.tmp'
while True:
unpacker_out, return_code = self._unpack(tmp_output)
res_out += unpacker_out + '\n'
if return_code == self.RET_UNPACK_OK or return_code == self.RET_UNPACKER_NOTHING_TO_DO_OTHERS_OK \
or return_code == self.RET_UNPACKER_FAILED_OTHERS_OK:
res_rc = return_code
shutil.move(tmp_output, self.output)
self.input = self.output
else:
# Remove the temporary file, just in case some of the unpackers crashed
# during unpacking and left it on the disk (e.g. upx).
if os.path.exists(tmp_output):
os.remove(tmp_output)
break
return (res_out, return_code) if res_rc == -1 else (res_out, res_rc)
if __name__ == '__main__':
unpacker = Unpacker(sys.argv[1:])
_, rc = unpacker.unpack_all()
sys.exit(rc)
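The loop in unpack_all() keeps re-running _unpack() on its own output, so nested
packing (e.g. UPX over a custom packer) is peeled off layer by layer. A hedged
usage sketch (file names hypothetical):

from retdec_unpacker import Unpacker

output, return_code = Unpacker(['packed.exe', '-o', 'clean.exe', '-e']).unpack_all()
print(return_code)  # 0 on success; see the RET_* constants above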

scripts/retdec_utils.py (new file, 415 lines)

@@ -0,0 +1,415 @@
#!/usr/bin/env python3
"""Compilation and decompilation utility functions.
"""
import os
import re
import shutil
import signal
import subprocess
import sys
import time
import retdec_config as config
"""Taken from https://github.com/avast-tl/retdec-regression-tests-framework/blob/master/regression_tests/cmd_runner.py
"""
class CmdRunner:
"""A runner of external commands."""
def run_cmd(self, cmd, input=b'', timeout=None, input_encoding='utf-8',
output_encoding='utf-8', strip_shell_colors=True):
"""Runs the given command (synchronously).
:param list cmd: Command to be run as a list of arguments (strings).
:param bytes input: Input to be used when running the command.
:param int timeout: Number of seconds after which the command should be
terminated.
:param str input_encoding: Encode the command's input in this encoding.
:param str output_encoding: Decode the command's output in this encoding.
:param bool strip_shell_colors: Should shell colors be stripped from
the output?
:returns: A triple (`output`, `return_code`, `timeouted`).
The meaning of the items in the return value are:
* `output` contains the combined output from the standard outputs and
standard error,
* `return_code` is the return code of the command,
* `timeouted` is either `True` or `False`, depending on whether the
command has timeouted.
If `input` is a string (`str`), not `bytes`, it is decoded into `bytes`
by using `input_encoding`.
If `output_encoding` is not ``None``, the returned data are decoded in
that encoding. Also, all line endings are converted to ``'\\n'``, and
if ``strip_shell_colors`` is ``True``, shell colors are stripped.
Otherwise, if `output_encoding` is ``None``, the data are directly
returned as raw bytes without any conversions.
To disable the timeout, pass ``None`` as `timeout` (the default).
If the timeout expires before the command finishes, the value of `output`
is the command's output generated up to the timeout.
"""
_, output, return_code, timeouted = self._run_cmd(cmd, input, timeout, input_encoding, output_encoding,
strip_shell_colors, False)
return output, return_code, timeouted
def run_measured_cmd(self, command):
"""Runs the given command (synchronously) and measure its time and memory.
:param list command: Command to be run as a list of arguments (strings).
:returns: A quadruple (`memory`, `elapsed_time`, `output`, `return_code`).
"""
start = time.time()
memory, output, rc, _ = self._run_cmd(command, track_memory=True)
elapsed = time.time() - start
return memory, int(elapsed), output, rc
def _run_cmd(self, cmd, input=b'', timeout=None, input_encoding='utf-8',
output_encoding='utf-8', strip_shell_colors=True, track_memory=False):
def decode(output):
if output_encoding is not None:
output = output.decode(output_encoding, errors='replace')
output = re.sub(r'\r\n?', '\n', output)
if strip_shell_colors:
return re.sub(r'\x1b[^m]*m', '', output)
return output
# The communicate() call below expects the input to be in bytes, so
# convert it unless it is already in bytes.
if not isinstance(input, bytes):
input = input.encode(input_encoding)
memory = 0
try:
p = self.start(cmd)
if track_memory:
try:
import psutil
proc = psutil.Process(p.pid)
memory = int(proc.memory_info().rss / float(1 << 20))
except ImportError:
memory = 0
output, _ = p.communicate(input, timeout)
return memory, decode(output).rstrip(), p.returncode, False
except subprocess.TimeoutExpired:
# Kill the process, along with all its child processes.
p.kill()
# Finish the communication to obtain the output.
output, _ = p.communicate()
return memory, decode(output).rstrip(), p.returncode, True
def start(self, cmd, discard_output=False, stdout=subprocess.STDOUT):
"""Starts the given command and returns a handler to it.
:param list cmd: Command to be run as a list of arguments (strings).
:param bool discard_output: Should the output be discarded instead of
being buffered so it can be obtained later?
:param int stdout: Where the standard error output is redirected when
discard_output is False (defaults to merging it into the standard output).
:returns: A handler to the started command (``subprocess.Popen``).
If the output is irrelevant for you, you should set `discard_output` to
``True``.
"""
# The implementation is platform-specific because we want to be able to
# kill the children alongside with the process.
kwargs = dict(
args=cmd,
stdin=subprocess.PIPE,
stdout=subprocess.DEVNULL if discard_output else subprocess.PIPE,
stderr=subprocess.DEVNULL if discard_output else stdout
)
if Utils.is_windows():
return _WindowsProcess(**kwargs)
else:
return _LinuxProcess(**kwargs)
class _LinuxProcess(subprocess.Popen):
"""An internal wrapper around ``subprocess.Popen`` for Linux."""
def __init__(self, **kwargs):
# To ensure that all the process' children terminate when the process
# is killed, we use a process group so as to enable sending a signal to
# all the processes in the group. For that, we attach a session ID to
# the parent process of the spawned child processes. This will make it
# the group leader of the processes. When a signal is sent to the
# process group leader, it's transmitted to all of the child processes
# of this group.
#
# os.setsid is passed in the argument preexec_fn so it's run after
# fork() and before exec().
#
# This solution is based on http://stackoverflow.com/a/4791612.
kwargs['preexec_fn'] = os.setsid
super().__init__(**kwargs)
def kill(self):
"""Kills the process, including its children."""
os.killpg(self.pid, signal.SIGTERM)
class _WindowsProcess(subprocess.Popen):
"""An internal wrapper around ``subprocess.Popen`` for Windows."""
def __init__(self, **kwargs):
# Shell scripts need to be run with 'sh' on Windows. Simply running the
# script by its path doesn't work. That is, for example, instead of
#
# /path/to/retdec-decompiler.sh
#
# we need to run
#
# sh /path/to/retdec-decompiler.sh
#
if 'args' in kwargs and kwargs['args'] and kwargs['args'][0].endswith('.sh'):
kwargs['args'].insert(0, 'sh')
super().__init__(**kwargs)
def kill(self):
"""Kills the process, including its children."""
# Since os.setsid() and os.killpg() are not available on Windows, we
# have to do this differently. More specifically, we do this by calling
# taskkill, which also kills the process' children.
#
# This solution is based on
# http://mackeblog.blogspot.cz/2012/05/killing-subprocesses-on-windows-in.html
cmd = ['taskkill', '/F', '/T', '/PID', str(self.pid)]
subprocess.call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
class Utils:
@staticmethod
def tool_exists(tool_name):
return shutil.which(tool_name) is not None
@staticmethod
def remove_file_forced(file):
if os.path.exists(file):
os.remove(file)
@staticmethod
def remove_dir_forced(path):
if os.path.exists(path):
for n in os.listdir(path):
p = os.path.join(path, n)
if os.path.isdir(p):
shutil.rmtree(p, ignore_errors=True)
else:
os.unlink(p)
@staticmethod
def is_windows():
return sys.platform in ('win32', 'msys')
@staticmethod
def print_error(error):
"""Print error message to stderr.
"""
print('Error: %s' % error, file=sys.stderr)
@staticmethod
def print_warning(warning):
"""Print warning message to stderr.
"""
sys.stderr.write('Warning: %s\n' % warning)
@staticmethod
def has_archive_signature(path):
"""Check if file has any ar signature.
path - file path
Returns True if the file has an ar signature, False otherwise.
"""
ret = subprocess.call([config.AR, path, '--arch-magic'])
return ret == 0
@staticmethod
def has_thin_archive_signature(path):
"""Check if file has a thin ar signature.
path - file path
Returns True if the file has a thin ar signature, False otherwise.
"""
ret = subprocess.call([config.AR, path, '--thin-magic'])
return ret == 0
@staticmethod
def is_valid_archive(path):
"""Check if file is an archive we can work with.
path - file path
Returns True if the file is a valid archive, False otherwise.
"""
# We use our own messages, so throw the original output away.
ret = subprocess.call([config.AR, path, '--valid'], stderr=subprocess.STDOUT,
stdout=subprocess.DEVNULL)
return ret == 0
@staticmethod
def archive_object_count(path):
"""Count object files in the archive.
path - file path
Returns the number of objects, or -1 if an error occurred.
"""
cmd = CmdRunner()
output, rc, _ = cmd.run_cmd([config.AR, path, '--object-count'])
return int(output) if rc == 0 else -1
@staticmethod
def archive_list_content(path):
"""Print content of archive.
1 argument is needed - file path
"""
cmd = CmdRunner()
output, _, _ = cmd.run_cmd([config.AR, path, '--list', '--no-numbers'])
print(output)
@staticmethod
def archive_list_numbered_content(path):
"""Print numbered content of archive.
1 argument is needed - file path
"""
print('Index\tName')
cmd = CmdRunner()
output, _, _ = cmd.run_cmd([config.AR, path, '--list'])
print(output)
@staticmethod
def archive_list_numbered_content_json(path):
"""Print numbered content of archive in JSON format.
1 argument is needed - file path
"""
cmd = CmdRunner()
output, _, _ = cmd.run_cmd([config.AR, path, '--list', '--json'])
print(output)
@staticmethod
def archive_get_by_name(path, name, output):
"""Get a single file from archive by name.
path - path to the archive
name - name of the file
output - output path
"""
ret = subprocess.call([config.AR, path, '--name', name, '--output', output],
stderr=subprocess.STDOUT, stdout=None)
return ret != 2
@staticmethod
def archive_get_by_index(archive, index, output):
"""Get a single file from archive by index.
archive - path to the archive
index - index of the file
output - output path
"""
ret = subprocess.call([config.AR, archive, '--index', index, '--output', output],
stderr=subprocess.STDOUT, stdout=None)
return ret != 2
@staticmethod
def is_macho_archive(path):
"""Check if file is Mach-O universal binary with archives.
1 argument is needed - file path
Returns - True if file is archive
False if file is not archive
"""
ret = subprocess.call([config.EXTRACT, '--check-archive', path], shell=True,
stderr=subprocess.STDOUT, stdout=subprocess.DEVNULL)
return ret == 0
@staticmethod
def is_decimal_number(num):
"""Check that a string is a valid decimal number.
Returns True if it is, False otherwise.
"""
return re.search('^[0-9]+$', str(num)) is not None
@staticmethod
def is_hexadecimal_number(num):
"""Check that a string is a valid hexadecimal number.
Returns True if it is, False otherwise.
"""
return re.search('^0x[0-9a-fA-F]+$', str(num)) is not None
@staticmethod
def is_number(num):
"""Check that a string is a valid number (decimal or hexadecimal).
Returns True if it is, False otherwise.
"""
return Utils.is_decimal_number(num) or Utils.is_hexadecimal_number(num)
@staticmethod
def is_decimal_range(num):
"""Check that a string is a valid decimal range.
Returns True if it is, False otherwise.
"""
return re.search('^[0-9]+-[0-9]+$', str(num)) is not None
@staticmethod
def is_hexadecimal_range(num):
"""Check that a string is a valid hexadecimal range.
Returns True if it is, False otherwise.
"""
return re.search('^0x[0-9a-fA-F]+-0x[0-9a-fA-F]+$', str(num)) is not None
@staticmethod
def is_range(num):
"""Check that a string is a valid range (decimal or hexadecimal).
Returns True if it is, False otherwise.
"""
return Utils.is_decimal_range(num) or Utils.is_hexadecimal_range(num)
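For reference, a hedged sketch of the CmdRunner contract documented above; the
returned triple is (output, return_code, timeouted):

from retdec_utils import CmdRunner

cmd = CmdRunner()
output, return_code, timeouted = cmd.run_cmd(['echo', 'hello'], timeout=5)
if timeouted:
    print('command did not finish within 5 s; output below is partial')
print(output)       # combined stdout + stderr, shell colors stripped
print(return_code)  # the command's exit code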

@@ -0,0 +1,198 @@
#! /usr/bin/env python3
"""Generator of JSON files containing C-types information for C standard library and other header files in /usr/include/ directory."""
import argparse
import shutil
import sys
import os
import subprocess
import glob
#
# C standard library headers.
#
CSTDLIB_HEADERS = [
'assert.h',
'complex.h',
'ctype.h',
'errno.h',
'fenv.h',
'float.h',
'inttypes.h',
'iso646.h',
'limits.h',
'locale.h',
'math.h',
'setjmp.h',
'signal.h',
'stdalign.h',
'stdarg.h',
'stdatomic.h',
'stdbool.h',
'stddef.h',
'stdint.h',
'stdio.h',
'stdlib.h',
'stdnoreturn.h',
'string.h',
'tgmath.h',
'threads.h',
'time.h',
'uchar.h',
'wchar.h',
'wctype.h'
]
#
# Files we don't want in JSONs.
#
FILES_PATTERNS_TO_FILTER_OUT=[
'GL/',
'Qt.*/',
'SDL.*/',
'X11/',
'alsa/',
'c\\+\\+/',
'dbus.*/',
'glib.*/',
'libdrm/',
'libxml2/',
'llvm.*/',
'mirclient/',
'php[0-9.-]*/',
'pulse/',
'python.*/',
'ruby.*/',
'wayland.*/',
'xcb/'
]
FILES_FILTER = '|'.join(FILES_PATTERNS_TO_FILTER_OUT)
#
# Paths.
#
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
SCRIPT_NAME = os.path.basename(__file__)
EXTRACTOR = os.path.join(SCRIPT_DIR, 'extract_types.py')
MERGER = os.path.join(SCRIPT_DIR, 'merge_jsons.py')
INCLUDE_DIR = '/usr/include/'
OUT_DIR = '.'
STD_LIB_OUT_DIR = os.path.join(OUT_DIR, 'gen_tmp_cstdlib')
STD_LIB_JSON = os.path.join(OUT_DIR, 'cstdlib.json')
LINUX_OUT_DIR = os.path.join(OUT_DIR, 'gen_tmp_linux')
LINUX_JSON = os.path.join(OUT_DIR, 'linux.json')
CSTDLIB_PRIORITY_OUT_DIR = os.path.join(OUT_DIR, 'gen_tmp_cstdlib_priority')
LINUX_PRIORITY_OUT_DIR = os.path.join(OUT_DIR, 'gen_tmp_linux_priority')
def parse_args():
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument('-i', '--json-indent',
dest='json_indent',
default=1,
type=int,
help='Set indentation in JSON files.')
parser.add_argument('-f', '--files-filter',
dest='file_filter',
help='Pattern to ignore specific header files.')
parser.add_argument('-n', '--no-cleanup',
dest='no_cleanup',
action='store_true',
help='Do not remove dirs with JSONs for individual header files.')
parser.add_argument('--cstdlib-headers',
dest='cstdlib_headers',
help='Set path to the C standard library headers with high-priority types info.')
parser.add_argument('--linux-headers',
dest='linux_headers',
help='Set path to the Linux headers with high-priority types info.')
return parser.parse_args()
args = parse_args()
#
# Prints the given error message to stderr and exits.
#
def print_error_and_die(error):
sys.stderr.write('Error: ' + error + '\n')
sys.exit(1)
def remove_dir(path):
if os.path.isdir(path) and not os.path.islink(path):
shutil.rmtree(path)
elif os.path.exists(path):
os.remove(path)
#
# Initial cleanup.
#
remove_dir(STD_LIB_OUT_DIR)
os.mkdir(STD_LIB_OUT_DIR)
remove_dir(LINUX_OUT_DIR)
os.mkdir(LINUX_OUT_DIR)
remove_dir(CSTDLIB_PRIORITY_OUT_DIR)
os.mkdir(CSTDLIB_PRIORITY_OUT_DIR)
remove_dir(LINUX_PRIORITY_OUT_DIR)
os.mkdir(LINUX_PRIORITY_OUT_DIR)
#
# Generate JSONs for whole /usr/include path.
# Filter out unwanted headers.
# Move standard headers to other dir.
#
if args.file_filter:
FILES_FILTER += '|' + args.file_filter
subprocess.call([sys.executable, EXTRACTOR, INCLUDE_DIR, '-o', LINUX_OUT_DIR])
# The filter is matched against the flattened JSON file names, so flatten
# the slashes in the patterns the same way.
FILES_FILTER = FILES_FILTER.replace('/', '_')
subprocess.call(['find', LINUX_OUT_DIR + '/', '-regex', LINUX_OUT_DIR + '/.*\\(' + FILES_FILTER + '\\).*', '-delete'])
#
# Move standard library headers to other directory.
# Edit standard header paths to look like type-extractor generated jsons.
#
for header in CSTDLIB_HEADERS:
for f in os.popen('find \'' + INCLUDE_DIR + '\' -name \'' + header + '\'').read().rstrip('\n').split('\n'):
if not f:
continue
# Rewrite the header path the same way extract_types.py names its JSONs:
# strip the include dir, flatten '/' to '_', and turn '.h' into '.json'.
f = f[len(INCLUDE_DIR):]
f = f.replace('/', '_')
if f.endswith('.h'):
f = f[:-len('.h')] + '.json'
if os.path.isfile(LINUX_OUT_DIR + '/' + f):
shutil.move(LINUX_OUT_DIR + '/' + f, STD_LIB_OUT_DIR)
#
# Extract types info from high-priority cstdlib and linux headers if paths were given.
#
if args.cstdlib_headers:
subprocess.call([sys.executable, EXTRACTOR, args.cstdlib_headers, '-o', CSTDLIB_PRIORITY_OUT_DIR])
if args.linux_headers:
subprocess.call([sys.executable, EXTRACTOR, args.linux_headers, '-o', LINUX_PRIORITY_OUT_DIR])
#
# Merging.
# Priority headers must be first.
# Cstdlib priority headers are merged to the C standard library JSON,
# Linux priority headers to the Linux JSON.
#
subprocess.call([sys.executable, MERGER, CSTDLIB_PRIORITY_OUT_DIR, STD_LIB_OUT_DIR, '-o', STD_LIB_JSON, '--json-indent', str(args.json_indent)])
subprocess.call([sys.executable, MERGER, LINUX_PRIORITY_OUT_DIR, LINUX_OUT_DIR, '-o', LINUX_JSON, '--json-indent', str(args.json_indent)])
#
# Optional cleanup at the end.
#
if not args.no_cleanup:
# Only remove our own temporary dirs; args.cstdlib_headers and
# args.linux_headers are user-supplied inputs and must stay untouched.
remove_dir(STD_LIB_OUT_DIR)
remove_dir(LINUX_OUT_DIR)
remove_dir(CSTDLIB_PRIORITY_OUT_DIR)
remove_dir(LINUX_PRIORITY_OUT_DIR)
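The path-flattening logic above mirrors how extract_types.py names its output
files; a small hedged illustration of the mapping (json_name_for is a
hypothetical helper):

INCLUDE_DIR = '/usr/include/'

def json_name_for(header_path):
    # Strip the include dir, flatten '/' to '_', and turn '.h' into '.json'.
    name = header_path[len(INCLUDE_DIR):].replace('/', '_')
    return name[:-len('.h')] + '.json' if name.endswith('.h') else name

assert json_name_for('/usr/include/stdio.h') == 'stdio.json'
assert json_name_for('/usr/include/sys/time.h') == 'sys_time.json'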

@@ -0,0 +1,194 @@
#!/usr/bin/env python3
"""Generates JSON files from includes in Windows SDK and Windows Drivers Kit."""
import argparse
import shutil
import sys
import os
import subprocess
#
# Paths.
#
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
SCRIPT_NAME = os.path.basename(__file__)
EXTRACTOR = os.path.join(SCRIPT_DIR, 'extract_types.py')
MERGER = os.path.join(SCRIPT_DIR, 'merge_jsons.py')
OUT_DIR = '.'
#
# Windows SDK paths.
#
WIN_UCRT_OUT_DIR = os.path.join(OUT_DIR, 'windows_ucrt')
WIN_SHARED_OUT_DIR = os.path.join(OUT_DIR, 'windows_shared')
WIN_UM_OUT_DIR = os.path.join(OUT_DIR, 'windows_um')
WIN_WINRT_OUT_DIR = os.path.join(OUT_DIR, 'windows_winrt')
WIN_NETFX_OUT_DIR = os.path.join(OUT_DIR, 'windows_netfx')
WIN_OUT_JSON = os.path.join(OUT_DIR, 'windows.json')
WIN_OUT_JSON_WITH_UNUSED_TYPES = os.path.join(OUT_DIR, 'windows_all_types.json')
#
# Windows Drivers Kit paths.
#
WDK_KM_OUT_DIR = os.path.join(OUT_DIR, 'windrivers_km')
WDK_MMOS_OUT_DIR = os.path.join(OUT_DIR, 'windrivers_mmos')
WDK_SHARED_OUT_DIR = os.path.join(OUT_DIR, 'windrivers_shared')
WDK_UM_OUT_DIR = os.path.join(OUT_DIR, 'windrivers_um')
WDK_KMDF_OUT_DIR = os.path.join(OUT_DIR, 'windrivers_kmdf')
WDK_UMDF_OUT_DIR = os.path.join(OUT_DIR, 'windrivers_umdf')
WDK_OUT_JSON = os.path.join(OUT_DIR, 'windrivers.json')
def parse_args():
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument('-i', '--json-indent',
dest='json_indent',
default=1,
type=int,
help='Set indentation in JSON files.')
parser.add_argument('-n', '--no-cleanup',
dest='no_cleanup',
action='store_true',
help='Do not remove dirs with JSONs for individual header files.')
parser.add_argument('--sdk',
dest='sdk',
required=True,
help='SDK dir')
parser.add_argument('--wdk',
dest='wdk',
required=True,
help='WDK dir')
return parser.parse_args()
args = parse_args()
#
# Prints the given error message ($1) to stderr and exits.
#
def print_error_and_die(error):
sys.stderr.write('Error: ' + error + '\n')
sys.exit(1)
def remove_dir(path):
if os.path.isdir(path) and not os.path.islink(path):
shutil.rmtree(path)
elif os.path.exists(path):
os.remove(path)
#
# Removes temporary dirs and files used to generate JSONS that are merged later.
#
def remove_tmp_dirs_and_files():
remove_dir(WIN_UCRT_OUT_DIR)
remove_dir(WIN_SHARED_OUT_DIR)
remove_dir(WIN_UM_OUT_DIR)
remove_dir(WIN_WINRT_OUT_DIR)
remove_dir(WIN_NETFX_OUT_DIR)
remove_dir(WIN_OUT_JSON_WITH_UNUSED_TYPES)
remove_dir(WDK_KM_OUT_DIR)
remove_dir(WDK_MMOS_OUT_DIR)
remove_dir(WDK_SHARED_OUT_DIR)
remove_dir(WDK_UM_OUT_DIR)
remove_dir(WDK_KMDF_OUT_DIR)
remove_dir(WDK_UMDF_OUT_DIR)
WIN_UCRT_IN_DIR = os.path.join(args.sdk, '10', 'Include', '10.0.10150.0', 'ucrt')
WIN_SHARED_IN_DIR = os.path.join(args.sdk, '10', 'Include', '10.0.10240.0', 'shared')
WIN_UM_IN_DIR = os.path.join(args.sdk, '10', 'Include', '10.0.10240.0', 'um')
WIN_WINRT_IN_DIR = os.path.join(args.sdk, '10', 'Include', '10.0.10240.0', 'winrt')
WIN_NETFX_IN_DIR = os.path.join(args.sdk, 'NETFXSDK', '4.6', 'Include', 'um')
WDK_KM_IN_DIR = os.path.join(args.wdk, '10.0.10586.0', 'km')
WDK_MMOS_IN_DIR = os.path.join(args.wdk, '10.0.10586.0', 'mmos')
WDK_SHARED_IN_DIR = os.path.join(args.wdk, '10.0.10586.0', 'shared')
WDK_UM_IN_DIR = os.path.join(args.wdk, '10.0.10586.0', 'um')
WDK_KMDF_IN_DIR = os.path.join(args.wdk, 'wdf', 'kmdf')
WDK_UMDF_IN_DIR = os.path.join(args.wdk, 'wdf', 'umdf')
#
# Initial cleanup.
#
remove_tmp_dirs_and_files()
os.makedirs(WIN_UCRT_OUT_DIR, exist_ok=True)
os.makedirs(WIN_SHARED_OUT_DIR, exist_ok=True)
os.makedirs(WIN_UM_OUT_DIR, exist_ok=True)
os.makedirs(WIN_WINRT_OUT_DIR, exist_ok=True)
os.makedirs(WIN_NETFX_OUT_DIR, exist_ok=True)
os.makedirs(WDK_KM_OUT_DIR, exist_ok=True)
os.makedirs(WDK_MMOS_OUT_DIR, exist_ok=True)
os.makedirs(WDK_SHARED_OUT_DIR, exist_ok=True)
os.makedirs(WDK_UM_OUT_DIR, exist_ok=True)
os.makedirs(WDK_KMDF_OUT_DIR, exist_ok=True)
os.makedirs(WDK_UMDF_OUT_DIR, exist_ok=True)
#
# Parse the includes in the given Windows SDK directory and merge the generated
# JSON files.
#
subprocess.call([sys.executable, EXTRACTOR, WIN_UCRT_IN_DIR, '-o', WIN_UCRT_OUT_DIR])
subprocess.call([sys.executable, EXTRACTOR, WIN_SHARED_IN_DIR, '-o', WIN_SHARED_OUT_DIR])
subprocess.call([sys.executable, EXTRACTOR, WIN_UM_IN_DIR, '-o', WIN_UM_OUT_DIR])
subprocess.call([sys.executable, EXTRACTOR, WIN_WINRT_IN_DIR, '-o', WIN_WINRT_OUT_DIR])
subprocess.call([sys.executable, EXTRACTOR, WIN_NETFX_IN_DIR, '-o', WIN_NETFX_OUT_DIR])
subprocess.call([sys.executable, MERGER, WIN_SHARED_OUT_DIR, WIN_UM_OUT_DIR, WIN_UCRT_OUT_DIR, WIN_WINRT_OUT_DIR,
WIN_NETFX_OUT_DIR, '-o', WIN_OUT_JSON, '--json-indent', str(args.json_indent)])
#
# Parse the includes in the given WDK directory and merge the generated
# JSON files.
#
subprocess.call([sys.executable, EXTRACTOR, WDK_KM_IN_DIR, '-o', WDK_KM_OUT_DIR])
subprocess.call([sys.executable, EXTRACTOR, WDK_MMOS_IN_DIR, '-o', WDK_MMOS_OUT_DIR])
subprocess.call([sys.executable, EXTRACTOR, WDK_SHARED_IN_DIR, '-o', WDK_SHARED_OUT_DIR])
subprocess.call([sys.executable, EXTRACTOR, WDK_UM_IN_DIR, '-o', WDK_UM_OUT_DIR])
for d in os.listdir(WDK_KMDF_IN_DIR):
subprocess.call([sys.executable, EXTRACTOR, os.path.join(WDK_KMDF_IN_DIR, d), '-o', WDK_KMDF_OUT_DIR])
for d in os.listdir(WDK_UMDF_IN_DIR):
subprocess.call([sys.executable, EXTRACTOR, os.path.join(WDK_UMDF_IN_DIR, d), '-o', WDK_UMDF_OUT_DIR])
subprocess.call([sys.executable, MERGER, WDK_SHARED_OUT_DIR, WDK_UM_OUT_DIR, WDK_KM_OUT_DIR, WDK_MMOS_OUT_DIR,
WDK_KMDF_OUT_DIR, WDK_UMDF_OUT_DIR, '-o', WDK_OUT_JSON, '--json-indent', str(args.json_indent)])
#
# WDK uses many types defined in Windows SDK. We need SDK JSON with all types extracted
# and merge it with WDK. SDK functions must be removed!
#
subprocess.call([sys.executable, MERGER, WIN_SHARED_OUT_DIR, WIN_UM_OUT_DIR, WIN_UCRT_OUT_DIR, WIN_WINRT_OUT_DIR,
WIN_NETFX_OUT_DIR, '-o', WIN_OUT_JSON_WITH_UNUSED_TYPES, '--json-indent', str(args.json_indent),
'--keep-unused-types'])
# Strip the SDK functions from the all-types JSON so that only the types remain.
if args.json_indent == 0:
subprocess.call(['sed', '-i', '-e', "s/^.*}, 'types': {/{'functions': {}, 'types': {/",
WIN_OUT_JSON_WITH_UNUSED_TYPES])
else:
# Find the line on which the 'types' object starts and delete everything above it.
types_line = os.popen('egrep -n "^\\s*\'types\': {" \'' + WIN_OUT_JSON_WITH_UNUSED_TYPES
+ '\' | cut -f1 -d:').read().rstrip('\n')
types_line_number = int(types_line) - 1
subprocess.call(['sed', '-i', '-e', '1,%d d' % types_line_number, WIN_OUT_JSON_WITH_UNUSED_TYPES])
subprocess.call(['sed', '-i', '-e', "1s/^/{'functions': {},\\n/", WIN_OUT_JSON_WITH_UNUSED_TYPES])
subprocess.call(
[sys.executable, MERGER, WDK_OUT_JSON, WIN_OUT_JSON_WITH_UNUSED_TYPES, '-o', WDK_OUT_JSON, '--json-indent',
str(args.json_indent)])
#
# Optional cleanup at the end.
#
if not args.no_cleanup:
remove_tmp_dirs_and_files()
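The sed/egrep post-processing above edits the all-types file textually. A hedged
sketch of a structured alternative, assuming the merged file is parseable as a
Python literal (the single-quoted keys in the sed patterns suggest it is not
strict JSON):

import ast

with open(WIN_OUT_JSON_WITH_UNUSED_TYPES) as f:
    data = ast.literal_eval(f.read())  # tolerates single-quoted keys
data['functions'] = {}                 # drop the SDK functions, keep the types
with open(WIN_OUT_JSON_WITH_UNUSED_TYPES, 'w') as f:
    f.write(repr(data))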

@@ -0,0 +1,108 @@
#!/usr/bin/env python3
"""Compile and install tool signatures.
Usage: compile-yara.py yarac-path source-path install-path
"""
import glob
import os
import shutil
import subprocess
import sys
###############################################################################
def print_error_and_die(error):
"""Print error and exit with non-zero value.
error - error message.
"""
print('Error: %s.' % error)
sys.exit(1)
def compile_files(input_folder, output_folder):
"""Compile yara signatures.
input_folder - input folder
output_folder - output file
"""
# subprocess does not expand shell globs, so collect the .yara files ourselves,
# and capture the output so it can be checked for errors.
yara_files = glob.glob(os.path.join(input_folder, '*.yara'))
p = subprocess.Popen([yarac, '-w', *yara_files, output_folder],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)
out, _ = p.communicate()
if p.returncode != 0:
print_error_and_die('yarac failed during compilation of files in ' + input_folder)
# Check for errors in output - yarac returns 0 when it should not.
if 'error' in out:
print_error_and_die('yarac failed during compilation of files in ' + input_folder)
###############################################################################
if len(sys.argv) < 2:
print_error_and_die('missing path to yarac')
yarac = sys.argv[1]
if len(sys.argv) < 3:
print_error_and_die('missing path to rules folder')
rules_dir = sys.argv[2]
if len(sys.argv) < 4:
print_error_and_die('missing path to install folder')
install_dir = sys.argv[3]
###############################################################################
# Directory paths.
rules_dir = os.path.join(rules_dir, 'support', 'yara_patterns', 'tools')
install_dir = os.path.join(install_dir, 'share', 'retdec', 'support', 'generic', 'yara_patterns', 'tools')
###############################################################################
# Remove old files if present.
if os.path.isfile(install_dir) or os.path.islink(install_dir):
os.unlink(install_dir)
else:
shutil.rmtree(install_dir, ignore_errors=True)
# Prepare directory structure.
os.makedirs(os.path.join(install_dir, 'pe'), exist_ok=True)
os.makedirs(os.path.join(install_dir, 'elf'), exist_ok=True)
os.makedirs(os.path.join(install_dir, 'macho'), exist_ok=True)
###############################################################################
print('compiling yara signatures...')
# Compile PE32 signatures.
compile_files(os.path.join(rules_dir, 'pe', 'x86'), os.path.join(install_dir, 'pe', 'x86.yarac'))
compile_files(os.path.join(rules_dir, 'pe', 'arm'), os.path.join(install_dir, 'pe', 'arm.yarac'))
# Compile PE32+ signatures.
compile_files(os.path.join(rules_dir, 'pe', 'x64'), os.path.join(install_dir, 'pe', 'x64.yarac'))
# Compile ELF signatures.
compile_files(os.path.join(rules_dir, 'elf', 'x86'), os.path.join(install_dir, 'elf', 'x86.yarac'))
compile_files(os.path.join(rules_dir, 'elf', 'arm'), os.path.join(install_dir, 'elf', 'arm.yarac'))
compile_files(os.path.join(rules_dir, 'elf', 'ppc'), os.path.join(install_dir, 'elf', 'ppc.yarac'))
compile_files(os.path.join(rules_dir, 'elf', 'mips'), os.path.join(install_dir, 'elf', 'mips.yarac'))
# Compile ELF64 signatures.
compile_files(os.path.join(rules_dir, 'elf', 'x64'), os.path.join(install_dir, 'elf', 'x64.yarac'))
compile_files(os.path.join(rules_dir, 'elf', 'arm64'), os.path.join(install_dir, 'elf', 'arm64.yarac'))
compile_files(os.path.join(rules_dir, 'elf', 'ppc64'), os.path.join(install_dir, 'elf', 'ppc64.yarac'))
compile_files(os.path.join(rules_dir, 'elf', 'mips64'), os.path.join(install_dir, 'elf', 'mips64.yarac'))
# Compile Mach-O signatures.
compile_files(os.path.join(rules_dir, 'macho', 'x86'), os.path.join(install_dir, 'macho', 'x86.yarac'))
compile_files(os.path.join(rules_dir, 'macho', 'arm'), os.path.join(install_dir, 'macho', 'arm.yarac'))
compile_files(os.path.join(rules_dir, 'macho', 'ppc'), os.path.join(install_dir, 'macho', 'ppc.yarac'))
# Compile 64-bit Mach-O signatures.
compile_files(os.path.join(rules_dir, 'macho', 'x64'), os.path.join(install_dir, 'macho', 'x64.yarac'))
compile_files(os.path.join(rules_dir, 'macho', 'ppc64'), os.path.join(install_dir, 'macho', 'ppc64.yarac'))
print('signatures compiled successfully')
sys.exit(0)
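The per-architecture calls above are fully regular; a hedged sketch of the same
work driven from a table, should new formats or architectures be added later:

SIGNATURE_SETS = {
    'pe': ['x86', 'x64', 'arm'],
    'elf': ['x86', 'x64', 'arm', 'arm64', 'ppc', 'ppc64', 'mips', 'mips64'],
    'macho': ['x86', 'x64', 'arm', 'ppc', 'ppc64'],
}

for file_format, arches in SIGNATURE_SETS.items():
    for arch in arches:
        compile_files(os.path.join(rules_dir, file_format, arch),
                      os.path.join(install_dir, file_format, arch + '.yarac'))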