mirror of
https://github.com/androguard/androguard.git
synced 2025-02-18 12:47:47 +00:00
improve wm
This commit is contained in:
parent
94b353fdbe
commit
4a820b447b
@ -86,21 +86,43 @@ class WM :
|
||||
def __init__(self, andro, class_name, wm_type, output_file) :
|
||||
|
||||
if wm_type == [] :
|
||||
wm_type = [ WM_L3 ]
|
||||
raise("....")
|
||||
|
||||
fd = open(output_file, "w")
|
||||
|
||||
for method in andro.get_methods() :
|
||||
# FIXME
|
||||
_method, _vm = andro.get_method_descriptor(method.get_class_name(), method.get_name(), method.get_descriptor())
|
||||
wm.WM( _vm, _method, wm_type )
|
||||
fd.write("<?xml version=\"1.0\"?>\n")
|
||||
fd.write("<andro id=\"androguard wm for %s\">\n" % class_name)
|
||||
|
||||
a = analysis.VMBCA( andro.get_vm() )
|
||||
|
||||
for method in andro.get_methods() :
|
||||
_method, _vm = andro.get_method_descriptor(method.get_class_name(), method.get_name(), method.get_descriptor())
|
||||
w = wm.WM( _vm, _method, wm_type, a )
|
||||
|
||||
fd.write( w.save( ) )
|
||||
|
||||
fd.write("</andro>\n")
|
||||
|
||||
fd.close()
|
||||
|
||||
class WMCheck :
|
||||
def __init__(self, andro, intput_file) :
|
||||
pass
|
||||
def __init__(self, andro, class_name, input_file) :
|
||||
a = analysis.VMBCA( andro.get_vm() )
|
||||
|
||||
fd = open(input_file, "r")
|
||||
buffxml = fd.read()
|
||||
fd.close()
|
||||
|
||||
document = xml.dom.minidom.parseString(buffxml)
|
||||
|
||||
w_orig = wm.WMLoad( document )
|
||||
|
||||
for method in andro.get_methods() :
|
||||
_method, _vm = andro.get_method_descriptor(method.get_class_name(), method.get_name(), method.get_descriptor())
|
||||
|
||||
w_cmp = wm.WMCheck( w_orig, _vm, _method, a )
|
||||
#w_cmp.show()
|
||||
|
||||
|
||||
def OBFU_NAMES_GEN(prefix="") :
|
||||
return prefix + random.choice( string.letters ) + ''.join([ random.choice(string.letters + string.digits) for i in range(10 - 1) ] )
|
||||
|
@ -50,6 +50,22 @@ class ExternalFM :
|
||||
def get_descriptor(self) :
|
||||
return self.descriptor
|
||||
|
||||
class ToString :
|
||||
def __init__(self, tab) :
|
||||
self.__tab = tab
|
||||
self.__string = ""
|
||||
|
||||
def push(self, name) :
|
||||
for i in self.__tab :
|
||||
if name in self.__tab[i] :
|
||||
if len(self.__string) > 0 :
|
||||
if i == 'O' and self.__string[-1] == 'O' :
|
||||
continue
|
||||
self.__string += i
|
||||
|
||||
def get_string(self) :
|
||||
return self.__string
|
||||
|
||||
##### JVM ######
|
||||
FIELDS = {
|
||||
"getfield" : "R",
|
||||
@ -60,6 +76,12 @@ FIELDS = {
|
||||
|
||||
METHODS = [ "invokestatic", "invokevirtual", "invokespecial" ]
|
||||
|
||||
JVM_TOSTRING = { "O" : jvm.MATH_JVM_OPCODES.keys(),
|
||||
"I" : jvm.INVOKE_JVM_OPCODES,
|
||||
"G" : jvm.FIELD_READ_JVM_OPCODES,
|
||||
"P" : jvm.FIELD_WRITE_JVM_OPCODES
|
||||
}
|
||||
|
||||
class JVMBreakBlock :
|
||||
def __init__(self, _vm) :
|
||||
self.__ins = []
|
||||
@ -111,8 +133,8 @@ class JVMBreakBlock :
|
||||
desc = getattr(self.__vm, self.__info[t][0])( o[0], o[1], o[2] )
|
||||
|
||||
# It's an external
|
||||
if desc[0] == None :
|
||||
desc = ( ExternalFM( o[0], o[1], o[2] ), self.__vm )
|
||||
if desc == None :
|
||||
desc = ExternalFM( o[0], o[1], o[2] )
|
||||
|
||||
if desc not in self.__info[t][1] :
|
||||
self.__info[t][1][desc] = []
|
||||
@ -175,30 +197,43 @@ class GVM_BCA :
|
||||
self.__vm = _vm
|
||||
self.__method = _method
|
||||
|
||||
BO = [ jvm.BREAK_JVM_OPCODES, JVMBreakBlock ]
|
||||
|
||||
BO = { "B_O" : jvm.BREAK_JVM_OPCODES, "B_O_C" : JVMBreakBlock, "TS" : JVM_TOSTRING }
|
||||
if self.__vm.get_type() == "DVM" :
|
||||
BO = [ jvm.BREAK_DVM_OPCODES, DVMBreakBlock ]
|
||||
BO = { "B_O" : dvm.BREAK_DVM_OPCODES, "B_O_C" : DVMBreakBlock, "TS" : DVM_TOSTRING }
|
||||
|
||||
self.__TS = ToString( BO[ "TS" ] )
|
||||
|
||||
code = self.__method.get_code()
|
||||
|
||||
current_bb = BO[1]( self.__vm )
|
||||
current_bb = BO["B_O_C"]( self.__vm )
|
||||
self.__bb = [ current_bb ]
|
||||
|
||||
bc = code.get_bc()
|
||||
for i in bc.get() :
|
||||
name = i.get_name()
|
||||
|
||||
# String construction
|
||||
self.__TS.push( name )
|
||||
|
||||
m_name = name.split("_")[0]
|
||||
|
||||
current_bb.push( i )
|
||||
if name in BO[0] or m_name in BO[0] :
|
||||
if name in BO["B_O"] or m_name in BO["B_O"] :
|
||||
current_bb.analyze()
|
||||
|
||||
current_bb = BO[1]( self.__vm )
|
||||
current_bb = BO["B_O_C"]( self.__vm )
|
||||
self.__bb.append( current_bb )
|
||||
|
||||
if len( self.__bb ) > 1 :
|
||||
self.__bb.pop(-1)
|
||||
|
||||
def get_bb(self) :
|
||||
return self.__bb
|
||||
|
||||
def get_ts(self) :
|
||||
return self.__TS.get_string()
|
||||
|
||||
def get_method(self) :
|
||||
return self.__method
|
||||
|
||||
@ -214,6 +249,7 @@ class GVM_BCA :
|
||||
|
||||
def show(self) :
|
||||
print "METHOD", self.__method.get_class_name(), self.__method.get_name(), self.__method.get_descriptor()
|
||||
print "\t TOSTRING = ", self.__TS.get_string()
|
||||
# self.__method.show()
|
||||
|
||||
for i in self.__bb :
|
||||
@ -237,7 +273,7 @@ class GVM_BCA :
|
||||
for i in self.__bb :
|
||||
fields = i.get_fields()
|
||||
for field in fields :
|
||||
print "\t\t-->", field[0].get_class_name(), field[0].get_name(), field[0].get_descriptor()
|
||||
print "\t\t-->", field.get_class_name(), field.get_name(), field.get_descriptor()
|
||||
for context in fields[field] :
|
||||
print "\t\t\t |---|", context.mode, context.details
|
||||
|
||||
@ -247,7 +283,7 @@ class GVM_BCA :
|
||||
for i in self.__bb :
|
||||
methods = i.get_methods()
|
||||
for method in methods :
|
||||
print "\t\t-->", method[0].get_class_name(), method[0].get_name(), method[0].get_descriptor()
|
||||
print "\t\t-->", method.get_class_name(), method.get_name(), method.get_descriptor()
|
||||
for context in methods[method] :
|
||||
print "\t\t\t |---|", context.details
|
||||
|
||||
|
@ -275,6 +275,10 @@ MATH_JVM_OPCODES = { "iand" : '&',
|
||||
"ixor" : '^',
|
||||
}
|
||||
|
||||
INVOKE_JVM_OPCODES = [ "invokestatic", "invokevirtual", "invokespecial" ]
|
||||
FIELD_READ_JVM_OPCODES = [ "getfield", "getstatic" ]
|
||||
FIELD_WRITE_JVM_OPCODES = [ "putfield", "putstatic" ]
|
||||
|
||||
BREAK_JVM_OPCODES = [ "invokevirtual", "invokespecial", "invokestatic" ] + [ "areturn", "astore", "aastore", "bastore", "iinc", "istore", "iastore", "ireturn", "pop", "putfield", "putstatic" ] + [ "if_acmpeq", "if_icmpeq", "if_icmpne", "if_icmplt", "if_icmpge", "if_icmpgt", "if_icmple", "ifeq", "ifne", "iflt", "ifge", "ifgt", "ifle", "ifnonnull", "ifnull" ] #BRANCH_JVM_OPCODES
|
||||
|
||||
|
||||
|
109
core/wm/wm.py
109
core/wm/wm.py
@ -1,6 +1,8 @@
|
||||
import os, sys, hashlib, random, types, itertools, hashlib
|
||||
import os, sys, hashlib, random, types, itertools, hashlib, cPickle, base64
|
||||
|
||||
import analysis, misc
|
||||
from xml.sax.saxutils import escape, unescape
|
||||
|
||||
import misc
|
||||
|
||||
import wm_l1, wm_l3, wm_l4
|
||||
|
||||
@ -15,15 +17,26 @@ WM_BIND = {
|
||||
}
|
||||
|
||||
class WM :
|
||||
def __init__(self, vm, method, wm_type) :
|
||||
def __init__(self, vm, method, wm_type, analysis) :
|
||||
self.__method = method
|
||||
self.__wms = []
|
||||
|
||||
self.__a = analysis
|
||||
|
||||
list_x = []
|
||||
self.__a = analysis.VMBCA( vm )
|
||||
|
||||
|
||||
for i in wm_type :
|
||||
l_x = WM_BIND[ i ]( vm, method, self.__a ).get()
|
||||
wb = WM_BIND[ i ]( vm, method, self.__a )
|
||||
wb.run()
|
||||
|
||||
l_x = wb.get()
|
||||
|
||||
for z in l_x :
|
||||
list_x.append( z )
|
||||
|
||||
self.__wms.append( (i, wb) )
|
||||
|
||||
print list_x
|
||||
if list_x == [] :
|
||||
raise("list is empty")
|
||||
|
||||
@ -44,15 +57,80 @@ class WM :
|
||||
# print ob.verify_with_X( [ 12345668, 90877676, 878978, 87987673, 788789 ] )
|
||||
# print ob.verify_with_X( [ 12345668, 90877676, 878978, 4, 87987673 ] )
|
||||
# print ob.verify_with_X( [ 1, 2, 3, 4, 5, 6 ] )
|
||||
|
||||
def get_hash(self) :
|
||||
return self.__ob.get_hash()
|
||||
|
||||
def get_values(self) :
|
||||
l = []
|
||||
for k, v in self.__ob.get_points() :
|
||||
l.append( v )
|
||||
return l
|
||||
def save(self) :
|
||||
buffer = "<method class=\"%s\" name=\"%s\" descriptor=\"%s\">\n" % ( self.__method.get_class_name(), escape( self.__method.get_name() ), self.__method.get_descriptor() )
|
||||
buffer += "<sss>%s</sss>\n" % ( base64.b64encode( cPickle.dumps( self.__ob.get_y() ) ) )
|
||||
|
||||
|
||||
for i in self.__wms :
|
||||
buffer += "<wm type=\"%d\">%s</wm>\n" % ( i[0], base64.b64encode( cPickle.dumps( i[1].get_export_context() ) ) )
|
||||
|
||||
buffer += "</method>\n"
|
||||
|
||||
return buffer
|
||||
|
||||
class WMMLoad :
|
||||
def __init__(self, item) :
|
||||
self.__class_name = item.getAttribute('class')
|
||||
self.__name = unescape( item.getAttribute('name') )
|
||||
self.__descriptor = item.getAttribute('descriptor')
|
||||
|
||||
self.__wms = []
|
||||
|
||||
x = base64.b64decode( item.getElementsByTagName( 'sss' )[0].firstChild.data )
|
||||
# print cPickle.loads( x )
|
||||
|
||||
|
||||
for s_item in item.getElementsByTagName( 'wm' ) :
|
||||
_type = int( s_item.getAttribute('type') )
|
||||
|
||||
wb = WM_BIND[ _type ]( None, None, None )
|
||||
|
||||
x = cPickle.loads( base64.b64decode( s_item.firstChild.data ) )
|
||||
wb.set_context( x )
|
||||
|
||||
|
||||
self.__wms.append( (_type, wb) )
|
||||
|
||||
def get_wms(self) :
|
||||
return self.__wms
|
||||
|
||||
def get_name(self) :
|
||||
return self.__name
|
||||
|
||||
class WMLoad :
|
||||
def __init__(self, document) :
|
||||
self.__methods = []
|
||||
|
||||
for item in document.getElementsByTagName('method') :
|
||||
self.__methods.append( WMMLoad( item ) )
|
||||
|
||||
def get_methods(self) :
|
||||
return self.__methods
|
||||
|
||||
class WMCheck :
|
||||
def __init__(self, wm_orig, vm, method, analysis) :
|
||||
|
||||
print method.get_name()
|
||||
|
||||
for _method in wm_orig.get_methods() :
|
||||
list_x = []
|
||||
print "\t --->", _method.get_name()
|
||||
for _type, _wm in _method.get_wms() :
|
||||
wb = WM_BIND[ _type ]( vm, method, analysis )
|
||||
|
||||
wb.set_context( _wm.get_import_context() )
|
||||
wb.run()
|
||||
|
||||
l_x = _wm.challenge( wb )
|
||||
|
||||
for i in l_x :
|
||||
list_x.append( i )
|
||||
|
||||
print "\t\t X :", list_x
|
||||
#print wm_orig.get_dwbo().verify_with_X( list_x )
|
||||
print ""
|
||||
|
||||
class Polynomial :
|
||||
def __init__(self, degree, secret_long, length) :
|
||||
@ -221,6 +299,9 @@ class DWBO :
|
||||
result, success = self.__sss.join( coord_x, self.__points.values() )
|
||||
return result, success
|
||||
|
||||
def get_y(self) :
|
||||
return [ self.__points[k] for k in self.__points ]
|
||||
|
||||
def get_points(self) :
|
||||
return self.__points
|
||||
|
||||
|
@ -1,19 +1,72 @@
|
||||
import misc
|
||||
|
||||
import hashlib
|
||||
|
||||
def INIT() :
|
||||
return WM_L1
|
||||
|
||||
def levenshtein(a,b):
|
||||
"Calculates the Levenshtein distance between a and b."
|
||||
n, m = len(a), len(b)
|
||||
if n > m:
|
||||
# Make sure n <= m, to use O(min(n,m)) space
|
||||
a,b = b,a
|
||||
n,m = m,n
|
||||
|
||||
current = range(n+1)
|
||||
for i in range(1,m+1):
|
||||
previous, current = current, [i]+[0]*n
|
||||
for j in range(1,n+1):
|
||||
add, delete = previous[j]+1, current[j-1]+1
|
||||
change = previous[j-1]
|
||||
if a[j-1] != b[i-1]:
|
||||
change = change + 1
|
||||
current[j] = min(add, delete, change)
|
||||
|
||||
return current[n]
|
||||
|
||||
class WM_L1 :
|
||||
def __init__(self, vm, method, analysis) :
|
||||
x = analysis.get(method)
|
||||
self.__vm = vm
|
||||
self.__method = method
|
||||
self.__analysis = analysis
|
||||
|
||||
self.__context = {
|
||||
"L_X" : [],
|
||||
"STRING" : "",
|
||||
}
|
||||
|
||||
self.set_context( )
|
||||
def run(self) :
|
||||
x = self.__analysis.get(self.__method)
|
||||
|
||||
raise("ooops")
|
||||
self.__context[ "STRING" ] = x.get_ts()
|
||||
|
||||
self.__context[ "L_X" ].append(
|
||||
misc.str2long( hashlib.md5( self.__context[ "STRING" ] ).hexdigest() )
|
||||
)
|
||||
|
||||
def challenge(self, external_wm) :
|
||||
distance = levenshtein( self.__context["STRING"], external_wm.get_context()["STRING"] )
|
||||
|
||||
# print distance
|
||||
|
||||
if distance <= 2 :
|
||||
return self.__context[ "L_X" ]
|
||||
|
||||
return []
|
||||
|
||||
def get(self) :
|
||||
return [ ]
|
||||
|
||||
def set_context(self, values) :
|
||||
pass
|
||||
return self.__context[ "L_X" ]
|
||||
|
||||
def set_context(self, values) :
|
||||
for x in values :
|
||||
self.__context[ x ] = values[ x ]
|
||||
|
||||
def get_context(self) :
|
||||
return self.__context
|
||||
|
||||
def get_export_context(self) :
|
||||
return self.__context
|
||||
|
||||
def get_import_context(self) :
|
||||
return {}
|
||||
|
@ -4,7 +4,6 @@ lua_code_init = '''
|
||||
%s = {}
|
||||
function %s:new()
|
||||
o = {}
|
||||
print "coucou"
|
||||
setmetatable(o, self)
|
||||
self.__index = self
|
||||
return o
|
||||
|
@ -1,19 +1,66 @@
|
||||
|
||||
import random
|
||||
|
||||
def INIT() :
|
||||
return WM_L4
|
||||
|
||||
class WM_L4 :
|
||||
def __init__(self, vm, method, analysis) :
|
||||
x = analysis.get(method)
|
||||
self.__vm = vm
|
||||
self.__method = method
|
||||
self.__analysis = analysis
|
||||
|
||||
self.__context = {
|
||||
"L_X" : [],
|
||||
"OP_BIND" : {},
|
||||
}
|
||||
|
||||
|
||||
self.set_context( { "op_bind" : { '&' : 500, '-' : 600, '+' : 700, '^' : 800 } } )
|
||||
def __init_context(self) :
|
||||
self.__context[ "OP_BIND" ][ '&' ] = self.new_randint()
|
||||
self.__context[ "OP_BIND" ][ '|' ] = self.new_randint()
|
||||
self.__context[ "OP_BIND" ][ '-' ] = self.new_randint()
|
||||
self.__context[ "OP_BIND" ][ '+' ] = self.new_randint()
|
||||
self.__context[ "OP_BIND" ][ '^' ] = self.new_randint()
|
||||
|
||||
print method.get_name(), x.get_ops()
|
||||
def run(self) :
|
||||
x = self.__analysis.get(self.__method)
|
||||
|
||||
if self.__context[ "OP_BIND" ] == {} :
|
||||
self.__init_context()
|
||||
|
||||
for i in x.get_bb() :
|
||||
v = 0
|
||||
for j in i.get_ops() :
|
||||
v = v + self.__context[ "OP_BIND" ][ j ]
|
||||
|
||||
if v != 0 :
|
||||
self.__context[ "L_X" ].append( v )
|
||||
|
||||
def new_randint(self) :
|
||||
x = random.randint(300, 1000)
|
||||
|
||||
for i in self.__context[ "OP_BIND" ] :
|
||||
if self.__context[ "OP_BIND" ][i] == x :
|
||||
self.new_randint()
|
||||
|
||||
return x
|
||||
|
||||
def challenge(self, external_wm) :
|
||||
return external_wm.get_context()["L_X"]
|
||||
|
||||
def get(self) :
|
||||
return [ 900090903, 980978789, 656767, 7667 ]
|
||||
return self.__context["L_X"]
|
||||
|
||||
def set_context(self, values) :
|
||||
pass
|
||||
for x in values :
|
||||
self.__context[ x ] = values[ x ]
|
||||
|
||||
def get_context(self) :
|
||||
return self.__context
|
||||
|
||||
def get_export_context(self) :
|
||||
return { "OP_BIND" : self.__context["OP_BIND"] }
|
||||
|
||||
def get_import_context(self) :
|
||||
return { "OP_BIND" : self.__context["OP_BIND"] }
|
||||
|
@ -6,13 +6,13 @@ sys.path.append(PATH_INSTALL + "./")
|
||||
|
||||
import androguard
|
||||
|
||||
TEST = './examples/android/Test/bin/classes/org/t0t0/android/Test1.class'
|
||||
TEST_STEAL = ''
|
||||
TEST_ANDRO = './examples/android/Test/bin/classes.dex'
|
||||
TEST_ORIG = './examples/android/Test/bin/classes/org/t0t0/android/Test1.class'
|
||||
TEST_JAVA_STEAL = './examples/java/test/orig/Test1.class'
|
||||
TEST_ANDRO_STEAL = './examples/android/Test/bin/classes.dex'
|
||||
|
||||
_a = androguard.AndroguardS( TEST )
|
||||
_a = androguard.AndroguardS( TEST_ORIG )
|
||||
|
||||
wm = androguard.WM( _a, "org.t0t0.android.Test1", [ androguard.WM_L1 ], "./wm.xml" )
|
||||
wm = androguard.WM( _a, "org.t0t0.android.Test1", [ androguard.WM_L1, androguard.WM_L4 ], "./wm.xml" )
|
||||
|
||||
_b = androguard.AndroguardS( TEST_ANDRO )
|
||||
androguard.WMCheck( _b, "./wm.xml" )
|
||||
_b = androguard.AndroguardS( TEST_JAVA_STEAL )
|
||||
androguard.WMCheck( _b, "org.t0t0.android.Test1", "./wm.xml" )
|
||||
|
Loading…
x
Reference in New Issue
Block a user