Merge branch 'master' into sign

This commit is contained in:
erev0s 2024-09-23 09:02:34 +03:00 committed by GitHub
commit c2f6cd9bbc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 579 additions and 445 deletions

3
.github/FUNDING.yml vendored Normal file
View File

@ -0,0 +1,3 @@
# These are supported funding model platforms
github: [androguard]

32
.github/workflows/tests_mac_arm.yml vendored Normal file
View File

@ -0,0 +1,32 @@
name: Run Tests on Mac ARM
on:
workflow_dispatch:
jobs:
build:
runs-on: macos-14
steps:
- name: Checkout code
uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: '3.10'
- name: Install Poetry
run: |
pip install poetry
poetry config virtualenvs.create false
- name: Install Dependencies
run: poetry install
- name: Run unittest tests
run: poetry run python -m unittest discover -s tests -p 'test_a*.py'
- name: Build with Poetry
run: |
poetry build

View File

@ -6,6 +6,7 @@
![PyPI - Version](https://img.shields.io/pypi/v/androguard)
![Static Badge](https://img.shields.io/badge/Documentation-InProgress-red)
New tool: Goauld [Dynamic injection tool for Linux/Android](https://github.com/androguard/goauld)
## Installation
Quick installation:

View File

@ -11,7 +11,6 @@ from loguru import logger
from pygments import highlight
from pygments.lexers import get_lexer_by_name
from pygments.formatters.terminal import TerminalFormatter
from oscrypto import asymmetric
# internal modules
from androguard.core.axml import ARSCParser
@ -22,6 +21,7 @@ from androguard.core.axml import AXMLPrinter
from androguard.core.dex import get_bytecodes_method
from androguard.util import readFile
from androguard.ui import DynamicUI
from androguard.util import parse_public, calculate_fingerprint
def androaxml_main(
inp:str,
@ -375,12 +375,12 @@ def androsign_main(args_apk:list[str], args_hash:str, args_all:bool, show:bool)
for public_key in pkeys:
if show:
x509_public_key = asymmetric.load_public_key(public_key)
print("PublicKey Algorithm:", x509_public_key.algorithm)
print("Bit Size:", x509_public_key.bit_size)
print("Fingerprint:", binascii.hexlify(x509_public_key.fingerprint))
parsed_key = parse_public(public_key)
print(f"Algorithm: {parsed_key.algorithm}")
print(f"Bit size: {parsed_key.bit_size}")
print(f"Fingerprint: {calculate_fingerprint(parsed_key).hex()}")
try:
print("Hash Algorithm:", x509_public_key.asn1.hash_algo)
print(f"Hash Algorithm: {parsed_key.hash_algo}")
except ValueError as ve:
# RSA pkey does not have a hash algorithm
pass

View File

@ -1356,7 +1356,7 @@ class APK:
"Unknown permission from android reference"]
else:
# Is there a valid case not belonging to the above two?
logger.error(f"Unknown permission {i}")
logger.info(f"Unknown permission {i}")
return self._fill_deprecated_permissions(l)
def get_requested_aosp_permissions(self) -> list[str]:

View File

@ -445,7 +445,7 @@ class AXMLParser:
# The file can still be parsed up to the point where the chunk should end.
self.axml_tampered = True
logger.warning("Declared filesize ({}) is smaller than total file size ({}). "
"Was something appended to the file? Trying to parse it anyways.".format(self.filesize, self.buff.size()))
"Was something appended to the file? Trying to parse it anyways.".format(self.filesize, self.buff_size))
# Not that severe an error; we have plenty of files where this is not
# set correctly
@ -843,7 +843,7 @@ class AXMLParser:
res = self.sb[name]
# If the result is a (null) string, we need to look it up.
if name <= len(self.m_resourceIDs):
if name < len(self.m_resourceIDs):
attr = self.m_resourceIDs[name]
if attr in public.SYSTEM_RESOURCES['attributes']['inverse']:
res = public.SYSTEM_RESOURCES['attributes']['inverse'][attr].replace("_",
@ -976,6 +976,9 @@ class AXMLPrinter:
logger.debug("DEBUG ARSC TYPE {}".format(_type))
if _type == START_TAG:
if not self.axml.name: # Check if the name is empty
logger.debug("Empty tag name, skipping to next element")
continue # Skip this iteration
uri = self._print_namespace(self.axml.namespace)
uri, name = self._fix_name(uri, self.axml.name)
tag = "{}{}".format(uri, name)
@ -1024,6 +1027,10 @@ class AXMLPrinter:
if _type == END_TAG:
if not cur:
logger.warning("Too many END_TAG! No more elements available to attach to!")
else:
if not self.axml.name: # Check if the name is empty
logger.debug("Empty tag name at END_TAG, skipping to next element")
continue
name = self.axml.name
uri = self._print_namespace(self.axml.namespace)
@ -1407,7 +1414,7 @@ class ARSCParser:
self.buff_size = self.buff.raw.getbuffer().nbytes
if self.buff_size < 8 or self.buff_size > 0xFFFFFFFF:
raise ResParserError("Invalid file size {} for a resources.arsc file!".format(self.buff.size()))
raise ResParserError("Invalid file size {} for a resources.arsc file!".format(self.buff_size))
self.analyzed = False
self._resolved_strings = None

View File

@ -8262,7 +8262,7 @@ class DEX:
:rtype: a list with all strings used in the format (types, names ...)
"""
return [i.get() for i in self.strings]
return [i.get() for i in self.strings] if self.strings is not None else []
def get_len_strings(self) -> int:
"""

View File

@ -22,397 +22,6 @@ from androguard.core.dex.dex_types import TYPE_DESCRIPTOR
from loguru import logger
def array_access(arr, ind) -> list:
    """Return the list-encoded ``ArrayAccess`` node for ``arr`` indexed by ``ind``."""
    operands = [arr, ind]
    return ['ArrayAccess', operands]
def array_creation(tn, params, dim):
    """Return an ``ArrayCreation`` node.

    The element type ``tn`` is placed in front of the ``params`` list and
    ``dim`` is stored as the last entry of the node.
    """
    operands = [tn] + params
    return ['ArrayCreation', operands, dim]
def array_initializer(params, tn=None):
    """Return an ``ArrayInitializer`` node holding ``params`` and the optional type ``tn``."""
    node = ['ArrayInitializer', params]
    node.append(tn)
    return node
def assignment(lhs, rhs, op=''):
    """Return an ``Assignment`` node; ``op`` is the compound operator ('' for plain ``=``)."""
    sides = [lhs, rhs]
    return ['Assignment', sides, op]
def binary_infix(op, left, right):
    """Return a ``BinaryInfix`` node for ``left <op> right``."""
    operands = [left, right]
    return ['BinaryInfix', operands, op]
def cast(tn, arg):
    """Return a ``Cast`` node converting ``arg`` to the type node ``tn``."""
    payload = [tn, arg]
    return ['Cast', payload]
def field_access(triple, left):
    """Return a ``FieldAccess`` node for the field ``triple`` read from ``left``."""
    owner = [left]
    return ['FieldAccess', owner, triple]
def literal(result, tt):
    """Return a ``Literal`` node carrying the value ``result`` and its type tuple ``tt``."""
    node = ['Literal']
    node.extend((result, tt))
    return node
def local(name):
    """Return a ``Local`` node referring to the variable called ``name``."""
    node = ['Local']
    node.append(name)
    return node
def method_invocation(triple, name, base, params):
    """Return a ``MethodInvocation`` node.

    When ``base`` is given it is prepended to ``params`` and the trailing
    flag is ``True``; otherwise ``params`` is used as-is and the flag is
    ``False``.
    """
    has_base = base is not None
    arguments = [base] + params if has_base else params
    return ['MethodInvocation', arguments, triple, name, has_base]
def parenthesis(expr):
    """Return a ``Parenthesis`` node wrapping ``expr``."""
    wrapped = [expr]
    return ['Parenthesis', wrapped]
def typen(baset: str, dim: int) -> list:
    """Return a ``TypeName`` node for base type ``baset`` with ``dim`` array dimensions."""
    spec = (baset, dim)
    return ['TypeName', spec]
def unary_prefix(op, left):
    """Return a ``Unary`` node for a prefix operator (postfix flag ``False``)."""
    operand = [left]
    return ['Unary', operand, op, False]
def unary_postfix(left, op):
    """Return a ``Unary`` node for a postfix operator (postfix flag ``True``)."""
    operand = [left]
    return ['Unary', operand, op, True]
def var_decl(typen, var):
    """Return the two-element ``[type, variable]`` declaration pair."""
    pair = [typen]
    pair.append(var)
    return pair
def dummy(*args):
    """Return a ``Dummy`` placeholder node whose payload is the ``args`` tuple."""
    payload = args
    return ['Dummy', payload]
################################################################################
def expression_stmt(expr):
    """Return an ``ExpressionStatement`` node wrapping ``expr``."""
    node = ['ExpressionStatement']
    node.append(expr)
    return node
def local_decl_stmt(expr, decl):
    """Return a ``LocalDeclarationStatement`` node with initializer ``expr`` and declaration ``decl``."""
    node = ['LocalDeclarationStatement']
    node.extend((expr, decl))
    return node
def return_stmt(expr):
    """Return a ``ReturnStatement`` node carrying ``expr``."""
    node = ['ReturnStatement']
    node.append(expr)
    return node
def throw_stmt(expr):
    """Return a ``ThrowStatement`` node carrying ``expr``."""
    node = ['ThrowStatement']
    node.append(expr)
    return node
def jump_stmt(keyword):
    """Return a ``JumpStatement`` node for ``keyword``; the trailing slot is always ``None``."""
    node = ['JumpStatement', keyword]
    node.append(None)
    return node
def loop_stmt(isdo, cond_expr, body):
    """Return a ``DoStatement`` node when ``isdo`` is truthy, else a ``WhileStatement`` node."""
    if isdo:
        kind = 'DoStatement'
    else:
        kind = 'WhileStatement'
    return [kind, None, cond_expr, body]
def try_stmt(tryb, pairs):
    """Return a ``TryStatement`` node with try block ``tryb`` and catch ``pairs``."""
    node = ['TryStatement', None]
    node.extend((tryb, pairs))
    return node
def if_stmt(cond_expr, scopes):
    """Return an ``IfStatement`` node with condition ``cond_expr`` and branch ``scopes``."""
    node = ['IfStatement', None]
    node.extend((cond_expr, scopes))
    return node
def switch_stmt(cond_expr, ksv_pairs):
    """Return a ``SwitchStatement`` node with selector ``cond_expr`` and case ``ksv_pairs``."""
    node = ['SwitchStatement', None]
    node.extend((cond_expr, ksv_pairs))
    return node
# Create empty statement block (statements to be appended later)
# Note, the code below assumes this can be modified in place
def statement_block():
    """Return a fresh, empty ``BlockStatement`` node whose statement list can be filled in place."""
    statements = list()
    return ['BlockStatement', None, statements]
# Add a statement to the end of a statement block
def _append(sb, stmt):
    """Append ``stmt`` to the ``BlockStatement`` node ``sb``; ``None`` statements are ignored."""
    assert (sb[0] == 'BlockStatement')
    if stmt is None:
        return
    sb[2].append(stmt)
def parse_descriptor(desc: str) -> list:
    """Turn a type descriptor string into a ``TypeName`` node.

    Every leading ``[`` adds one array dimension. Descriptors found in
    ``TYPE_DESCRIPTOR`` map to the looked-up name prefixed with ``.``;
    ``L...;`` descriptors map to the text between ``L`` and ``;``.
    Anything else (including ``None``) yields a ``Dummy`` node holding
    ``str(desc)``.
    """
    dim = 0
    while desc and desc[0] == '[':
        dim, desc = dim + 1, desc[1:]

    if desc in TYPE_DESCRIPTOR:
        mapped = '.' + TYPE_DESCRIPTOR[desc]
        return typen(mapped, dim)

    if not (desc and desc[0] == 'L' and desc[-1] == ';'):
        # invalid descriptor (probably None)
        return dummy(str(desc))
    return typen(desc[1:-1], dim)
# Note: the literal_foo functions (and dummy) are also imported by decompile.py
def literal_string(s):
    """Return a ``java/lang/String`` literal node holding ``str(s)``."""
    string_type = ('java/lang/String', 0)
    return literal(str(s), string_type)
def literal_class(desc):
    """Return a ``java/lang/Class`` literal node whose value is the parsed descriptor ``desc``."""
    class_type = ('java/lang/Class', 0)
    parsed = parse_descriptor(desc)
    return literal(parsed, class_type)
def literal_bool(b):
    """Return a ``.boolean`` literal node holding the lower-cased ``str(b)``."""
    text = str(b).lower()
    return literal(text, ('.boolean', 0))
def literal_int(b):
    """Return a ``.int`` literal node holding ``str(b)``."""
    text = str(b)
    return literal(text, ('.int', 0))
def literal_hex_int(b):
    """Return a ``.int`` literal node holding ``hex(b)``."""
    text = hex(b)
    return literal(text, ('.int', 0))
def literal_long(b):
    """Return a ``.long`` literal node holding ``str(b)`` with an ``L`` suffix."""
    text = str(b) + 'L'
    return literal(text, ('.long', 0))
def literal_float(f):
    """Return a ``.float`` literal node holding ``str(f)`` with an ``f`` suffix."""
    text = str(f) + 'f'
    return literal(text, ('.float', 0))
def literal_double(f):
    """Return a ``.double`` literal node holding ``str(f)``."""
    text = str(f)
    return literal(text, ('.double', 0))
def literal_null():
    """Return the ``.null`` literal node with value ``'null'``."""
    null_type = ('.null', 0)
    return literal('null', null_type)
def visit_decl(var, init_expr=None):
    """Return a local-declaration statement for ``var``.

    The type comes from ``var.get_type()``, the variable is named
    ``v<var.name>``, and ``init_expr`` (possibly ``None``) is the initializer.
    """
    declared_type = parse_descriptor(var.get_type())
    name_node = local('v{}'.format(var.name))
    declaration = var_decl(declared_type, name_node)
    return local_decl_stmt(init_expr, declaration)
def visit_arr_data(value):
    """Decode an array-data payload into an ``ArrayInitializer`` of int literals.

    ``value`` must expose ``get_data()`` (the raw payload bytes),
    ``element_width`` (bytes per element) and ``size`` (element count).

    Elements that are 2, 4 or 8 bytes wide are decoded as little-endian
    signed integers. Previously only the 4-byte case was decoded and every
    other width was read one byte at a time, so 2- and 8-byte payloads
    produced the first ``size`` raw bytes instead of the elements. Any other
    width (notably 1) keeps the byte-wise behaviour.

    Raises ``struct.error`` if the payload is shorter than
    ``size * element_width`` for a decoded width.
    """
    data = value.get_data()
    elem_size = value.element_width
    # Little-endian signed struct formats for the multi-byte element widths.
    fmt = {2: '<h', 4: '<i', 8: '<q'}.get(elem_size)
    if fmt is not None:
        tab = [struct.unpack_from(fmt, data, i * elem_size)[0]
               for i in range(value.size)]
    else:
        # 1-byte (or unexpected-width) payloads: take the raw bytes as-is.
        tab = [data[i] for i in range(value.size)]
    return array_initializer(list(map(literal_int, tab)))
def write_inplace_if_possible(lhs, rhs):
    """Emit ``lhs = rhs`` in its most compact form.

    When ``rhs`` is a binary expression whose first operand is ``lhs``,
    produce ``lhs++`` / ``lhs--`` for ``+``/``-`` with the constant 1, or a
    compound assignment (``lhs op= operand``) otherwise. Any other ``rhs``
    yields a plain assignment.
    """
    updates_lhs = (isinstance(rhs, instruction.BinaryExpression)
                   and lhs == rhs.var_map[rhs.arg1])
    if not updates_lhs:
        return assignment(visit_expr(lhs), visit_expr(rhs))

    operand = rhs.var_map[rhs.arg2]
    is_unit_step = (rhs.op in '+-'
                    and isinstance(operand, instruction.Constant)
                    and operand.get_int_value() == 1)
    if is_unit_step:
        # post increment/decrement
        return unary_postfix(visit_expr(lhs), rhs.op * 2)
    # compound assignment
    return assignment(visit_expr(lhs), visit_expr(operand), op=rhs.op)
def visit_expr(op):
    """Translate the IR object ``op`` into a list-encoded AST node.

    Dispatches on the ``instruction.*`` class of ``op``; operands are looked
    up through ``op.var_map`` and translated recursively. An ``op`` that
    matches none of the handled classes yields a ``Dummy`` node naming its
    type. The order of the ``isinstance`` checks is significant wherever one
    handled class derives from another.
    """
    if isinstance(op, instruction.ArrayLengthExpression):
        expr = visit_expr(op.var_map[op.array])
        # Array length is modelled as access to a pseudo-field named 'length'.
        return field_access([None, 'length', None], expr)
    if isinstance(op, instruction.ArrayLoadExpression):
        array_expr = visit_expr(op.var_map[op.array])
        index_expr = visit_expr(op.var_map[op.idx])
        return array_access(array_expr, index_expr)
    if isinstance(op, instruction.ArrayStoreInstruction):
        array_expr = visit_expr(op.var_map[op.array])
        index_expr = visit_expr(op.var_map[op.index])
        rhs = visit_expr(op.var_map[op.rhs])
        return assignment(array_access(array_expr, index_expr), rhs)
    if isinstance(op, instruction.AssignExpression):
        lhs = op.var_map.get(op.lhs)
        rhs = op.rhs
        # Without a target variable only the right-hand side is emitted.
        if lhs is None:
            return visit_expr(rhs)
        return write_inplace_if_possible(lhs, rhs)
    if isinstance(op, instruction.BaseClass):
        if op.clsdesc is None:
            assert (op.cls == "super")
            return local(op.cls)
        return parse_descriptor(op.clsdesc)
    if isinstance(op, instruction.BinaryExpression):
        lhs = op.var_map.get(op.arg1)
        rhs = op.var_map.get(op.arg2)
        expr = binary_infix(op.op, visit_expr(lhs), visit_expr(rhs))
        # Everything except BinaryCompExpression is wrapped in parentheses.
        if not isinstance(op, instruction.BinaryCompExpression):
            expr = parenthesis(expr)
        return expr
    if isinstance(op, instruction.CheckCastExpression):
        lhs = op.var_map.get(op.arg)
        return parenthesis(cast(parse_descriptor(op.clsdesc), visit_expr(lhs)))
    if isinstance(op, instruction.ConditionalExpression):
        lhs = op.var_map.get(op.arg1)
        rhs = op.var_map.get(op.arg2)
        return binary_infix(op.op, visit_expr(lhs), visit_expr(rhs))
    if isinstance(op, instruction.ConditionalZExpression):
        arg = op.var_map[op.arg]
        if isinstance(arg, instruction.BinaryCompExpression):
            # Mutates the operand: its operator is replaced by this op's operator.
            arg.op = op.op
            return visit_expr(arg)
        expr = visit_expr(arg)
        atype = str(arg.get_type())
        if atype == 'Z':
            # Boolean operand: only the EQUAL form is rewritten (negated).
            if op.op == opcode_ins.Op.EQUAL:
                expr = unary_prefix('!', expr)
        # NOTE: ``in`` on a str is a substring test, not set membership.
        elif atype in 'VBSCIJFD':
            expr = binary_infix(op.op, expr, literal_int(0))
        else:
            expr = binary_infix(op.op, expr, literal_null())
        return expr
    if isinstance(op, instruction.Constant):
        if op.type == 'Ljava/lang/String;':
            return literal_string(op.cst)
        elif op.type == 'Z':
            # NOTE(review): emits 'true' when cst == 0 — confirm this is intended.
            return literal_bool(op.cst == 0)
        # NOTE: the ``in`` checks below are substring tests on op.type.
        elif op.type in 'ISCB':
            return literal_int(op.cst2)
        elif op.type in 'J':
            return literal_long(op.cst2)
        elif op.type in 'F':
            return literal_float(op.cst)
        elif op.type in 'D':
            return literal_double(op.cst)
        elif op.type == 'Ljava/lang/Class;':
            return literal_class(op.clsdesc)
        return dummy('??? Unexpected constant: ' + str(op.type))
    if isinstance(op, instruction.FillArrayExpression):
        array_expr = visit_expr(op.var_map[op.reg])
        rhs = visit_arr_data(op.value)
        return assignment(array_expr, rhs)
    if isinstance(op, instruction.FilledArrayExpression):
        tn = parse_descriptor(op.type)
        params = [visit_expr(op.var_map[x]) for x in op.args]
        return array_initializer(params, tn)
    if isinstance(op, instruction.InstanceExpression):
        # clsdesc[1:-1] drops the first and last character of the descriptor.
        triple = op.clsdesc[1:-1], op.name, op.ftype
        expr = visit_expr(op.var_map[op.arg])
        return field_access(triple, expr)
    if isinstance(op, instruction.InstanceInstruction):
        triple = op.clsdesc[1:-1], op.name, op.atype
        lhs = field_access(triple, visit_expr(op.var_map[op.lhs]))
        rhs = visit_expr(op.var_map[op.rhs])
        return assignment(lhs, rhs)
    if isinstance(op, instruction.InvokeInstruction):
        base = op.var_map[op.base]
        params = [op.var_map[arg] for arg in op.args]
        params = list(map(visit_expr, params))
        if op.name == '<init>':
            if isinstance(base, instruction.ThisParam):
                # 'this' when the invoked class equals the receiver's type, else 'super'.
                keyword = 'this' if base.type[1:-1] == op.triple[0] else 'super'
                return method_invocation(op.triple, keyword, None, params)
            elif isinstance(base, instruction.NewInstance):
                return ['ClassInstanceCreation', op.triple, params,
                        parse_descriptor(base.type)]
            else:
                assert (isinstance(base, instruction.Variable))
                # fallthrough to create dummy <init> call
        return method_invocation(op.triple, op.name, visit_expr(base), params)
    # for unmatched monitor instructions, just create dummy expressions
    if isinstance(op, instruction.MonitorEnterExpression):
        return dummy("monitor enter(", visit_expr(op.var_map[op.ref]), ")")
    if isinstance(op, instruction.MonitorExitExpression):
        return dummy("monitor exit(", visit_expr(op.var_map[op.ref]), ")")
    if isinstance(op, instruction.MoveExpression):
        lhs = op.var_map.get(op.lhs)
        rhs = op.var_map.get(op.rhs)
        return write_inplace_if_possible(lhs, rhs)
    if isinstance(op, instruction.MoveResultExpression):
        lhs = op.var_map.get(op.lhs)
        rhs = op.var_map.get(op.rhs)
        return assignment(visit_expr(lhs), visit_expr(rhs))
    if isinstance(op, instruction.NewArrayExpression):
        # op.type[1:] drops the first character of the array type descriptor.
        tn = parse_descriptor(op.type[1:])
        expr = visit_expr(op.var_map[op.size])
        return array_creation(tn, [expr], 1)
    # create dummy expression for unmatched newinstance
    if isinstance(op, instruction.NewInstance):
        return dummy("new ", parse_descriptor(op.type))
    if isinstance(op, instruction.Param):
        if isinstance(op, instruction.ThisParam):
            return local('this')
        return local('p{}'.format(op.v))
    if isinstance(op, instruction.StaticExpression):
        triple = op.clsdesc[1:-1], op.name, op.ftype
        return field_access(triple, parse_descriptor(op.clsdesc))
    if isinstance(op, instruction.StaticInstruction):
        triple = op.clsdesc[1:-1], op.name, op.ftype
        lhs = field_access(triple, parse_descriptor(op.clsdesc))
        rhs = visit_expr(op.var_map[op.rhs])
        return assignment(lhs, rhs)
    if isinstance(op, instruction.SwitchExpression):
        return visit_expr(op.var_map[op.src])
    if isinstance(op, instruction.UnaryExpression):
        lhs = op.var_map.get(op.arg)
        if isinstance(op, instruction.CastExpression):
            expr = cast(parse_descriptor(op.clsdesc), visit_expr(lhs))
        else:
            expr = unary_prefix(op.op, visit_expr(lhs))
        return parenthesis(expr)
    if isinstance(op, instruction.Variable):
        # assert(op.declared)
        return local('v{}'.format(op.name))
    # Nothing matched: emit a placeholder naming the unexpected class.
    return dummy('??? Unexpected op: ' + type(op).__name__)
def visit_ins(op, isCtor=False):
    """Translate the IR instruction ``op`` into a statement node, or ``None``.

    ``None`` is returned for instructions that produce no statement: nops,
    the no-argument ``<init>`` call on ``this`` when ``isCtor`` is true, and
    moves whose source and destination resolve to the same object.
    """
    if isinstance(op, instruction.ReturnInstruction):
        expr = None if op.arg is None else visit_expr(op.var_map[op.arg])
        return return_stmt(expr)
    elif isinstance(op, instruction.ThrowExpression):
        return throw_stmt(visit_expr(op.var_map[op.ref]))
    elif isinstance(op, instruction.NopExpression):
        return None
    # Local var decl statements
    if isinstance(op, (instruction.AssignExpression, instruction.MoveExpression,
                       instruction.MoveResultExpression)):
        lhs = op.var_map.get(op.lhs)
        rhs = op.rhs if isinstance(
            op, instruction.AssignExpression) else op.var_map.get(op.rhs)
        if isinstance(lhs, instruction.Variable) and not lhs.declared:
            # The flag is set before the right-hand side is visited.
            lhs.declared = True
            expr = visit_expr(rhs)
            return visit_decl(lhs, expr)
    # skip this() at top of constructors
    if isCtor and isinstance(op, instruction.AssignExpression):
        op2 = op.rhs
        if op.lhs is None and isinstance(op2, instruction.InvokeInstruction):
            if op2.name == '<init>' and len(op2.args) == 0:
                if isinstance(op2.var_map[op2.base], instruction.ThisParam):
                    return None
    # MoveExpression is skipped when lhs = rhs
    if isinstance(op, instruction.MoveExpression):
        if op.var_map.get(op.lhs) is op.var_map.get(op.rhs):
            return None
    # Everything else becomes an expression statement.
    return expression_stmt(visit_expr(op))
class JSONWriter:
def __init__(self, graph, method):
self.graph = graph
@ -436,7 +45,7 @@ class JSONWriter:
# which pushes a statement block on to the context stack and assigns it to foo
# within the with block, all added instructions will be added to foo
def __enter__(self):
self.context.append(statement_block())
self.context.append(self.statement_block())
return self.context[-1]
def __exit__(self, *args):
@ -445,10 +54,10 @@ class JSONWriter:
# Add a statement to the current context
def add(self, val):
_append(self.context[-1], val)
self._append(self.context[-1], val)
def visit_ins(self, op):
self.add(visit_ins(op, isCtor=self.constructor))
self.add(self._visit_ins(op, isCtor=self.constructor))
# Note: this is a mutating operation
def get_ast(self):
@ -470,9 +79,9 @@ class JSONWriter:
paramdecls = []
for ptype, name in zip(m.params_type, params):
t = parse_descriptor(ptype)
v = local('p{}'.format(name))
paramdecls.append(var_decl(t, v))
t = self.parse_descriptor(ptype)
v = self.local('p{}'.format(name))
paramdecls.append(self.var_decl(t, v))
if self.graph is None:
body = None
@ -483,7 +92,7 @@ class JSONWriter:
return {
'triple': m.triple,
'flags': flags,
'ret': parse_descriptor(m.type),
'ret': self.parse_descriptor(m.type),
'params': paramdecls,
'comments': [],
'body': body,
@ -492,10 +101,10 @@ class JSONWriter:
def _visit_condition(self, cond):
if cond.isnot:
cond.cond1.neg()
left = parenthesis(self.get_cond(cond.cond1))
right = parenthesis(self.get_cond(cond.cond2))
left = self.parenthesis(self.get_cond(cond.cond1))
right = self.parenthesis(self.get_cond(cond.cond2))
op = '&&' if cond.isand else '||'
res = binary_infix(op, left, right)
res = self.binary_infix(op, left, right)
return res
def get_cond(self, node):
@ -506,7 +115,7 @@ class JSONWriter:
else:
assert (type(node) == basic_blocks.CondBlock)
assert (len(node.ins) == 1)
return visit_expr(node.ins[-1])
return self.visit_expr(node.ins[-1])
def visit_node(self, node):
if node in (self.if_follow[-1], self.switch_follow[-1],
@ -518,7 +127,7 @@ class JSONWriter:
self.visited_nodes.add(node)
for var in node.var_to_declare:
if not var.declared:
self.add(visit_decl(var))
self.add(self.visit_decl(var))
var.declared = True
node.visit(self)
@ -539,7 +148,7 @@ class JSONWriter:
elif loop.looptype.is_endless:
isDo = False
cond_expr = literal_bool(True)
cond_expr = self.literal_bool(True)
with self as body:
self.loop_follow.append(follow)
@ -558,7 +167,7 @@ class JSONWriter:
self.visit_node(loop.latch)
assert (cond_expr is not None and isDo is not None)
self.add(loop_stmt(isDo, cond_expr, body))
self.add(self.loop_stmt(isDo, cond_expr, body))
if follow is not None:
self.visit_node(follow)
@ -568,7 +177,7 @@ class JSONWriter:
follow = cond.follow['if']
if cond.false is cond.true:
self.add(expression_stmt(self.get_cond(cond)))
self.add(self.expression_stmt(self.get_cond(cond)))
self.visit_node(cond.true)
return
@ -579,14 +188,14 @@ class JSONWriter:
if self.loop_follow[-1] in (cond.true, cond.false):
cond_expr = self.get_cond(cond)
with self as scope:
self.add(jump_stmt('break'))
self.add(self.jump_stmt('break'))
scopes.append(scope)
with self as scope:
self.visit_node(cond.false)
scopes.append(scope)
self.add(if_stmt(cond_expr, scopes))
self.add(self.if_stmt(cond_expr, scopes))
elif follow is not None:
if cond.true in (follow, self.next_case) or \
cond.num > cond.true.num:
@ -607,7 +216,7 @@ class JSONWriter:
scopes.append(scope)
self.if_follow.pop()
self.add(if_stmt(cond_expr, scopes))
self.add(self.if_stmt(cond_expr, scopes))
self.visit_node(follow)
else:
cond_expr = self.get_cond(cond)
@ -618,7 +227,7 @@ class JSONWriter:
with self as scope:
self.visit_node(cond.false)
scopes.append(scope)
self.add(if_stmt(cond_expr, scopes))
self.add(self.if_stmt(cond_expr, scopes))
def visit_switch_node(self, switch):
lins = switch.get_ins()
@ -626,7 +235,7 @@ class JSONWriter:
self.visit_ins(ins)
switch_ins = switch.get_ins()[-1]
cond_expr = visit_expr(switch_ins)
cond_expr = self.visit_expr(switch_ins)
ksv_pairs = []
follow = switch.follow['switch']
@ -650,7 +259,7 @@ class JSONWriter:
with self as body:
self.visit_node(node)
if self.need_break:
self.add(jump_stmt('break'))
self.add(self.jump_stmt('break'))
else:
self.need_break = True
ksv_pairs.append((cur_ks, body))
@ -660,7 +269,7 @@ class JSONWriter:
self.visit_node(default)
ksv_pairs.append(([None], body))
self.add(switch_stmt(cond_expr, ksv_pairs))
self.add(self.switch_stmt(cond_expr, ksv_pairs))
self.switch_follow.pop()
self.visit_node(follow)
@ -670,7 +279,7 @@ class JSONWriter:
self.visit_ins(ins)
if len(sucs) == 1:
if sucs[0] is self.loop_follow[-1]:
self.add(jump_stmt('break'))
self.add(self.jump_stmt('break'))
elif sucs[0] is self.next_case:
self.need_break = False
else:
@ -694,13 +303,13 @@ class JSONWriter:
else:
ctype = catch_node.catch_type
name = '_'
catch_decl = var_decl(parse_descriptor(ctype), local(name))
catch_decl = self.var_decl(self.parse_descriptor(ctype), self.local(name))
with self as body:
self.visit_node(catch_node.catch_start)
pairs.append((catch_decl, body))
self.add(try_stmt(tryb, pairs))
self.add(self.try_stmt(tryb, pairs))
self.visit_node(self.try_follow.pop())
def visit_return_node(self, ret):
@ -711,3 +320,387 @@ class JSONWriter:
def visit_throw_node(self, throw):
for ins in throw.get_ins():
self.visit_ins(ins)
def _visit_ins(self, op, isCtor=False):
    """Translate the IR instruction ``op`` into a statement node, or ``None``.

    ``None`` is returned for instructions that produce no statement: nops,
    the no-argument ``<init>`` call on ``this`` when ``isCtor`` is true, and
    moves whose source and destination resolve to the same object.
    """
    if isinstance(op, instruction.ReturnInstruction):
        expr = None if op.arg is None else self.visit_expr(op.var_map[op.arg])
        return self.return_stmt(expr)
    elif isinstance(op, instruction.ThrowExpression):
        return self.throw_stmt(self.visit_expr(op.var_map[op.ref]))
    elif isinstance(op, instruction.NopExpression):
        return None
    # Local var decl statements
    if isinstance(op, (instruction.AssignExpression, instruction.MoveExpression,
                       instruction.MoveResultExpression)):
        lhs = op.var_map.get(op.lhs)
        rhs = op.rhs if isinstance(
            op, instruction.AssignExpression) else op.var_map.get(op.rhs)
        if isinstance(lhs, instruction.Variable) and not lhs.declared:
            # The flag is set before the right-hand side is visited.
            lhs.declared = True
            expr = self.visit_expr(rhs)
            return self.visit_decl(lhs, expr)
    # skip this() at top of constructors
    if isCtor and isinstance(op, instruction.AssignExpression):
        op2 = op.rhs
        if op.lhs is None and isinstance(op2, instruction.InvokeInstruction):
            if op2.name == '<init>' and len(op2.args) == 0:
                if isinstance(op2.var_map[op2.base], instruction.ThisParam):
                    return None
    # MoveExpression is skipped when lhs = rhs
    if isinstance(op, instruction.MoveExpression):
        if op.var_map.get(op.lhs) is op.var_map.get(op.rhs):
            return None
    # Everything else becomes an expression statement.
    return self.expression_stmt(self.visit_expr(op))
def write_inplace_if_possible(self, lhs, rhs):
    """Emit ``lhs = rhs`` in its most compact form.

    When ``rhs`` is a binary expression whose first operand is ``lhs``,
    produce ``lhs++`` / ``lhs--`` for ``+``/``-`` with the constant 1, or a
    compound assignment (``lhs op= operand``) otherwise. Any other ``rhs``
    yields a plain assignment.
    """
    updates_lhs = (isinstance(rhs, instruction.BinaryExpression)
                   and lhs == rhs.var_map[rhs.arg1])
    if not updates_lhs:
        return self.assignment(self.visit_expr(lhs), self.visit_expr(rhs))

    operand = rhs.var_map[rhs.arg2]
    is_unit_step = (rhs.op in '+-'
                    and isinstance(operand, instruction.Constant)
                    and operand.get_int_value() == 1)
    if is_unit_step:
        # post increment/decrement
        return self.unary_postfix(self.visit_expr(lhs), rhs.op * 2)
    # compound assignment
    return self.assignment(self.visit_expr(lhs), self.visit_expr(operand), op=rhs.op)
def visit_expr(self, op):
    """Translate the IR object ``op`` into a list-encoded AST node.

    Dispatches on the ``instruction.*`` class of ``op``; operands are looked
    up through ``op.var_map`` and translated recursively. An ``op`` that
    matches none of the handled classes yields a ``Dummy`` node naming its
    type. The order of the ``isinstance`` checks is significant wherever one
    handled class derives from another.
    """
    if isinstance(op, instruction.ArrayLengthExpression):
        expr = self.visit_expr(op.var_map[op.array])
        # Array length is modelled as access to a pseudo-field named 'length'.
        return self.field_access([None, 'length', None], expr)
    if isinstance(op, instruction.ArrayLoadExpression):
        array_expr = self.visit_expr(op.var_map[op.array])
        index_expr = self.visit_expr(op.var_map[op.idx])
        return self.array_access(array_expr, index_expr)
    if isinstance(op, instruction.ArrayStoreInstruction):
        array_expr = self.visit_expr(op.var_map[op.array])
        index_expr = self.visit_expr(op.var_map[op.index])
        rhs = self.visit_expr(op.var_map[op.rhs])
        return self.assignment(self.array_access(array_expr, index_expr), rhs)
    if isinstance(op, instruction.AssignExpression):
        lhs = op.var_map.get(op.lhs)
        rhs = op.rhs
        # Without a target variable only the right-hand side is emitted.
        if lhs is None:
            return self.visit_expr(rhs)
        return self.write_inplace_if_possible(lhs, rhs)
    if isinstance(op, instruction.BaseClass):
        if op.clsdesc is None:
            assert (op.cls == "super")
            return self.local(op.cls)
        return self.parse_descriptor(op.clsdesc)
    if isinstance(op, instruction.BinaryExpression):
        lhs = op.var_map.get(op.arg1)
        rhs = op.var_map.get(op.arg2)
        expr = self.binary_infix(op.op, self.visit_expr(lhs), self.visit_expr(rhs))
        # Everything except BinaryCompExpression is wrapped in parentheses.
        if not isinstance(op, instruction.BinaryCompExpression):
            expr = self.parenthesis(expr)
        return expr
    if isinstance(op, instruction.CheckCastExpression):
        lhs = op.var_map.get(op.arg)
        return self.parenthesis(self.cast(self.parse_descriptor(op.clsdesc),
                                          self.visit_expr(lhs)))
    if isinstance(op, instruction.ConditionalExpression):
        lhs = op.var_map.get(op.arg1)
        rhs = op.var_map.get(op.arg2)
        return self.binary_infix(op.op, self.visit_expr(lhs), self.visit_expr(rhs))
    if isinstance(op, instruction.ConditionalZExpression):
        arg = op.var_map[op.arg]
        if isinstance(arg, instruction.BinaryCompExpression):
            # Mutates the operand: its operator is replaced by this op's operator.
            arg.op = op.op
            return self.visit_expr(arg)
        expr = self.visit_expr(arg)
        atype = str(arg.get_type())
        if atype == 'Z':
            # Boolean operand: only the EQUAL form is rewritten (negated).
            if op.op == opcode_ins.Op.EQUAL:
                expr = self.unary_prefix('!', expr)
        # NOTE: ``in`` on a str is a substring test, not set membership.
        elif atype in 'VBSCIJFD':
            expr = self.binary_infix(op.op, expr, self.literal_int(0))
        else:
            expr = self.binary_infix(op.op, expr, self.literal_null())
        return expr
    if isinstance(op, instruction.Constant):
        if op.type == 'Ljava/lang/String;':
            return self.literal_string(op.cst)
        elif op.type == 'Z':
            # NOTE(review): emits 'true' when cst == 0 — confirm this is intended.
            return self.literal_bool(op.cst == 0)
        # NOTE: the ``in`` checks below are substring tests on op.type.
        elif op.type in 'ISCB':
            return self.literal_int(op.cst2)
        elif op.type in 'J':
            return self.literal_long(op.cst2)
        elif op.type in 'F':
            return self.literal_float(op.cst)
        elif op.type in 'D':
            return self.literal_double(op.cst)
        elif op.type == 'Ljava/lang/Class;':
            return self.literal_class(op.clsdesc)
        return self.dummy('??? Unexpected constant: ' + str(op.type))
    if isinstance(op, instruction.FillArrayExpression):
        array_expr = self.visit_expr(op.var_map[op.reg])
        rhs = self.visit_arr_data(op.value)
        return self.assignment(array_expr, rhs)
    if isinstance(op, instruction.FilledArrayExpression):
        tn = self.parse_descriptor(op.type)
        params = [self.visit_expr(op.var_map[x]) for x in op.args]
        return self.array_initializer(params, tn)
    if isinstance(op, instruction.InstanceExpression):
        # clsdesc[1:-1] drops the first and last character of the descriptor.
        triple = op.clsdesc[1:-1], op.name, op.ftype
        expr = self.visit_expr(op.var_map[op.arg])
        return self.field_access(triple, expr)
    if isinstance(op, instruction.InstanceInstruction):
        triple = op.clsdesc[1:-1], op.name, op.atype
        lhs = self.field_access(triple, self.visit_expr(op.var_map[op.lhs]))
        rhs = self.visit_expr(op.var_map[op.rhs])
        return self.assignment(lhs, rhs)
    if isinstance(op, instruction.InvokeInstruction):
        base = op.var_map[op.base]
        params = [op.var_map[arg] for arg in op.args]
        params = list(map(self.visit_expr, params))
        if op.name == '<init>':
            if isinstance(base, instruction.ThisParam):
                # 'this' when the invoked class equals the receiver's type, else 'super'.
                keyword = 'this' if base.type[1:-1] == op.triple[0] else 'super'
                return self.method_invocation(op.triple, keyword, None, params)
            elif isinstance(base, instruction.NewInstance):
                return ['ClassInstanceCreation', op.triple, params,
                        self.parse_descriptor(base.type)]
            else:
                assert (isinstance(base, instruction.Variable))
                # fallthrough to create dummy <init> call
        return self.method_invocation(op.triple, op.name, self.visit_expr(base), params)
    # for unmatched monitor instructions, just create dummy expressions
    if isinstance(op, instruction.MonitorEnterExpression):
        return self.dummy("monitor enter(", self.visit_expr(op.var_map[op.ref]), ")")
    if isinstance(op, instruction.MonitorExitExpression):
        return self.dummy("monitor exit(", self.visit_expr(op.var_map[op.ref]), ")")
    if isinstance(op, instruction.MoveExpression):
        lhs = op.var_map.get(op.lhs)
        rhs = op.var_map.get(op.rhs)
        return self.write_inplace_if_possible(lhs, rhs)
    if isinstance(op, instruction.MoveResultExpression):
        lhs = op.var_map.get(op.lhs)
        rhs = op.var_map.get(op.rhs)
        return self.assignment(self.visit_expr(lhs), self.visit_expr(rhs))
    if isinstance(op, instruction.NewArrayExpression):
        # op.type[1:] drops the first character of the array type descriptor.
        tn = self.parse_descriptor(op.type[1:])
        expr = self.visit_expr(op.var_map[op.size])
        return self.array_creation(tn, [expr], 1)
    # create dummy expression for unmatched newinstance
    if isinstance(op, instruction.NewInstance):
        return self.dummy("new ", self.parse_descriptor(op.type))
    if isinstance(op, instruction.Param):
        if isinstance(op, instruction.ThisParam):
            return self.local('this')
        return self.local('p{}'.format(op.v))
    if isinstance(op, instruction.StaticExpression):
        triple = op.clsdesc[1:-1], op.name, op.ftype
        return self.field_access(triple, self.parse_descriptor(op.clsdesc))
    if isinstance(op, instruction.StaticInstruction):
        triple = op.clsdesc[1:-1], op.name, op.ftype
        lhs = self.field_access(triple, self.parse_descriptor(op.clsdesc))
        rhs = self.visit_expr(op.var_map[op.rhs])
        return self.assignment(lhs, rhs)
    if isinstance(op, instruction.SwitchExpression):
        return self.visit_expr(op.var_map[op.src])
    if isinstance(op, instruction.UnaryExpression):
        lhs = op.var_map.get(op.arg)
        if isinstance(op, instruction.CastExpression):
            expr = self.cast(self.parse_descriptor(op.clsdesc), self.visit_expr(lhs))
        else:
            expr = self.unary_prefix(op.op, self.visit_expr(lhs))
        return self.parenthesis(expr)
    if isinstance(op, instruction.Variable):
        # assert(op.declared)
        return self.local('v{}'.format(op.name))
    # Nothing matched: emit a placeholder naming the unexpected class.
    return self.dummy('??? Unexpected op: ' + type(op).__name__)
def visit_arr_data(self, value):
data = value.get_data()
tab = []
elem_size = value.element_width
if elem_size == 4:
for i in range(0, value.size * 4, 4):
tab.append(struct.unpack('<i', data[i:i + 4])[0])
else: # FIXME: other cases
for i in range(value.size):
tab.append(data[i])
return self.array_initializer(list(map(self.literal_int, tab)))
def visit_decl(self, var, init_expr=None):
    """Return a local-declaration statement for ``var``.

    The type comes from ``var.get_type()``, the variable is named
    ``v<var.name>``, and ``init_expr`` (possibly ``None``) is the initializer.
    """
    declared_type = self.parse_descriptor(var.get_type())
    name_node = self.local('v{}'.format(var.name))
    declaration = self.var_decl(declared_type, name_node)
    return self.local_decl_stmt(init_expr, declaration)
@staticmethod
def literal_null():
return JSONWriter.literal('null', ('.null', 0))
@staticmethod
def literal_double(f):
return JSONWriter.literal(str(f), ('.double', 0))
@staticmethod
def literal_float(f):
return JSONWriter.literal(str(f) + 'f', ('.float', 0))
@staticmethod
def literal_long(b):
return JSONWriter.literal(str(b) + 'L', ('.long', 0))
@staticmethod
def literal_hex_int(b):
return JSONWriter.literal(hex(b), ('.int', 0))
@staticmethod
def literal_int(b):
return JSONWriter.literal(str(b), ('.int', 0))
@staticmethod
def literal_bool(b):
return JSONWriter.literal(str(b).lower(), ('.boolean', 0))
@staticmethod
def literal_class(desc):
    """Return a ``java/lang/Class`` literal node whose value is the parsed descriptor ``desc``."""
    class_type = ('java/lang/Class', 0)
    parsed = JSONWriter.parse_descriptor(desc)
    return JSONWriter.literal(parsed, class_type)
@staticmethod
def literal_string(s):
return JSONWriter.literal(str(s), ('java/lang/String', 0))
@staticmethod
def parse_descriptor(desc: str) -> list:
    """Turn a type descriptor string into a ``TypeName`` node.

    Every leading ``[`` adds one array dimension. Descriptors found in
    ``TYPE_DESCRIPTOR`` map to the looked-up name prefixed with ``.``;
    ``L...;`` descriptors map to the text between ``L`` and ``;``.
    Anything else (including ``None``) yields a ``Dummy`` node holding
    ``str(desc)``.
    """
    dim = 0
    while desc and desc[0] == '[':
        dim, desc = dim + 1, desc[1:]

    if desc in TYPE_DESCRIPTOR:
        mapped = '.' + TYPE_DESCRIPTOR[desc]
        return JSONWriter.typen(mapped, dim)

    if not (desc and desc[0] == 'L' and desc[-1] == ';'):
        # invalid descriptor (probably None)
        return JSONWriter.dummy(str(desc))
    return JSONWriter.typen(desc[1:-1], dim)
@staticmethod
def _append(sb, stmt):
# Add a statement to the end of a statement block
assert (sb[0] == 'BlockStatement')
if stmt is not None:
sb[2].append(stmt)
@staticmethod
def statement_block():
# Create empty statement block (statements to be appended later)
# Note, the code below assumes this can be modified in place
return ['BlockStatement', None, []]
@staticmethod
def switch_stmt(cond_expr, ksv_pairs):
return ['SwitchStatement', None, cond_expr, ksv_pairs]
@staticmethod
def if_stmt(cond_expr, scopes):
    """Build an ``IfStatement`` node from a condition expression and its scopes."""
    node = ['IfStatement', None, cond_expr, scopes]
    return node
@staticmethod
def try_stmt(tryb, pairs):
    """Build a ``TryStatement`` node from the try block ``tryb`` and ``pairs``."""
    node = ['TryStatement', None, tryb, pairs]
    return node
@staticmethod
def loop_stmt(isdo, cond_expr, body):
    """
    Build a loop node.

    The node is tagged ``DoStatement`` when ``isdo`` is truthy and
    ``WhileStatement`` otherwise.
    """
    if isdo:
        tag = 'DoStatement'
    else:
        tag = 'WhileStatement'
    return [tag, None, cond_expr, body]
@staticmethod
def jump_stmt(keyword):
    """Build a ``JumpStatement`` node for ``keyword``; the last slot is ``None``."""
    node = ['JumpStatement', keyword, None]
    return node
@staticmethod
def throw_stmt(expr):
    """Build a ``ThrowStatement`` node wrapping ``expr``."""
    node = ['ThrowStatement', expr]
    return node
@staticmethod
def return_stmt(expr):
    """Build a ``ReturnStatement`` node wrapping ``expr``."""
    node = ['ReturnStatement', expr]
    return node
@staticmethod
def local_decl_stmt(expr, decl):
    """Build a ``LocalDeclarationStatement`` node holding ``expr`` followed by ``decl``."""
    node = ['LocalDeclarationStatement', expr, decl]
    return node
@staticmethod
def expression_stmt(expr):
    """Build an ``ExpressionStatement`` node wrapping ``expr``."""
    node = ['ExpressionStatement', expr]
    return node
@staticmethod
def dummy(*args):
    """Build a ``Dummy`` node carrying the positional arguments as a tuple."""
    payload = args
    return ['Dummy', payload]
@staticmethod
def var_decl(typen, var):
    """Pair a type node with a variable node as the two-element list ``[typen, var]``."""
    declaration = [typen, var]
    return declaration
@staticmethod
def unary_postfix(left, op):
    """Build a ``Unary`` node for ``op`` applied to ``left``, with the trailing flag set to ``True``."""
    operands = [left]
    return ['Unary', operands, op, True]
@staticmethod
def unary_prefix(op, left):
    """Build a ``Unary`` node for ``op`` applied to ``left``, with the trailing flag set to ``False``."""
    operands = [left]
    return ['Unary', operands, op, False]
@staticmethod
def typen(baset: str, dim: int) -> list:
    """Build a ``TypeName`` node holding the pair ``(baset, dim)``."""
    type_key = (baset, dim)
    return ['TypeName', type_key]
@staticmethod
def parenthesis(expr):
    """Build a ``Parenthesis`` node wrapping ``expr`` in a one-element list."""
    inner = [expr]
    return ['Parenthesis', inner]
@staticmethod
def method_invocation(triple, name, base, params):
    """
    Build a ``MethodInvocation`` node.

    When ``base`` is not ``None`` it is prepended to ``params`` in a new list
    and the trailing flag is ``True``; otherwise ``params`` is used as given
    and the flag is ``False``.
    """
    has_base = base is not None
    arguments = [base] + params if has_base else params
    return ['MethodInvocation', arguments, triple, name, has_base]
@staticmethod
def local(name):
    """Build a ``Local`` node for the variable called ``name``."""
    node = ['Local', name]
    return node
@staticmethod
def literal(result, tt):
    """Build a ``Literal`` node holding ``result`` followed by the tag ``tt``."""
    node = ['Literal', result, tt]
    return node
@staticmethod
def field_access(triple, left):
    """Build a ``FieldAccess`` node: ``left`` wrapped in a list, followed by ``triple``."""
    target = [left]
    return ['FieldAccess', target, triple]
@staticmethod
def cast(tn, arg):
    """Build a ``Cast`` node whose children are the type node ``tn`` and ``arg``."""
    children = [tn, arg]
    return ['Cast', children]
@staticmethod
def binary_infix(op, left, right):
    """Build a ``BinaryInfix`` node: the operand pair ``[left, right]`` followed by ``op``."""
    operands = [left, right]
    return ['BinaryInfix', operands, op]
@staticmethod
def assignment(lhs, rhs, op=''):
    """Build an ``Assignment`` node: the pair ``[lhs, rhs]`` followed by ``op`` (empty by default)."""
    sides = [lhs, rhs]
    return ['Assignment', sides, op]
@staticmethod
def array_initializer(params, tn=None):
    """Build an ``ArrayInitializer`` node holding ``params`` and the optional type node ``tn``."""
    node = ['ArrayInitializer', params, tn]
    return node
@staticmethod
def array_creation(tn, params, dim):
    """Build an ``ArrayCreation`` node: ``tn`` prepended to the list ``params``, followed by ``dim``."""
    children = [tn] + params
    return ['ArrayCreation', children, dim]
@staticmethod
def array_access(arr, ind) -> list:
    """Build an ``ArrayAccess`` node from the array expression ``arr`` and the index ``ind``."""
    children = [arr, ind]
    return ['ArrayAccess', children]

View File

@ -35,13 +35,7 @@ import androguard.decompiler.util as util
from androguard.core.analysis import analysis
from androguard.core import apk, dex
from androguard.decompiler.control_flow import identify_structures
from androguard.decompiler.dast import (
JSONWriter,
parse_descriptor,
literal_string,
literal_hex_int,
dummy
)
from androguard.decompiler.dast import JSONWriter
from androguard.decompiler.dataflow import (
build_def_use,
place_declarations,
@ -63,17 +57,17 @@ def get_field_ast(field: EncodedField) -> dict:
expr = None
if field.init_value:
val = field.init_value.value
expr = dummy(str(val))
expr = JSONWriter.dummy(str(val))
if val is not None:
if field.get_descriptor() == 'Ljava/lang/String;':
expr = literal_string(val)
expr = JSONWriter.literal_string(val)
elif field.proto == 'B':
expr = literal_hex_int(struct.unpack('<b', struct.pack("B", val))[0])
expr = JSONWriter.literal_hex_int(struct.unpack('<b', struct.pack("B", val))[0])
return {
'triple': triple,
'type': parse_descriptor(field.get_descriptor()),
'type': JSONWriter.parse_descriptor(field.get_descriptor()),
'flags': util.get_access_field(field.get_access_flags()),
'expr': expr,
}
@ -314,11 +308,11 @@ class DvClass:
isInterface = 'interface' in self.access
return {
'rawname': self.thisclass[1:-1],
'name': parse_descriptor(self.thisclass),
'super': parse_descriptor(self.superclass),
'name': JSONWriter.parse_descriptor(self.thisclass),
'super': JSONWriter.parse_descriptor(self.superclass),
'flags': self.access,
'isInterface': isInterface,
'interfaces': list(map(parse_descriptor, self.interfaces)),
'interfaces': list(map(JSONWriter.parse_descriptor, self.interfaces)),
'fields': fields,
'methods': methods,
}

View File

@ -23,7 +23,7 @@ class Session:
>>> Should we go back to pickling or proceed further with the dataset ?<<<
"""
def __init__(self, export_ipython:bool=False) -> None:
def __init__(self, export_ipython:bool=False, db_url:str='sqlite:///androguard.db') -> None:
"""
Create a new Session object
@ -33,7 +33,7 @@ class Session:
self._setup_objects()
self.export_ipython = export_ipython
self.db = dataset.connect('sqlite:///androguard.db')
self.db = dataset.connect(db_url)
logger.info("Opening database {}".format(self.db))
self.table_information = self.db["information"]
self.table_session = self.db["session"]

View File

@ -1,7 +1,9 @@
import sys
from typing import Union, BinaryIO
from asn1crypto import keys, x509
import hashlib
import binascii
#  External dependencies
# import asn1crypto
@ -93,3 +95,95 @@ def get_certificate_name_string(name:Union[dict,Name], short:bool=False, delimit
'organization_identifier': ("organizationIdentifier", "organizationIdentifier"),
}
return delimiter.join(["{}={}".format(_.get(attr, (attr, attr))[0 if short else 1], name[attr]) for attr in name])
def parse_public(data):
    """
    Load a public key from DER- or PEM-formatted input.

    The decoded bytes are tried, in order, as a ``PublicKeyInfo`` structure,
    as an X.509 certificate (whose subject public key info is returned) and
    finally as a bare ``RSAPublicKey``, which is wrapped in a
    ``PublicKeyInfo``.

    :param data: A byte string of the public key or certificate
    :raises ValueError: If the input is a PEM private key or is not a known format
    :return: A keys.PublicKeyInfo object containing the parsed public key
    """
    # Kept as a function-scope import, but placed after the docstring so the
    # text above is the real ``__doc__`` rather than a dead string expression.
    from asn1crypto import pem, keys, x509

    if pem.detect(data):
        type_name, _, der_bytes = pem.unarmor(data)
        # Matches every private-key PEM label ('PRIVATE KEY',
        # 'RSA PRIVATE KEY', 'EC PRIVATE KEY', 'DSA PRIVATE KEY',
        # 'ENCRYPTED PRIVATE KEY'), not only the first two.
        if type_name.endswith('PRIVATE KEY'):
            raise ValueError("The data specified appears to be a private key, not a public key.")
    else:
        # If not PEM, assume it's DER-encoded
        der_bytes = data

    # Try to parse the data as PublicKeyInfo (standard public key structure)
    try:
        public_key_info = keys.PublicKeyInfo.load(der_bytes)
        public_key_info.native  # Fully parse the object (asn1crypto is lazy)
        return public_key_info
    except ValueError:
        pass  # Not a PublicKeyInfo structure

    # Try to parse the data as an X.509 certificate
    try:
        certificate = x509.Certificate.load(der_bytes)
        public_key_info = certificate['tbs_certificate']['subject_public_key_info']
        public_key_info.native  # Fully parse the object
        return public_key_info
    except ValueError:
        pass  # Not a certificate

    # Try to parse the data as RSAPublicKey
    try:
        rsa_public_key = keys.RSAPublicKey.load(der_bytes)
        rsa_public_key.native  # Fully parse the object
        # Wrap the RSAPublicKey in PublicKeyInfo
        return keys.PublicKeyInfo.wrap(rsa_public_key, 'rsa')
    except ValueError:
        pass  # Not an RSAPublicKey structure

    raise ValueError("The data specified does not appear to be a known public key or certificate format.")
def calculate_fingerprint(key_object):
    """
    Calculate a SHA-256 fingerprint of a public key from its components.

    The hashed text depends on the key algorithm:

    * ``rsa``: ``"<modulus>:<public_exponent>"``
    * ``dsa``: ``"<p>:<q>:<g>:<public_key>"``
    * ``ec``: ``"<curve>:"`` (UTF-8 encoded) followed by the raw public key bytes

    :param key_object: A keys.PublicKeyInfo object containing the parsed public key
    :raises ValueError: If the key algorithm is not one of rsa, dsa or ec
    :return: The fingerprint of the public key as a byte string
    """
    algorithm = key_object.algorithm

    if algorithm == 'rsa':
        key = key_object['public_key'].parsed
        to_hash = '%d:%d' % (key['modulus'].native, key['public_exponent'].native)

    elif algorithm == 'dsa':
        key = key_object['public_key'].parsed
        params = key_object['algorithm']['parameters']
        to_hash = '%d:%d:%d:%d' % (
            params['p'].native,
            params['q'].native,
            params['g'].native,
            key.native,
        )

    elif algorithm == 'ec':
        public_key = key_object['public_key'].native
        # Second element of the curve tuple, then the raw key bytes
        to_hash = ('%s:' % key_object.curve[1]).encode('utf-8') + public_key

    else:
        # Without this, hashlib.sha256(None) fails with an opaque TypeError.
        raise ValueError(
            "Unsupported public key algorithm for fingerprinting: {!r}".format(algorithm)
        )

    # Ensure to_hash is encoded as bytes if it's a string
    if isinstance(to_hash, str):
        to_hash = to_hash.encode('utf-8')

    return hashlib.sha256(to_hash).digest()

View File

@ -28,7 +28,6 @@ apkInspector = ">=1.1.7"
matplotlib = "*"
networkx = "*"
pyyaml = "*"
oscrypto = ">=1.3.0"
[tool.setuptools.package_data]
"androguard.core.api_specific_resources" = ["aosp_permissions/*.json", "api_permission_mappings/*.json"]

View File

@ -738,5 +738,16 @@ class APKTest(unittest.TestCase):
self.assertEqual(a.get_app_name(locale='ru-rRU'), "values-ru-rRU")
def testPublicKeysofApk(self):
    """Check algorithm, bit size and fingerprint of every v2/v3 public key of the sample APK."""
    from androguard.util import calculate_fingerprint
    from androguard.util import parse_public

    apk_path = os.path.join(test_dir, 'data/APK/com.example.android.wearable.wear.weardrawers.apk')
    a = APK(apk_path)
    expected_fingerprint = '98917cd03c6277d73d58b661d614c442f2981a35a5aa122a61049215ba85c1d4'

    # A set drops keys that appear in both the v3 and the v2 lists
    unique_keys = set(a.get_public_keys_der_v3() + a.get_public_keys_der_v2())
    for der_key in unique_keys:
        parsed_key = parse_public(der_key)
        self.assertEqual(parsed_key.algorithm, 'rsa')
        self.assertEqual(parsed_key.bit_size, 2048)
        self.assertEqual(calculate_fingerprint(parsed_key).hex(), expected_fingerprint)
if __name__ == '__main__':
unittest.main(failfast=True)